Development Guide
Guide for setting up a local development environment and contributing to Arus.
Prerequisites
- Python 3.11+
- PostgreSQL 15+
- Docker & Docker Compose (for running dependencies)
- Git
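A quick way to confirm the list above before starting is a small check script. This is only a sketch: the function name `missing_prerequisites` is hypothetical and not part of the repository, and it checks only the Python version and whether `git` and `docker` are on the PATH (PostgreSQL is assumed to run in Docker, as in the setup steps).

```python
import shutil
import sys

# Minimum interpreter version taken from the prerequisites list.
MIN_PYTHON = (3, 11)
# Command-line tools the guide relies on; PostgreSQL itself runs in Docker.
REQUIRED_TOOLS = ("git", "docker")


def missing_prerequisites(
    python_version: tuple[int, int] = sys.version_info[:2],
    tools: tuple[str, ...] = REQUIRED_TOOLS,
) -> list[str]:
    """Return a human-readable list of prerequisites that look missing."""
    problems: list[str] = []
    if python_version < MIN_PYTHON:
        problems.append(
            f"Python {MIN_PYTHON[0]}.{MIN_PYTHON[1]}+ required, "
            f"found {python_version[0]}.{python_version[1]}"
        )
    for tool in tools:
        # shutil.which returns None when the executable is not on PATH.
        if shutil.which(tool) is None:
            problems.append(f"{tool} not found on PATH")
    return problems
```

Calling `missing_prerequisites()` with no arguments inspects the current interpreter and PATH; an empty list means nothing obvious is missing.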
Local Setup
1. Clone and Install Dependencies
bash
# This repository is private — clone with appropriate access
git clone https://github.com/edsuwarna/arus.git
cd arus
# Create virtual environment
python -m venv venv
source venv/bin/activate # or: venv\Scripts\activate on Windows
# Install dependencies
pip install -r requirements.txt
# Install dev dependencies
pip install pytest pytest-asyncio httpx
2. Start PostgreSQL
bash
# Using Docker for the database
docker run -d \
--name arus-db-dev \
-e POSTGRES_USER=arus \
-e POSTGRES_PASSWORD=arus_secret \
-e POSTGRES_DB=arus_warehouse \
-p 5432:5432 \
postgres:15-alpine
3. Configure Environment
bash
cp .env.example .env
# Edit .env as needed (defaults work for local dev)
4. Run Migrations
bash
# Run Alembic migrations
python -c "from arus.shared.db.migrate import run_migrations; run_migrations()"
5. Start the API Server
bash
uvicorn arus.main:app --reload --host 0.0.0.0 --port 8081
6. Start the Frontend Console
bash
# Option 1: Using nginx
docker compose up arus-console -d
# Option 2: Using Python HTTP server (for development)
cd console
python -m http.server 8082
The console at http://localhost:8082 proxies API requests to http://localhost:8081 via nginx. Note that python -m http.server serves static files only and does not proxy requests.
Project Structure
arus/
├── arus/ # Backend Python package
│ ├── main.py # FastAPI application entry point
│ ├── models.py # Central model imports for Alembic
│ ├── modules/ # Feature modules
│ │ ├── auth/ # Authentication & user management
│ │ ├── connector/ # Source/destination connector framework
│ │ ├── pipeline/ # Pipeline orchestration engine
│ │ ├── source/ # Source CRUD API
│ │ ├── destination/ # Destination CRUD API
│ │ ├── run_log/ # Run history & logging
│ │ ├── dag/ # DAG visualization data
│ │ ├── dashboard/ # Dashboard summary endpoints
│ │ ├── transform/ # Transform engine
│ │ ├── notification/ # Notification targets
│ │ ├── settings/ # Runtime settings
│ │ └── alert/ # Telegram alert manager
│ ├── shared/ # Shared utilities
│ │ ├── config.py # Environment-based configuration
│ │ ├── crypto.py # Fernet encryption
│ │ ├── types.py # Column type mapping
│ │ ├── db/ # Database session, engine, migrations
│ │ └── exceptions.py # Error hierarchy
│ └── utils/ # Legacy utilities
│ ├── schema_manager.py
│ └── state_manager.py
├── console/ # Frontend SPA
│ ├── index.html # Entry point
│ ├── css/ # Stylesheets
│ └── js/ # JavaScript modules
│ ├── app.js # SPA router, App singleton
│ ├── api.js # API client with auth
│ ├── components/ # Reusable components
│ └── pages/ # Page-specific scripts
├── docs/ # Documentation
├── prd/ # Product requirements
├── tests/ # Test suite
├── alembic/ # Database migrations
├── scripts/ # Utility scripts
├── nginx/ # nginx configuration
├── docker-compose.yml # Docker Compose setup
├── Dockerfile # API container build
├── Dockerfile.console # Console container build
├── requirements.txt # Python dependencies
└── .env.example # Environment template
Module Development Pattern
Each module follows a consistent layered architecture:
module/
├── models.py # SQLAlchemy ORM models (data layer)
├── schemas.py # Pydantic models (validation layer)
├── repository.py # Database operations (data access)
├── service.py # Business logic (service layer)
├── router.py # FastAPI routes (presentation layer)
└── __init__.py
Example: Adding a new field to Source
- models.py: Add the column to the SQLAlchemy model
python
new_field = Column(String(100), nullable=True)
- schemas.py: Add the field to Pydantic create/update schemas
- repository.py: Update queries if needed (usually no changes for simple fields)
- router.py: The field is automatically included in API responses
- Migration: Create an Alembic migration:
bash
python -c "from arus.shared.db.migrate import create_migration; create_migration('add new_field to sources')"
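To make the layering concrete, the sketch below traces an optional `new_field` from validated input through a repository and a service. It is not the project's code: it uses only the standard library, with dataclasses standing in for the SQLAlchemy model and Pydantic schema, an in-memory dict standing in for the database, and the class names and the duplicate-name rule are assumptions made for illustration.

```python
from dataclasses import dataclass, field
from typing import Optional


# models.py stand-in: the stored record (the real module uses SQLAlchemy).
@dataclass
class SourceRecord:
    id: int
    name: str
    new_field: Optional[str] = None  # the newly added nullable column


# schemas.py stand-in: validated input (the real module uses Pydantic).
@dataclass
class SourceCreate:
    name: str
    new_field: Optional[str] = None

    def __post_init__(self) -> None:
        if not self.name:
            raise ValueError("name must not be empty")
        # Mirrors the String(100) column length from the example.
        if self.new_field is not None and len(self.new_field) > 100:
            raise ValueError("new_field is limited to 100 characters")


# repository.py stand-in: data access only, no business rules.
@dataclass
class SourceRepository:
    _rows: dict[int, SourceRecord] = field(default_factory=dict)

    def add(self, payload: SourceCreate) -> SourceRecord:
        record = SourceRecord(
            id=len(self._rows) + 1,
            name=payload.name,
            new_field=payload.new_field,
        )
        self._rows[record.id] = record
        return record

    def find_by_name(self, name: str) -> Optional[SourceRecord]:
        return next((r for r in self._rows.values() if r.name == name), None)


# service.py stand-in: business logic on top of the repository.
class SourceService:
    def __init__(self, repository: SourceRepository) -> None:
        self.repository = repository

    def create(self, payload: SourceCreate) -> SourceRecord:
        # Hypothetical business rule: source names must be unique.
        if self.repository.find_by_name(payload.name) is not None:
            raise ValueError(f"source {payload.name!r} already exists")
        return self.repository.add(payload)
```

A router would call `SourceService.create` and serialize the returned record. Because the field is declared once per layer, adding it touches the model and the schema, while the repository and service pass it through unchanged.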
Connector Development
Creating a New Source Connector
- Create arus/modules/connector/sources/<name>.py
- Implement the BaseSource interface:
python
from arus.modules.connector.base_source import BaseSource, TableSchema, SyncMode

class MySource(BaseSource):
    type = "my_source"

    def connect(self, config: dict) -> bool: ...
    def test_connection(self) -> bool: ...
    def discover_tables(self) -> list[TableSchema]: ...
    def get_table_columns(self, table: str) -> list[dict]: ...
    def detect_sync_mode(self, table: str, columns: list[dict]) -> SyncMode: ...
    def extract(self, table, watermark=None, batch_size=10000): ...
- Register in arus/modules/connector/registry.py:
python
from arus.modules.connector.sources.my_source import MySource
register_source("my_source", MySource)
- Add the type to the Source model's type field validation
- Add a database icon in the console/js/api.js getDbIcon() function
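As a rough illustration of what the interface methods might do, here is a toy connector that reads from in-memory tables. It is a sketch under stated assumptions: `SyncMode` and `TableSchema` are local stand-ins because their real definitions are not shown in this guide, the class does not subclass the real `BaseSource`, and the config shape, the `updated_at` rule, and the name `InMemorySource` are all hypothetical.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Iterator, Optional


# Stand-ins for names imported from arus.modules.connector.base_source.
# Their real definitions are not shown in this guide, so these are assumptions.
class SyncMode(Enum):
    FULL = "full"
    INCREMENTAL = "incremental"


@dataclass
class TableSchema:
    name: str
    columns: list[dict]


class InMemorySource:  # a real connector would subclass BaseSource
    type = "in_memory"

    def __init__(self) -> None:
        self.tables: dict[str, list[dict]] = {}

    def connect(self, config: dict) -> bool:
        # Hypothetical config shape: {"tables": {"table_name": [row, ...]}}.
        self.tables = dict(config.get("tables", {}))
        return True

    def test_connection(self) -> bool:
        return bool(self.tables)

    def get_table_columns(self, table: str) -> list[dict]:
        rows = self.tables[table]
        first = rows[0] if rows else {}
        return [{"name": k, "type": type(v).__name__} for k, v in first.items()]

    def discover_tables(self) -> list[TableSchema]:
        return [TableSchema(t, self.get_table_columns(t)) for t in self.tables]

    def detect_sync_mode(self, table: str, columns: list[dict]) -> SyncMode:
        # Assumed rule: an updated_at column enables incremental sync.
        names = {c["name"] for c in columns}
        return SyncMode.INCREMENTAL if "updated_at" in names else SyncMode.FULL

    def extract(
        self, table: str, watermark: Optional[str] = None, batch_size: int = 10000
    ) -> Iterator[list[dict]]:
        rows = self.tables[table]
        if watermark is not None:
            # Keep only rows changed after the last recorded watermark.
            rows = [r for r in rows if r["updated_at"] > watermark]
        # Yield rows in batches of at most batch_size.
        for start in range(0, len(rows), batch_size):
            yield rows[start : start + batch_size]
```

Writing `extract` as a generator that yields batches matches how the test example in this guide consumes it (`list(source.extract(...))` followed by indexing into a batch), and keeps memory bounded for large tables.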
Creating a New Destination Connector
- Create arus/modules/connector/destinations/<name>.py
- Implement the BaseDestination interface
- Register in the registry
- Add type mapping in the destination module
- Add a database icon
Testing
Running Tests
bash
# Run all tests
pytest tests/ -v
# Run specific test file
pytest tests/test_connector_soft_delete.py -v
# Run with coverage (the --cov options require the pytest-cov plugin)
pytest tests/ --cov=arus --cov-report=term-missing
Test Structure
tests/
├── conftest.py # Fixtures and mock setup
├── test_connector_soft_delete.py # Connector soft-delete tests
└── test_executor.py # Pipeline executor tests
Tests use pytest with mocked database drivers. The conftest.py registers mock modules for pymysql, pymongo, and clickhouse_driver so tests can run without real database connections.
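One common way to register mock driver modules is to place `MagicMock` objects in `sys.modules` before the code under test is imported. The sketch below shows that general technique; it is not a copy of the project's conftest.py, and the helper name `register_mock_drivers` is hypothetical.

```python
import sys
from unittest.mock import MagicMock

# Driver packages named in the paragraph above.
MOCKED_DRIVERS = ("pymysql", "pymongo", "clickhouse_driver")


def register_mock_drivers(
    names: tuple[str, ...] = MOCKED_DRIVERS,
) -> dict[str, MagicMock]:
    """Place a MagicMock in sys.modules for each driver so imports succeed."""
    mocks: dict[str, MagicMock] = {}
    for name in names:
        mock = MagicMock(name=name)
        # Later `import <name>` statements resolve to this mock object.
        sys.modules[name] = mock
        mocks[name] = mock
    return mocks


# Must run before any module that imports the drivers is itself imported.
register_mock_drivers()
```

With this in place, `import pymysql` and `from pymysql import cursors` both succeed against the mock. A dotted import such as `import pymysql.cursors` would need its own `sys.modules["pymysql.cursors"]` entry, because a plain `MagicMock` is not treated as a package.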
Writing Tests
python
import pytest
from unittest.mock import MagicMock, patch


def test_extract_with_watermark():
    """Test that extract filters by watermark correctly."""
    from arus.modules.connector.sources.postgresql import PostgreSQLSource

    source = PostgreSQLSource()
    source.conn = MagicMock()
    mock_cursor = MagicMock()
    mock_cursor.__enter__.return_value = mock_cursor
    mock_cursor.fetchall.return_value = [{"id": 1, "name": "test"}]
    source.conn.cursor.return_value = mock_cursor
    source._has_deleted_at_column = MagicMock(return_value=False)
    source.get_table_columns = MagicMock(
        return_value=[{"name": "id", "type": "integer"}]
    )

    batches = list(source.extract("my_table", watermark="2024-01-01"))
    assert len(batches) == 1
    assert batches[0][0]["id"] == 1
Database Migrations
Arus uses Alembic for database migration management.
Migration Commands
bash
# Auto-generate a migration from model changes
python -c "from arus.shared.db.migrate import create_migration; create_migration('description of changes')"
# Run pending migrations
python -c "from arus.shared.db.migrate import run_migrations; run_migrations()"
# Stamp current DB as up-to-date (for existing databases)
python -c "from arus.shared.db.migrate import stamp_head; stamp_head()"
Migration Files
Migrations are stored in alembic/versions/. Alembic detects model changes for auto-generation through arus/models.py, which imports all ORM models.
Frontend Development
Architecture
The frontend is a vanilla JS SPA with no build step:
- Router: Hash-based (window.location.hash + hashchange event)
- State: App singleton object + _state key-value store
- API Layer: API object with auto token refresh
- Rendering: Template literals (no virtual DOM)
- Styling: Global CSS with dark theme design system
Adding a New Page
- Create console/js/pages/<name>.js
- Export a render function:
javascript
async function renderMyPage(container) {
  container.innerHTML = `<div>...</div>`;
}
- Add the route in console/js/app.js:
javascript
case 'mypage':
  await renderMyPage(content);
  break;
- Add the script tag in console/index.html
- Add a sidebar nav item in console/js/components/sidebar.js
CSS Conventions
- CSS variables for theming: --bg-primary, --text-primary, --accent, etc.
- Dark theme: deep black #0b0d11 background with emerald #10b981 accents
- BEM-like class naming
- Mobile-responsive via mobile.css
Code Style
Python
- Follow PEP 8
- Type hints required for all function signatures
- Docstrings for public APIs (Google style)
- Use **kwargs sparingly; prefer explicit parameters
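A function written to these conventions might look like the following. The function `next_watermark` is a hypothetical example chosen for illustration, not code from the repository; it shows full type hints, explicit parameters instead of `**kwargs`, and a Google-style docstring.

```python
from typing import Optional


def next_watermark(
    rows: list[dict],
    column: str,
    current: Optional[str] = None,
) -> Optional[str]:
    """Compute the watermark to store after a batch has been processed.

    Args:
        rows: Extracted rows, each a mapping of column name to value.
        column: Name of the column that carries the watermark value.
        current: Watermark stored before this batch, if any.

    Returns:
        The largest value of ``column`` seen so far, or ``current``
        unchanged when the batch contains no usable values.
    """
    values = [row[column] for row in rows if row.get(column) is not None]
    if not values:
        return current
    newest = max(values)
    return newest if current is None else max(current, newest)
```

For instance, `next_watermark([], "updated_at", "2024-01-01")` leaves the watermark at `"2024-01-01"`, since an empty batch carries no newer value.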
JavaScript
- ES6+ syntax
- const/let instead of var
- Template literals for HTML rendering
- Async/await for API calls
Commit Messages
Follow conventional commits:
feat: add MongoDB connector
fix: correct watermark update on empty batch
docs: update API reference
refactor: extract retry logic into decorator
Debugging
API Logs
bash
# View API logs
docker compose logs -f arus-api
# Set debug level
ARUS_LOG_LEVEL=DEBUG uvicorn arus.main:app --reload
Database
bash
# Connect to PostgreSQL (use arus-db-dev instead if you started the database with the docker run command in Local Setup)
docker exec -it arus-db psql -U arus -d arus_warehouse
Then, inside the psql session:
sql
-- View recent runs
SELECT * FROM arus_run_logs.runs ORDER BY started_at DESC LIMIT 5;
-- Check watermarks
SELECT * FROM arus_state.watermarks;
-- View dead letters
SELECT * FROM staging._dead_letters;
Console
- Browser DevTools (F12) for network requests, console logs
- App.toast() for in-app notifications
- API responses logged to console when DEBUG level is set