Skip to content

Connector Framework

Arus provides a pluggable connector framework based on abstract base classes. Connectors implement a standard interface and are auto-registered at startup.


Architecture

BaseSource (abstract)          BaseDestination (abstract)
       │                               │
       ├── MySQLSource                 ├── PostgreSQLDestination
       ├── PostgreSQLSource            ├── MySQLDestination
       ├── MongoDBSource               ├── ClickHouseDestination
       └── (MariaDB = MySQLSource)     └── (MariaDB = MySQLDestination)

Registry
  _source_registry = {"mysql": MySQLSource, "postgresql": PostgreSQLSource, ...}
  _dest_registry  = {"postgresql": PostgreSQLDestination, ...}

BaseSource Interface

All source connectors inherit from BaseSource (arus/modules/connector/base_source.py):

python
from abc import ABC, abstractmethod
from typing import Iterator, Any
from dataclasses import dataclass, field


class BaseSource(ABC):
    type: str = ""

    @abstractmethod
    def connect(self, config: dict) -> bool:
        """Establish connection to the source database."""

    @abstractmethod
    def test_connection(self) -> bool:
        """Verify the connection is working."""

    @abstractmethod
    def discover_tables(self) -> list[TableSchema]:
        """Return all discoverable tables with schema info."""

    @abstractmethod
    def get_table_columns(self, table: str) -> list[dict]:
        """Return column metadata for a specific table."""

    @abstractmethod
    def detect_sync_mode(self, table: str, columns: list[dict]) -> SyncMode:
        """Auto-detect incremental vs full refresh sync mode."""

    @abstractmethod
    def extract(self, table: str, watermark: Any = None,
                batch_size: int = 10000) -> Iterator[list[dict]]:
        """Extract rows from source table, yielding batches."""

    def extract_soft_deletes(self, table: str, watermark: Any,
                              deleted_at_column: str, watermark_column: str,
                              batch_size: int = 10000) -> list[dict]:
        """Extract soft-deleted rows since last watermark.
        Default: returns empty list (no soft-delete tracking).
        """

Supporting Types

python
@dataclass
class TableSchema:
    name: str
    schema_name: str = "public"
    columns: list[dict] = field(default_factory=list)
    row_count_estimate: int = 0

@dataclass
class SyncMode:
    mode: str = "incremental"  # "incremental" or "full_refresh"
    watermark_column: str | None = None
    deleted_at_column: str | None = None

BaseDestination Interface

All destination connectors inherit from BaseDestination (arus/modules/connector/base_destination.py):

python
from abc import ABC, abstractmethod
from typing import Any


class BaseDestination(ABC):
    type: str = ""

    @abstractmethod
    def connect(self, config: dict) -> bool:
        """Establish connection to the destination database."""

    @abstractmethod
    def ensure_schema(self, source_name: str, table: str,
                       columns: list[dict], target_schema: str = None) -> None:
        """Create schemas/tables if they don't exist."""

    @abstractmethod
    def load_raw(self, source_name: str, table: str,
                  rows: list[dict], run_id: str) -> int:
        """Load rows into raw landing zone (JSONB). Returns count."""

    @abstractmethod
    def load_normalized(self, source_name: str, table: str,
                         rows: list[dict], target_schema: str = None) -> int:
        """Load rows into normalized typed table. Returns count."""

    @abstractmethod
    def update_watermark(self, pipeline_id: str, table: str, value: Any) -> None:
        """Persist watermark value for incremental tracking."""

    def delete_rows(self, source_name: str, table: str, rows: list[dict],
                    pk_columns: list[str], target_schema: str = None) -> int:
        """Delete rows from target by primary key (soft-delete support)."""

Built-in Source Connectors

MySQLSource

PropertyValue
Typemysql
Driverpymysql
Port3306
FeaturesIncremental via updated_at/created_at, full refresh, soft-delete filter (deleted_at), SSD

Sync Mode Detection

Auto-detects watermark column by searching (in order):

  1. updated_at, modified_at, last_modified, updated
  2. created_at
  3. Falls back to full_refresh if no timestamp column found

If deleted_at column is present, extract automatically adds AND deleted_at IS NULL to exclude soft-deleted rows.

Extract SQL (Incremental)

sql
SELECT * FROM table
WHERE watermark_col > %s
  [AND deleted_at IS NULL]
ORDER BY watermark_col
LIMIT %s

PostgreSQLSource

PropertyValue
Typepostgresql
Driverpsycopg2
Port5432
FeaturesSchema discovery, incremental via timestamp, full refresh, soft-delete filter

Schema Discovery

PostgreSQL connector supports discover_schemas() which lists all non-system schemas. The schema_include config filter limits which schemas are scanned.

Extract SQL (Incremental)

sql
SELECT * FROM "schema"."table"
WHERE watermark_col > %s
  [AND "deleted_at" IS NULL]
ORDER BY watermark_col
LIMIT %s

MongoDBSource

PropertyValue
Typemongodb
Driverpymongo
Port27017
FeaturesURI-based or host-based connect, schema inference via document sampling, BSON serialization

Connection

Supports two connection methods:

  1. Full URI: mongodb://user:pass@host:27017/db?authSource=admin
  2. Individual fields: host, port, username, password, database

Schema Inference

Column types are inferred by sampling one document via find_one():

BSON TypeMapped Type
datetimetimestamp
boolboolean
intbigint
floatdouble
dict, listjson
ObjectIdvarchar(48)
Othertext

MariaDBSource

PropertyValue
Typemariadb
Driverpymysql (MySQL protocol compatible)
ImplementationAliased to MySQLSource

Built-in Destination Connectors

PostgreSQLDestination

PropertyValue
Typepostgresql
Driverpsycopg2
Raw Schemastaging (configurable via raw_schema)
Target Schemaanalytics (configurable via target_schema)
FeaturesJSONB raw storage, typed analytics tables, UPSERT via ON CONFLICT DO NOTHING

Raw Table Schema

sql
CREATE TABLE staging.<source>_<table>_raw (
    _arus_id        BIGSERIAL PRIMARY KEY,
    _arus_run_id    UUID NOT NULL,
    _arus_extracted TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    _data           JSONB NOT NULL
);

Analytics Table Schema

sql
CREATE TABLE analytics.<table> (
    <column1> <pg_type> [NOT NULL],
    <column2> <pg_type> [NOT NULL],
    ...
    _arus_run_id    UUID NOT NULL,
    _arus_synced_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

MySQLDestination

PropertyValue
Typemysql
Driverpymysql
Raw Schemastaging (as MySQL database)
Target Schemaanalytics (as MySQL database)
FeaturesJSON raw storage, typed tables, batch executemany insert

Type Mapping

Source TypeMySQL Type
int, integerINT
bigintBIGINT
varcharVARCHAR(255)
textTEXT
boolean, boolTINYINT(1)
datetime, timestampDATETIME(3)
json, jsonbJSON

ClickHouseDestination

PropertyValue
Typeclickhouse
Driverclickhouse-connect
Port8123 (HTTP, auto-converts from 9000)
EngineMergeTree with auto-TTL (7 days for raw data)
FeaturesUUID-based raw storage, ReplacingMergeTree for watermarks

Raw Table Schema

sql
CREATE TABLE staging.<source>_<table>_raw (
    _arus_id         UUID DEFAULT generateUUIDv4(),
    _arus_run_id     String,
    _data            String,
    _arus_synced_at  DateTime DEFAULT now()
) ENGINE = MergeTree()
ORDER BY (toStartOfHour(_arus_synced_at), _arus_id)
TTL toStartOfHour(_arus_synced_at) + INTERVAL 7 DAY DELETE

Column Type Mapping

Source column types are mapped to destination types via arus/shared/types.py:

PostgreSQL Type Map

Source TypePostgreSQL Type
int, integerINTEGER
bigint, serial, bigserialBIGINT
smallintSMALLINT
decimal, numericDECIMAL
float, doubleDOUBLE PRECISION
varchar, charVARCHAR
text, longtext, mediumtextTEXT
boolean, tinyintBOOLEAN
dateDATE
datetime, timestampTIMESTAMPTZ
json, jsonbJSONB
blob, binaryBYTEA

Adding a New Connector

Source Connector

  1. Create arus/modules/connector/sources/<name>.py
  2. Implement BaseSource:
    python
    from arus.modules.connector.base_source import BaseSource, TableSchema, SyncMode
    
    class MyCustomSource(BaseSource):
        type = "my_custom"
    
        def connect(self, config): ...
        def test_connection(self): ...
        def discover_tables(self): ...
        def get_table_columns(self, table): ...
        def detect_sync_mode(self, table, columns): ...
        def extract(self, table, watermark=None, batch_size=10000): ...
  3. Register it in arus/modules/connector/registry.py:
    python
    from arus.modules.connector.sources.my_custom import MyCustomSource
    register_source("my_custom", MyCustomSource)

Destination Connector

  1. Create arus/modules/connector/destinations/<name>.py
  2. Implement BaseDestination:
    python
    from arus.modules.connector.base_destination import BaseDestination
    
    class MyCustomDestination(BaseDestination):
        type = "my_custom"
    
        def connect(self, config): ...
        def ensure_schema(self, source_name, table, columns, target_schema=None): ...
        def load_raw(self, source_name, table, rows, run_id): ...
        def load_normalized(self, source_name, table, rows, target_schema=None): ...
        def update_watermark(self, pipeline_id, table, value): ...
  3. Register it in arus/modules/connector/registry.py:
    python
    from arus.modules.connector.destinations.my_custom import MyCustomDestination
    register_destination("my_custom", MyCustomDestination)