Appearance
Tutorial: MySQL → PostgreSQL Pipeline
This tutorial walks through setting up a complete data pipeline from MySQL to PostgreSQL using Arus. You'll sync real e-commerce tables from a production MySQL database into your PostgreSQL warehouse.
Scenario
You run an e-commerce platform with a MySQL database containing:
orders— order records (updates frequently)order_items— line items per orderproducts— product catalog (rarely changes)customers— customer information
Goal: Sync these tables into a PostgreSQL warehouse for reporting, with orders synced incrementally every 5 minutes and products synced daily via full refresh.
Prerequisites
- Arus installed and running (
docker compose up -d) - Access to a MySQL source database with a read-only user
- Default PostgreSQL warehouse (created during Arus setup)
Step 1: Verify Arus is Running
bash
curl http://localhost:8081/api/healthExpected:
json
{"status":"ok","data":{"version":"0.1.0","database":"connected","scheduler":"running"}}Step 2: Add the Source Database
Open the Arus Console at http://localhost:8082 and log in.
Via Console
- Navigate to Sources → + Add Source
- Fill in the form:
- Name:
E-Commerce MySQL - Type:
MySQL - Host:
192.168.1.100(your MySQL host) - Port:
3306 - Database:
ecommerce - Username:
arus_reader - Password: (read-only user password)
- Sync Method:
Auto-detect
- Name:
- Click Test Connection to verify
- Click Save
Via API
bash
curl -X POST http://localhost:8081/api/sources \
-H "Authorization: Bearer ***" \
-H "Content-Type: application/json" \
-d '{
"name": "E-Commerce MySQL",
"type": "mysql",
"host": "192.168.1.100",
"port": 3306,
"database": "ecommerce",
"username": "arus_reader",
"password": "your_password",
"sync_method": "auto"
}'Response:
json
{
"status": "ok",
"data": { "id": "src-uuid-here", "name": "E-Commerce MySQL" }
}Step 3: Auto-Discover Tables
Via Console
- Click Rescan on your new source
- Arus scans all tables and auto-detects:
orders→ Incremental (hasupdated_at)order_items→ Incremental (hasupdated_at)products→ Incremental (hasupdated_at)customers→ Full Refresh (no timestamp column)
Table Selection
Decide which tables to sync and how:
| Table | Sync Mode | Load Mode | Reason |
|---|---|---|---|
orders | Incremental | Raw → Normalize | Frequent updates, need audit trail |
order_items | Incremental | Direct | Fast, no reprocessing needed |
products | Full Refresh | Direct | Small table, rarely changes, full sync daily |
customers | Full Refresh | Direct | Contains PII — exclude for now |
Exclude customers by unchecking its checkbox (or configure table exclusion patterns).
Via API — Discover Tables
bash
curl -X POST http://localhost:8081/api/sources/{source-id}/discover \
-H "Authorization: Bearer ***"Save the source ID from the response — you'll need it.
Step 4: Save Table Selection & Create Pipeline
Via Console
- Set the Load Mode for
ordersto Raw → Normalize - Set Load Mode for
order_itemsandproductsto Direct - Click Save Table Selection
- Arus auto-creates a pipeline named
E-Commerce MySQL Pipeline
Via API
bash
curl -X PUT http://localhost:8081/api/sources/{source-id}/tables \
-H "Authorization: Bearer ***" \
-H "Content-Type: application/json" \
-d '{
"tables": [
{
"name": "orders",
"sync_mode": "incremental",
"load_mode": "raw",
"enabled": true
},
{
"name": "order_items",
"sync_mode": "incremental",
"load_mode": "direct",
"enabled": true
},
{
"name": "products",
"sync_mode": "full_refresh",
"load_mode": "direct",
"enabled": true
}
]
}'Step 5: Configure Schedules
Arus created one pipeline with a default schedule (*/5 * * * *). But products only needs daily sync. Let's create a separate pipeline for it.
Via Console
- Go to Pipelines → Add Pipeline
- Select the same source (
E-Commerce MySQL) - Select the same destination (your default warehouse)
- Enable only products table
- Set schedule to Daily (or custom cron
0 6 * * *) - Set sync mode to Full Refresh
- Click Save
Now you have two pipelines:
| Pipeline | Tables | Schedule | Load Mode |
|---|---|---|---|
| E-Commerce MySQL Pipeline | orders, order_items | Every 5 min | Raw + Direct |
| Products Daily | products | Daily 6 AM | Full Refresh |
Via API
bash
curl -X POST http://localhost:8081/api/pipelines \
-H "Authorization: Bearer ***" \
-H "Content-Type: application/json" \
-d '{
"name": "Products Daily",
"source_id": "{source-id}",
"destination_id": "{dest-id}",
"schedule": "0 6 * * *",
"load_mode": "direct",
"tables": [
{
"name": "products",
"sync_mode": "full_refresh",
"load_mode": "direct"
}
]
}'Step 6: Run the Pipeline
Trigger a Manual Run
- Go to Pipelines
- Click on E-Commerce MySQL Pipeline
- Click Sync Now
Monitor Progress
- Watch the Run History table populate
- Click Logs on the latest run to see per-table details
- Each table shows: rows extracted, rows loaded, duration
Expected Run Flow
orders → Extract 1,000 rows → Load to staging.orders_raw (JSONB) → Normalize to analytics.orders
order_items → Extract 5,000 rows → Load directly to analytics.order_itemsStep 7: Verify Data in Warehouse
bash
# Connect to warehouse
docker exec -it arus-db psql -U arus -d arus_warehouse
# Check synced schemas
\dn
# arus_config
# arus_run_logs
# arus_state
# staging
# analytics
# Check orders in raw landing zone
SELECT _data->>'id', _data->>'total', _data->>'status'
FROM staging.e_commerce_mysql_orders_raw
LIMIT 5;
# Check normalized orders
SELECT id, total, status, _arus_synced_at
FROM analytics.orders
LIMIT 5;
# Check watermarks
SELECT source_table, watermark_value, last_synced_at
FROM arus_state.watermarks
WHERE pipeline_id = '{pipeline-id}';Step 8: Add a Transform
Let's mask email addresses in the customers table and compute a full_name field.
Via Console
- Go to Pipelines → Open the E-Commerce MySQL Pipeline
- Scroll to the Tables section
- Click Transform next to
orders - Add a Compute Field step:
- Expression:
total_with_tax = total * 1.11
- Expression:
- Add a Rename Fields step:
- Mapping:
status→order_status
- Mapping:
- Reorder steps if needed
- Click Save
Via API
bash
curl -X GET http://localhost:8081/api/pipelines/{pipeline-id} \
-H "Authorization: Bearer ***"
# Note the pipeline_table_id for "orders"
curl -X POST http://localhost:8081/api/pipelines/{pipeline-id}/tables/{table-id}/transform \
-H "Authorization: Bearer ***" \
-H "Content-Type: application/json" \
-d '{
"steps": [
{
"type": "compute",
"config": {
"expression": "total_with_tax = total * 1.11"
}
},
{
"type": "rename",
"config": {
"mapping": { "status": "order_status" }
}
}
]
}'Step 9: Set Up Notifications
- Go to Notifications → + Add Target
- Choose Telegram (or Discord/Slack)
- Enter your bot token and chat ID
- Click Save
- Go back to your pipeline → Click Notifications
- Link the notification target and select events: Failure, Dead Letter
Now you'll get alerts when something goes wrong.
Step 10: Backfill Historical Data
If you want to sync all historical data before the first incremental run:
- Go to Pipelines → Open the E-Commerce MySQL Pipeline
- Click the dropdown menu (⋮) → Backfill
- Enter a start date:
2024-01-01 - Click Confirm
Arus will reset watermarks and re-sync all data from the specified date. After backfill completes, incremental sync continues from the latest watermark.
Via API
bash
curl -X POST http://localhost:8081/api/pipelines/{pipeline-id}/backfill \
-H "Authorization: Bearer ***" \
-H "Content-Type: application/json" \
-d '{"from_date": "2024-01-01"}'Summary
You've successfully:
- ✅ Connected a MySQL source database
- ✅ Set up 2 pipelines with different schedules
- ✅ Configured Raw + Direct load modes
- ✅ Run a pipeline and verified data in the warehouse
- ✅ Added a transform to process data
- ✅ Configured notifications for failure alerts
- ✅ Performed a historical backfill
This same pattern applies to any source-destination combination. The key configuration decisions are:
- Sync mode: Incremental for frequently updated tables, Full Refresh for small/static tables
- Load mode: Raw → Normalize for tables needing audit trail, Direct for performance
- Schedule: Frequent for critical tables, daily/weekly for reference data
- Transforms: Applied per-table, after extraction and before loading
Next Steps
- Explore the Architecture to understand system internals
- Read the Connectors Guide to learn about available source/destination types
- Check the Config Reference for all pipeline options
- Set up Monitoring for production observability