Configuration Reference

Arus uses a combination of environment variables (for infrastructure-level config) and runtime settings (for user-level config managed through the UI).


Environment Variables

Set these in .env or pass them to the arus-api container.

Database

| Variable | Default | Description |
| --- | --- | --- |
| `ARUS_DB_HOST` | `localhost` | PostgreSQL host for warehouse + config |
| `ARUS_DB_PORT` | `5432` | PostgreSQL port |
| `ARUS_DB_USER` | `arus` | PostgreSQL user |
| `ARUS_DB_PASSWORD` | `arus_secret` | PostgreSQL password |
| `ARUS_DB_NAME` | `arus_warehouse` | PostgreSQL database name |
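
To illustrate how these variables fit together, here is a minimal sketch that reads them with the documented defaults and assembles a PostgreSQL connection URL. The helper name `build_db_url` is hypothetical, and Arus itself may build its connection differently.

```python
import os
from urllib.parse import quote

# Documented defaults for the ARUS_DB_* variables.
DB_DEFAULTS = {
    "ARUS_DB_HOST": "localhost",
    "ARUS_DB_PORT": "5432",
    "ARUS_DB_USER": "arus",
    "ARUS_DB_PASSWORD": "arus_secret",
    "ARUS_DB_NAME": "arus_warehouse",
}


def build_db_url(env=None):
    """Hypothetical helper: build a PostgreSQL URL from ARUS_DB_* variables."""
    env = os.environ if env is None else env
    cfg = {name: env.get(name, default) for name, default in DB_DEFAULTS.items()}
    # Percent-encode credentials so characters like "@" or ":" stay valid in a URL.
    user = quote(cfg["ARUS_DB_USER"], safe="")
    password = quote(cfg["ARUS_DB_PASSWORD"], safe="")
    return (
        f"postgresql://{user}:{password}"
        f"@{cfg['ARUS_DB_HOST']}:{cfg['ARUS_DB_PORT']}/{cfg['ARUS_DB_NAME']}"
    )
```

Passing a plain dictionary instead of the real environment makes the helper easy to test without touching `os.environ`.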

Authentication & Security

| Variable | Default | Description |
| --- | --- | --- |
| `ARUS_JWT_SECRET` | `change-me-in-production` | JWT signing secret. Must be changed in production. |
| `ARUS_ENCRYPTION_KEY` | (derived from JWT secret) | Fernet encryption key for stored credentials. Set a unique value in production. |
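
One way to produce production values for both variables is sketched below using only the Python standard library. A Fernet key is 32 random bytes encoded as URL-safe base64, so it can be generated without extra dependencies. The function names are hypothetical, and the length chosen for the JWT secret is an assumption rather than an Arus requirement.

```python
import base64
import secrets


def generate_fernet_key() -> str:
    """Return 32 random bytes as URL-safe base64, the format Fernet keys use."""
    return base64.urlsafe_b64encode(secrets.token_bytes(32)).decode("ascii")


def generate_jwt_secret(num_bytes: int = 48) -> str:
    """Return a random URL-safe string to use as a signing secret."""
    return secrets.token_urlsafe(num_bytes)


if __name__ == "__main__":
    # Print lines that can be pasted into .env.
    print(f"ARUS_JWT_SECRET={generate_jwt_secret()}")
    print(f"ARUS_ENCRYPTION_KEY={generate_fernet_key()}")
```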

Logging

| Variable | Default | Description |
| --- | --- | --- |
| `ARUS_LOG_LEVEL` | `INFO` | Log level: DEBUG, INFO, WARNING, ERROR |
| `TZ` | `UTC` | Timezone for logs and scheduling |

Pipeline Defaults

| Variable | Default | Description |
| --- | --- | --- |
| `ARUS_DEFAULT_SCHEDULE` | `*/5 * * * *` | Default cron expression for new pipelines |
| `ARUS_BATCH_SIZE` | `10000` | Rows per batch when extracting from sources |
| `ARUS_RETRY_MAX` | `3` | Maximum retry attempts for failed extractions/loads |
| `ARUS_AUTO_ALTER_SCHEMA` | `false` | Auto-add new columns to warehouse tables on schema drift |
| `ARUS_QUALITY_CHECK_THRESHOLD` | `5.0` | Max allowed row count discrepancy percentage |
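
The quality check threshold is easiest to read with numbers. The sketch below assumes the discrepancy is the absolute difference between source and loaded row counts, expressed as a percentage of the source count. That formula and both function names are assumptions for illustration; the exact calculation Arus uses is not specified here.

```python
def row_count_discrepancy_pct(source_rows: int, loaded_rows: int) -> float:
    """Absolute count difference as a percentage of the source count."""
    if source_rows == 0:
        # Assumption: an empty source only matches an empty destination.
        return 0.0 if loaded_rows == 0 else 100.0
    return abs(source_rows - loaded_rows) / source_rows * 100.0


def passes_quality_check(source_rows, loaded_rows, threshold_pct=5.0):
    """True when the discrepancy is within the allowed threshold (inclusive)."""
    return row_count_discrepancy_pct(source_rows, loaded_rows) <= threshold_pct
```

Under this assumption, loading 9,600 of 10,000 source rows is a 4% discrepancy and passes at the default threshold, while loading 9,400 is a 6% discrepancy and fails.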

Alerts (Telegram)

| Variable | Default | Description |
| --- | --- | --- |
| `ARUS_TELEGRAM_BOT_TOKEN` | (empty) | Telegram bot token for pipeline failure alerts |
| `ARUS_TELEGRAM_CHAT_ID` | (empty) | Telegram chat ID to receive alerts |

Docker Compose Defaults

| Variable | Default | Description |
| --- | --- | --- |
| `ARUS_DB_USER` (docker) | `arus` | PostgreSQL user (separate env for Docker) |
| `ARUS_DB_PASSWORD` (docker) | `arus_secret` | PostgreSQL password (separate env for Docker) |
| `ARUS_DB_NAME` (docker) | `arus_warehouse` | PostgreSQL database (separate env for Docker) |

Runtime Settings (UI-Managed)

These settings are stored in the arus_config.runtime_settings table and can be modified through the Console Settings page (admin only). They override environment variable defaults.
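
The override order can be sketched as a simple lookup: use the runtime setting when one is stored, otherwise fall back to the environment variable, otherwise to a built-in default. The mapping between setting keys and environment variables below is inferred from matching names in this reference, and `resolve_setting` is a hypothetical helper, not an Arus API.

```python
import os

# Assumed pairing of runtime setting keys with the environment variables
# that supply their defaults (inferred from this reference, not confirmed).
ENV_FOR_KEY = {
    "default_schedule": "ARUS_DEFAULT_SCHEDULE",
    "max_retries": "ARUS_RETRY_MAX",
    "auto_alter_schema": "ARUS_AUTO_ALTER_SCHEMA",
    "quality_check_threshold": "ARUS_QUALITY_CHECK_THRESHOLD",
}


def resolve_setting(key, runtime_settings, env=None, fallback=None):
    """Return the runtime value if stored, else the env var value, else fallback."""
    env = os.environ if env is None else env
    if key in runtime_settings:
        return runtime_settings[key]
    env_name = ENV_FOR_KEY.get(key)
    if env_name is not None and env_name in env:
        # Environment values are strings; callers convert them as needed.
        return env[env_name]
    return fallback
```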

General

| Key | Default | Description |
| --- | --- | --- |
| `pipeline_name_prefix` | `arus-prod-` | Prefix for auto-generated pipeline names |
| `default_schedule` | `*/5 * * * *` | Default cron expression for new pipelines |
| `auto_discover_tables` | `true` | Enable auto-discovery of tables when adding sources |
| `schema_drift_detection` | `true` | Enable schema drift detection during pipeline runs |
| `auto_alter_schema` | `false` | Auto-add new columns when schema drift is detected |

Quality & Retry

| Key | Default | Description |
| --- | --- | --- |
| `max_retries` | `3` | Maximum retry attempts for failed operations |
| `initial_backoff` | `2` | Initial backoff in seconds (doubles each retry) |
| `quality_check_threshold` | `5.0` | Max allowed row count discrepancy as percentage |
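
With the defaults, a doubling backoff produces waits of 2, 4, and 8 seconds. The sketch below computes that schedule, assuming the first retry waits exactly `initial_backoff` seconds and that no jitter or cap is applied; `backoff_delays` is a hypothetical name.

```python
def backoff_delays(max_retries: int = 3, initial_backoff: float = 2.0) -> list:
    """Delay in seconds before each retry, doubling after every attempt."""
    return [initial_backoff * (2 ** attempt) for attempt in range(max_retries)]
```

Summing the list gives the total time spent waiting before an operation is reported as failed, which is useful when choosing a pipeline timeout.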

Notifications

| Key | Default | Description |
| --- | --- | --- |
| `notify_pipeline_failures` | `true` | Send alerts on pipeline failures |
| `notify_schema_drift` | `true` | Send alerts on schema drift detection |
| `notify_dead_letter` | `true` | Send alerts when rows are moved to dead letter queue |

Connector Configuration

Source Connector Config

| Field | Type | Description |
| --- | --- | --- |
| `name` | string | Display name for the source |
| `type` | string | mysql, mariadb, postgresql, mongodb |
| `host` | string | Source database host |
| `port` | integer | Source database port |
| `database` | string | Source database name |
| `username` | string | Source database username |
| `password` | string | Source database password (encrypted at rest) |
| `ssl` | boolean | Enable SSL connection |
| `uri` | string (MongoDB only) | MongoDB connection string |
| `auth_source` | string (MongoDB only) | MongoDB authentication database |
| `sync_method` | string | auto, incremental, full_refresh |
| `table_include` | string[] | Glob patterns for including tables (e.g., `+orders*`) |
| `table_exclude` | string[] | Glob patterns for excluding tables (e.g., `-audit_*`) |
| `schema_include` | string[] | PostgreSQL schemas to scan |
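
The interaction of `table_include` and `table_exclude` can be sketched with Python's `fnmatch` module. This sketch assumes that the leading `+` and `-` in the examples are notation and not part of the glob, that an empty include list selects every table, and that excludes win over includes. None of these rules is confirmed by this reference, and `select_tables` is a hypothetical helper.

```python
from fnmatch import fnmatchcase


def strip_marker(pattern: str) -> str:
    """Drop a leading '+' or '-' (assumed to be notation, not part of the glob)."""
    return pattern[1:] if pattern.startswith(("+", "-")) else pattern


def select_tables(tables, table_include=(), table_exclude=()):
    """Return the tables matching an include glob and no exclude glob."""
    includes = [strip_marker(p) for p in table_include]
    excludes = [strip_marker(p) for p in table_exclude]
    selected = []
    for name in tables:
        # Assumption: an empty include list means "include everything".
        included = not includes or any(fnmatchcase(name, p) for p in includes)
        excluded = any(fnmatchcase(name, p) for p in excludes)
        if included and not excluded:
            selected.append(name)
    return selected
```

For example, `select_tables(["orders", "orders_archive", "audit_log", "users"], ["+orders*"], ["-audit_*"])` keeps only the two tables whose names start with `orders`.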

Destination Connector Config

| Field | Type | Description |
| --- | --- | --- |
| `name` | string | Display name for the destination |
| `type` | string | postgresql, mysql, clickhouse |
| `host` | string | Destination host |
| `port` | integer | Destination port |
| `database` | string | Destination database name |
| `username` | string | Destination username |
| `password` | string | Destination password (encrypted at rest) |
| `raw_schema` | string | Schema for raw landing zone (default: staging) |
| `target_schema` | string | Schema for normalized tables (default: analytics) |
| `is_default` | boolean | Set as default destination for auto-created pipelines |

Pipeline Config

| Parameter | Default | Description |
| --- | --- | --- |
| `name` | | Pipeline display name |
| `source_id` | | Reference to configured source |
| `destination_id` | | Reference to configured destination |
| `schedule` | `*/5 * * * *` | Cron expression |
| `target_schema` | `public` | Target analytics schema |
| `load_mode` | `direct` | direct (source → analytics) or raw (source → staging → analytics) |
| `depends_on` | `null` | Pipeline dependency (pipeline ID) |
| `max_retries` | `3` | Per-pipeline retry override |
| `timeout_seconds` | `300` | Pipeline timeout in seconds |

Per-Table Config

| Parameter | Default | Description |
| --- | --- | --- |
| `source_table` | | Table name in source database |
| `sync_mode` | `incremental` | incremental or full_refresh |
| `load_mode` | `direct` | direct or raw |
| `watermark_column` | auto-detected | Column used for incremental tracking |
| `target_schema` | pipeline default | Override target schema per table |
| `transform_config` | `null` | Array of transform step objects |
| `enabled` | `true` | Enable/disable table sync |
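
The per-table defaults can be read as a merge: start from the documented table defaults, apply the table's own values, and fall back to the pipeline's `target_schema` when the table does not set one. The sketch below shows that merge. `effective_table_config` is a hypothetical helper, and representing "auto-detected" as `None` is an assumption.

```python
# Documented per-table defaults; None for watermark_column stands in for
# "auto-detected" (an assumption made for this sketch).
TABLE_DEFAULTS = {
    "sync_mode": "incremental",
    "load_mode": "direct",
    "watermark_column": None,
    "transform_config": None,
    "enabled": True,
}


def effective_table_config(pipeline: dict, table: dict) -> dict:
    """Merge table values over the defaults and inherit the pipeline schema."""
    merged = dict(TABLE_DEFAULTS)
    merged.update(table)
    if merged.get("target_schema") is None:
        # "pipeline default": reuse the pipeline's target_schema (default: public).
        merged["target_schema"] = pipeline.get("target_schema", "public")
    return merged
```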

Transform Step Config

Each transform step is an object with a type and a config object:

Rename Fields

```json
{
  "type": "rename",
  "config": {
    "mapping": {
      "first_name": "fname",
      "last_name": "lname"
    }
  }
}
```

Remove Fields

```json
{
  "type": "remove_fields",
  "config": {
    "fields": ["internal_note", "temp_field"]
  }
}
```

Compute Field

```json
{
  "type": "compute",
  "config": {
    "expression": "full_name = first_name + ' ' + last_name"
  }
}
```

Filter Rows

```json
{
  "type": "filter",
  "config": {
    "condition": "status != 'deleted'"
  }
}
```

Map Values

```json
{
  "type": "map_values",
  "config": {
    "column": "status",
    "mapping": {
      "1": "active",
      "0": "inactive"
    }
  }
}
```

Type Cast

```json
{
  "type": "type_cast",
  "config": {
    "columns": {
      "price": "float",
      "is_active": "bool",
      "count": "int"
    }
  }
}
```

Concat Fields

```json
{
  "type": "concat_fields",
  "config": {
    "fields": ["first_name", "last_name"],
    "separator": " ",
    "target": "full_name",
    "drop_source": false
  }
}
```
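
To show how a chain of these steps might act on a single row, here is a minimal sketch covering `rename`, `remove_fields`, `map_values`, `type_cast`, and `concat_fields`. It is an illustration under stated assumptions, not the Arus implementation: it assumes rename mappings go from existing name to new name, that `map_values` leaves unmapped values unchanged, and that casts use Python's built-in constructors. `compute` and `filter` are left out because their expression syntax is not specified here. `apply_step` and `apply_steps` are hypothetical names.

```python
def apply_step(row: dict, step: dict) -> dict:
    """Apply one declarative transform step to a copy of the row."""
    kind, cfg = step["type"], step["config"]
    out = dict(row)
    if kind == "rename":
        # Assumption: mapping keys are existing names, values are new names.
        for old, new in cfg["mapping"].items():
            if old in out:
                out[new] = out.pop(old)
    elif kind == "remove_fields":
        for field in cfg["fields"]:
            out.pop(field, None)
    elif kind == "map_values":
        col = cfg["column"]
        if col in out:
            # JSON object keys are strings, so look up the string form.
            out[col] = cfg["mapping"].get(str(out[col]), out[col])
    elif kind == "type_cast":
        # Note: bool() treats any non-empty string as True, including "false".
        casts = {"float": float, "int": int, "bool": bool}
        for col, type_name in cfg["columns"].items():
            if out.get(col) is not None:
                out[col] = casts[type_name](out[col])
    elif kind == "concat_fields":
        parts = [str(out[f]) for f in cfg["fields"] if out.get(f) is not None]
        out[cfg["target"]] = cfg.get("separator", "").join(parts)
        if cfg.get("drop_source", False):
            for f in cfg["fields"]:
                out.pop(f, None)
    else:
        raise ValueError(f"step type not covered by this sketch: {kind}")
    return out


def apply_steps(row: dict, steps: list) -> dict:
    """Apply the steps in order, feeding each result into the next step."""
    for step in steps:
        row = apply_step(row, step)
    return row
```

Step order matters in a chain like this: a `concat_fields` step that reads `first_name` has to run before a `rename` step that changes that column's name.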

Python Script

```json
{
  "type": "script",
  "config": {
    "script_name": "clean_orders"
  }
}
```

The script must define a function transform(row: dict) -> dict | None:

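```python
def transform(row):
    # Derive the email domain; None when the email is missing or has no "@".
    email = row.get("email") or ""
    _, sep, domain = email.partition("@")
    row["email_domain"] = domain if sep else None
    return row
```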
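
The `dict | None` return type suggests that a script can discard a row by returning `None`. The sketch below shows how a runner might honor that, assuming `None` means "drop this row", which this reference does not state explicitly. `run_script_transform` is a hypothetical name, and the example `transform` is a stricter variant that drops rows without a usable email.

```python
def run_script_transform(rows, transform):
    """Apply transform to each row, skipping rows for which it returns None."""
    kept = []
    for row in rows:
        result = transform(dict(row))  # pass a copy so the inputs stay unchanged
        if result is not None:
            kept.append(result)
    return kept


def transform(row):
    # Example script: drop rows without a usable email, tag the rest.
    email = row.get("email")
    if not email or "@" not in email:
        return None
    row["email_domain"] = email.split("@", 1)[1]
    return row
```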