Skip to main content

Concurrent

In this section, we'll improve the connector performance by reading multiple stream slices in parallel.

Let's update the source. The bulk of the change is changing its parent class to ConcurrentSourceAdapter, and updating its __init__ method so it's properly initialized. This requires a little bit of boilerplate:

# import the following libraries
import logging
import pendulum
from airbyte_cdk.logger import AirbyteLogFormatter
from airbyte_cdk.models import Level
from airbyte_cdk.sources.concurrent_source.concurrent_source_adapter import ConcurrentSourceAdapter, ConcurrentSource
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.message.repository import InMemoryMessageRepository
class SourceSurveyMonkeyDemo(ConcurrentSourceAdapter):
    """Survey Monkey demo source that delegates reads to a ``ConcurrentSource``."""

    # Shared by the concurrent source and the cursors; its level is derived from `_logger`'s level.
    message_repository = InMemoryMessageRepository(Level(AirbyteLogFormatter.level_mapping[_logger.level]))

    def __init__(self, config: Optional[Mapping[str, Any]], state: Optional[Mapping[str, Any]]):
        """Create the source and the concurrent source it wraps.

        Args:
            config: Connector configuration. Its optional ``num_workers`` entry sets the
                concurrency level, capped at ``_MAX_CONCURRENCY``. When ``config`` is empty
                or ``None``, ``_DEFAULT_CONCURRENCY`` is used.
            state: Previously saved connector state, kept on the instance as ``self._state``.
        """
        if config:
            concurrency_level = min(config.get("num_workers", _DEFAULT_CONCURRENCY), _MAX_CONCURRENCY)
        else:
            concurrency_level = _DEFAULT_CONCURRENCY
        _logger.info(f"Using concurrent cdk with concurrency level {concurrency_level}")
        # `1 // 2` is 0, which would ask the concurrent source to start with no partition
        # generator at all when a single worker is configured; always keep at least one.
        initial_partition_generators = max(concurrency_level // 2, 1)
        concurrent_source = ConcurrentSource.create(
            concurrency_level, initial_partition_generators, _logger, self._slice_logger, self.message_repository
        )
        super().__init__(concurrent_source)
        self._config = config
        self._state = state

    def _get_slice_boundary_fields(self, stream: Stream, state_manager: ConnectorStateManager) -> Optional[Tuple[str, str]]:
        """Return the names of the slice fields holding the lower and upper boundaries.

        The same pair is returned for every stream; ``stream`` and ``state_manager`` are unused.
        """
        return ("start_date", "end_date")

We'll also need to update the streams method to wrap the streams in an adapter class to enable concurrency.

# import the following libraries
from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
from airbyte_cdk.sources.streams.concurrent.cursor import CursorField, ConcurrentCursor, FinalStateCursor
    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Build the connector's streams and wrap each one in a concurrent facade.

        Streams that declare a ``cursor_field`` get a ``ConcurrentCursor``; the others get a
        ``FinalStateCursor``.

        Args:
            config: Connector configuration; ``config["access_token"]`` is used for authentication.

        Returns:
            The ``StreamFacade``-wrapped streams, in the order they are declared.
        """
        auth = TokenAuthenticator(config["access_token"])

        survey_stream = SurveyMonkeyBaseStream(
            name="surveys",
            path="/v3/surveys",
            primary_key="id",
            data_field="data",
            authenticator=auth,
            cursor_field="date_modified",
        )
        synchronous_streams = [
            survey_stream,
            SurveyMonkeySubstream(
                name="survey_responses",
                path="/v3/surveys/{stream_slice[id]}/responses/",
                primary_key="id",
                authenticator=auth,
                parent_stream=survey_stream,
            ),
        ]
        state_manager = ConnectorStateManager(stream_instance_map={s.name: s for s in synchronous_streams}, state=self._state)

        configured_streams = []

        for stream in synchronous_streams:
            # Resolve the state for *this* stream before branching. It is passed to the facade
            # for every stream, so assigning it only in the cursor branch would leak the previous
            # stream's state (or raise UnboundLocalError if the first stream had no cursor).
            legacy_state = state_manager.get_stream_state(stream.name, stream.namespace)

            if stream.cursor_field:
                cursor_field = CursorField(stream.cursor_field)
                cursor = ConcurrentCursor(
                    stream.name,
                    stream.namespace,
                    legacy_state,
                    self.message_repository,
                    state_manager,
                    stream.state_converter,
                    cursor_field,
                    self._get_slice_boundary_fields(stream, state_manager),
                    pendulum.from_timestamp(_START_DATE),
                    EpochValueConcurrentStreamStateConverter.get_end_provider(),
                )
            else:
                cursor = FinalStateCursor(stream.name, stream.namespace, self.message_repository)

            configured_streams.append(
                StreamFacade.create_from_stream(stream, self, _logger, legacy_state, cursor)
            )
        return configured_streams

The most interesting piece from this block is the use of ConcurrentCursor to support concurrent state management.

The survey responses stream does not support incremental reads, so it's using a FinalStateCursor instead. The rest of the code change is mostly boilerplate.

We'll also add a state converter to the SurveyMonkeyBaseStream to describe how the state cursor is formatted. We'll use the EpochValueConcurrentStreamStateConverter because the get_updated_state method returns the cursor as a timestamp.

# import the following library
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import EpochValueConcurrentStreamStateConverter
# Add this attribute to SurveyMonkeyBaseStream: it describes how the state cursor is formatted.
# The epoch-value converter is used because get_updated_state returns the cursor as a timestamp.
state_converter = EpochValueConcurrentStreamStateConverter()

Next we'll add a few missing constants:

# Concurrency level used when the config is missing or does not set "num_workers".
_DEFAULT_CONCURRENCY = 10
# Upper bound applied to the configured "num_workers" value.
_MAX_CONCURRENCY = 10
# Request limit passed to the one-minute moving-window rate policy.
_RATE_LIMIT_PER_MINUTE = 120
# Logger used by the source, the concurrent source and the stream facades.
_logger = logging.getLogger("airbyte")

info

The substream isn't entirely concurrent because its stream_slices definition reads records from the parent stream sequentially:

    def stream_slices(self, stream_state: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
        """Yield every record of the parent stream as a slice of this stream.

        The parent's slices are walked one after another and each one is read with a
        full-refresh sync, so this part of the substream is not concurrent.

        Args:
            stream_state: Accepted for interface compatibility; not used here.
            **kwargs: Accepted for interface compatibility; not used here.

        Yields:
            Each parent record, in the order the parent stream returns them.
        """
        for _slice in self._parent_stream.stream_slices():
            yield from self._parent_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=_slice)

This can be solved by implementing the connector using constructs from the concurrent CDK directly instead of wrapping synchronous streams in an adapter. This is outside the scope of this tutorial because no production connectors currently implement it.

We'll now enable throttling to avoid going over the API rate limit. You can do this by configuring a moving window rate limit policy for the SurveyMonkeyBaseStream class:

# import the following libraries
from airbyte_cdk.sources.streams.call_rate import MovingWindowCallRatePolicy, HttpAPIBudget, Rate
class SurveyMonkeyBaseStream(HttpStream, ABC):
    """Base HTTP stream for Survey Monkey, throttled by a moving-window call-rate policy."""

    def __init__(
        self,
        name: str,
        path: str,
        primary_key: Union[str, List[str]],
        data_field: Optional[str],
        cursor_field: Optional[str],
        **kwargs: Any,
    ) -> None:
        """Store the stream settings and initialize the HTTP stream with an API budget.

        Args:
            name: Stream name, stored as ``self._name``.
            path: Request path, stored as ``self._path``.
            primary_key: Primary key field(s), stored as ``self._primary_key``.
            data_field: Stored as ``self._data_field``.
            cursor_field: Stored as ``self._cursor_field``; may be ``None``.
            **kwargs: Forwarded unchanged to the parent constructor.
        """
        self._name = name
        self._path = path
        self._primary_key = primary_key
        self._data_field = data_field
        self._cursor_field = cursor_field

        # A single policy with no matchers, allowing _RATE_LIMIT_PER_MINUTE calls per one-minute window.
        policies = [
            MovingWindowCallRatePolicy(
                rates=[Rate(limit=_RATE_LIMIT_PER_MINUTE, interval=datetime.timedelta(minutes=1))],
                matchers=[],
            ),
        ]
        api_budget = HttpAPIBudget(policies=policies)
        # Initialize the parent exactly once, with the budget attached. An earlier call
        # without `api_budget` would be redundant and immediately superseded by this one.
        super().__init__(api_budget=api_budget, **kwargs)

Finally, update the run.py file to properly instantiate the class. Most of this code is boilerplate code and isn't specific to the Survey Monkey connector.

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import sys
import traceback
from datetime import datetime
from typing import List

from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch
from airbyte_cdk.models import AirbyteErrorTraceMessage, AirbyteMessage, AirbyteTraceMessage, TraceType, Type

from .source import SourceSurveyMonkeyDemo

def _get_source(args: List[str]):
    """Instantiate the source from the config and state paths found in ``args``.

    If reading the files or constructing the source raises, an ERROR trace message is
    printed as JSON and ``None`` is returned instead of propagating the exception.
    """
    config_path = AirbyteEntrypoint.extract_config(args)
    state_path = AirbyteEntrypoint.extract_state(args)
    try:
        config = SourceSurveyMonkeyDemo.read_config(config_path) if config_path else None
        state = SourceSurveyMonkeyDemo.read_state(state_path) if state_path else None
        return SourceSurveyMonkeyDemo(config, state)
    except Exception as error:
        # Emission time in milliseconds since the epoch.
        emitted_at_ms = int(datetime.now().timestamp() * 1000)
        error_details = AirbyteErrorTraceMessage(
            message=f"Error starting the sync. This could be due to an invalid configuration or catalog. Please contact Support for assistance. Error: {error}",
            stack_trace=traceback.format_exc(),
        )
        trace_message = AirbyteTraceMessage(
            type=TraceType.ERROR,
            emitted_at=emitted_at_ms,
            error=error_details,
        )
        print(AirbyteMessage(type=Type.TRACE, trace=trace_message).json())
        return None



def run():
    """Entry point: build the source from the command-line arguments and launch it.

    ``_get_source`` returns ``None`` after printing an error trace message when the
    source cannot be created; in that case nothing is launched.
    """
    args = sys.argv[1:]
    source = _get_source(args)
    # Without this guard, a failed start-up would go on to call launch(None, args).
    if source is not None:
        launch(source, args)

You can now run a read operation again. The connector will read multiple partitions concurrently instead of looping through all of them sequentially.

poetry run source-survey-monkey-demo read --config secrets/config.json --catalog integration_tests/configured_catalog.json

We're now done! We implemented a Python connector covering many features:

  • Fast and reproducible integration tests
  • Authentication errors are detected and labeled as such
  • One stream supports incremental reads
  • One stream depends on another stream

The final code can be found here