Prefect & Production Scheduling¶
This page describes how to run and schedule pipelines in production using Prefect 3 and the built-in ETL scheduler (APScheduler).
Two ways to schedule¶
| Method | Use case | Observability |
|---|---|---|
| APScheduler (etl_scheduler.py) | Single machine, cron/interval/daily, simple setup | Log files, optional status command |
| Prefect 3 | Production deployments, retries, UI, multiple workers | Prefect Server UI, run history, logs |
Option 1: ETL Scheduler (APScheduler)¶
The ETL scheduler runs a pipeline (typically TPL/GENKEY) on a fixed schedule: interval, cron, or daily.
Config file¶
Use a YAML file (e.g. configs/etl_scheduler_config.yaml) with two main sections:
# Pipeline section: same keys as in pipelines_config.yml for the pipeline you run
pipeline:
  source_folder: "data/raw/my_run"
  output_folder: "data/processed/my_run"
  selected_columns: [ ... ]
  batch_size: 10
  max_workers: 8
  # ... etc.
scheduler:
  mode: "interval"                   # or "cron", "daily", "multiple_daily"
  interval_seconds: 3600             # for mode "interval" (e.g. every hour)
  # cron_expression: "0 */6 * * *"   # for mode "cron"
  # daily_time: "02:00"              # for mode "daily"
  timezone: "UTC"
logging:
  level: "INFO"
  file: "logs/etl_scheduler.log"
  max_size_mb: 10
  backup_count: 5
  format: "%(asctime)s - %(levelname)s - %(message)s"
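Each scheduler mode in the sample config pairs with one specific key. The sketch below checks that pairing on an already-parsed `scheduler` section before the scheduler starts. It is only an illustration: the function name and error messages are hypothetical, and no key is enforced for `multiple_daily` because the sample config does not show one.

```python
from typing import Any

# Required key per scheduler mode, taken from the sample config.
# "multiple_daily" has no documented key, so nothing is enforced for it.
REQUIRED_KEY_BY_MODE = {
    "interval": "interval_seconds",
    "cron": "cron_expression",
    "daily": "daily_time",
    "multiple_daily": None,
}


def validate_scheduler_section(scheduler: dict[str, Any]) -> list[str]:
    """Return a list of problems; an empty list means the section looks valid."""
    mode = scheduler.get("mode")
    if mode not in REQUIRED_KEY_BY_MODE:
        return [f"unknown mode: {mode!r}"]

    problems: list[str] = []
    required = REQUIRED_KEY_BY_MODE[mode]
    if required is not None and required not in scheduler:
        problems.append(f"mode {mode!r} requires key {required!r}")

    if mode == "interval" and "interval_seconds" in scheduler:
        value = scheduler["interval_seconds"]
        if not isinstance(value, int) or value <= 0:
            problems.append("interval_seconds must be a positive integer")
    return problems


if __name__ == "__main__":
    print(validate_scheduler_section({"mode": "interval", "interval_seconds": 3600}))
    print(validate_scheduler_section({"mode": "cron"}))
```

Running a check like this before starting the scheduler surfaces a mismatched mode and key early instead of at the first scheduled run.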
Running the scheduler¶
# Foreground (recommended for testing)
python scripts/etl_scheduler.py --config configs/etl_scheduler_config.yaml
# One-shot run (no schedule)
python scripts/etl_scheduler.py --config configs/etl_scheduler_config.yaml --run-once
# Status
python scripts/etl_scheduler.py --config configs/etl_scheduler_config.yaml --status
Production start script¶
Use scripts/start_etl_scheduler.sh to start the scheduler in the background with logging:
./scripts/start_etl_scheduler.sh
# Or with a specific config:
./scripts/start_etl_scheduler.sh -c configs/etl_scheduler_daily.yaml
Features¶
- Idempotency: The pipeline (e.g. TPL/GENKEY) skips already-processed files.
- Single job at a time: No overlapping runs.
- Graceful shutdown: Handles SIGINT/SIGTERM and stops the scheduler cleanly.
- Stats: Success/failure counts and optional alerting on consecutive failures.
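The "single job at a time" and "stats" behaviors can be pictured with a small guard object. This is a minimal sketch, not the code in etl_scheduler.py: the class name, the `alert_after` threshold, and the return strings are assumptions made for illustration. APScheduler also has a `max_instances` job option for limiting concurrent runs; whether the script relies on it is not stated here.

```python
import threading
from typing import Callable


class SingleRunGuard:
    """Runs a job only if no other run is in progress, and tracks outcomes."""

    def __init__(self, alert_after: int = 3) -> None:
        self._lock = threading.Lock()
        self.alert_after = alert_after  # hypothetical alerting threshold
        self.successes = 0
        self.failures = 0
        self.consecutive_failures = 0

    def run(self, job: Callable[[], None]) -> str:
        # Non-blocking acquire: if a run is active, skip instead of queueing.
        if not self._lock.acquire(blocking=False):
            return "skipped"
        try:
            job()
        except Exception:
            self.failures += 1
            self.consecutive_failures += 1
            return "failed"
        else:
            self.successes += 1
            self.consecutive_failures = 0
            return "ok"
        finally:
            self._lock.release()

    def should_alert(self) -> bool:
        """True once the failure streak reaches the configured threshold."""
        return self.consecutive_failures >= self.alert_after
```

Skipping rather than queueing fits an idempotent pipeline: the next scheduled run picks up whatever files the skipped run would have processed.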
Option 2: Prefect 3 (production deployment)¶
Prefect 3 provides flows, deployments, schedules, and a UI for runs and logs.
Architecture¶
- Prefect Server: API and UI (e.g. port 4200).
- PostgreSQL: backing store for Prefect (e.g. Docker).
- Prefect Worker: pulls and runs flow runs (e.g. a process worker).
- Deployments: flow + schedule + parameters (e.g. config path).
Project layout (where things live)¶
mlops/
├── mlops/prefect/
│ ├── flows/
│ │ ├── genkey_flow.py # Flow that runs TPL/GENKEY pipeline
│ │ └── scheduler_wrapper.py # Flow that runs etl_scheduler (run-once)
│ └── deployments/
│ ├── genkey_deployment.py
│ └── schedule_configs.py # Named schedules (every_5_minutes, daily_at_4pm, etc.)
├── prefect-worker/
│ └── Dockerfile # Worker image with ETL deps
├── production/ # Centralized Prefect 3 production deployment
│ ├── docker-compose.prefect.yml # Server + PostgreSQL + worker (builds repo from ..)
│ ├── prefect.yaml # Declarative deployment definition (flows + deployments)
│ ├── deploy_prefect.sh # One-shot full deploy (server + worker + deployments)
│ ├── build_prefect_images.sh # Build Prefect Docker images (worker + deployer)
│ └── prefect_manage.sh # Convenience commands (status, logs, deploy, run, ui)
└── docs/
├── prefect_production.md # This page (high-level + quickstart)
└── PREFECT_INTEGRATION.md # Detailed architecture, Docker & troubleshooting
Quickstart: from zero to a running deployment¶
This is the recommended, step-by-step path to get Prefect 3 running your ETL in production.
Building Prefect images (CI / build machine)¶
In local development you can let Docker Compose build images from the repo, but in production you usually want to separate the build from the deployment:
- Build images (with code + deps) in CI or a build machine.
- Push them to a Docker registry.
- Use only the production/ folder plus the images on the target server (no source code).
To build images from the repo root, run the build script (production/build_prefect_images.sh). This creates two images:
- mlops-prefect-worker:v1: Prefect worker (prefect worker start ...).
- mlops-prefect-deployer:v1: same image, used to run prefect deploy.
You can then push them to your registry:
docker tag mlops-prefect-worker:v1 <your-registry>/mlops-prefect-worker:v1
docker tag mlops-prefect-deployer:v1 <your-registry>/mlops-prefect-deployer:v1
docker push <your-registry>/mlops-prefect-worker:v1
docker push <your-registry>/mlops-prefect-deployer:v1
In production/docker-compose.prefect.yml, the services use these images via env vars:
prefect-worker:
  image: ${PREFECT_WORKER_IMAGE:-mlops-prefect-worker:latest}
prefect-deployer:
  image: ${PREFECT_DEPLOYER_IMAGE:-mlops-prefect-deployer:latest}
On the target server (without source code) you only need:
- The production/ folder (compose + prefect.yaml + scripts).
- Access to the Docker images in your registry.
Set the image names before deploying:
export PREFECT_WORKER_IMAGE=<your-registry>/mlops-prefect-worker:v1
export PREFECT_DEPLOYER_IMAGE=<your-registry>/mlops-prefect-deployer:v1
cd production
./deploy_prefect.sh
The containers use the /app directory baked into the image (no bind mount of the repo), and prefect-deployer applies production/prefect.yaml inside the container.
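The `${PREFECT_WORKER_IMAGE:-mlops-prefect-worker:latest}` form in the compose file falls back to the default when the variable is unset or empty. Because the default tag is `latest` while the images built earlier are tagged `v1`, forgetting the `export` lines makes Compose look for a different tag. The sketch below mimics that fallback rule in Python; `resolve_image` is a hypothetical helper and `registry.example` is a placeholder registry.

```python
import os
from typing import Mapping, Optional


def resolve_image(var_name: str, default: str,
                  env: Optional[Mapping[str, str]] = None) -> str:
    """Mimic Compose's ${VAR:-default}: use default when VAR is unset or empty."""
    env = os.environ if env is None else env
    return env.get(var_name) or default


if __name__ == "__main__":
    default = "mlops-prefect-worker:latest"
    # Variable not set: the compose default (tag "latest") is used.
    print(resolve_image("PREFECT_WORKER_IMAGE", default, env={}))
    # Variable set: the registry image (tag "v1") is used.
    print(resolve_image(
        "PREFECT_WORKER_IMAGE", default,
        env={"PREFECT_WORKER_IMAGE": "registry.example/mlops-prefect-worker:v1"},
    ))
```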
Step 1 — Deploy Prefect stack (Server + PostgreSQL + Worker + Deployments)¶
From the project root:
./production/deploy_prefect.sh
This script will:
- Start PostgreSQL, Prefect Server and Prefect Worker using docker-compose.prefect.yml.
- Create or reuse the work pool for the worker.
- Apply all deployments defined in prefect.yaml (for example, the TPL/GENKEY pipeline flow).
- Set a default schedule (every_5_minutes by default, configurable by argument).
You can customize the schedule used for the main ETL deployment:
./production/deploy_prefect.sh daily_at_4pm
./production/deploy_prefect.sh manual_only
./production/deploy_prefect.sh every_hour --clean # Clean volumes and redeploy from scratch
Named schedules are defined in schedule_configs.py (e.g. every_5_minutes, every_hour, daily_at_4pm, manual_only).
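One way to picture a named-schedule registry is a plain mapping from name to schedule definition. The structure below is hypothetical: the real schedule_configs.py is not shown here and may be organized differently. The interval and cron values are inferred from the schedule names only.

```python
from typing import Optional

# Hypothetical registry; values are inferred from the schedule names.
NAMED_SCHEDULES: dict[str, Optional[dict]] = {
    "every_5_minutes": {"interval": 300},    # seconds
    "every_hour": {"interval": 3600},        # seconds
    "daily_at_4pm": {"cron": "0 16 * * *"},  # 16:00 every day
    "manual_only": None,                     # no schedule attached
}


def resolve_schedule(name: str) -> Optional[dict]:
    """Look up a schedule by name, failing loudly on typos."""
    try:
        return NAMED_SCHEDULES[name]
    except KeyError:
        known = ", ".join(sorted(NAMED_SCHEDULES))
        raise ValueError(f"unknown schedule {name!r}; known: {known}") from None


if __name__ == "__main__":
    print(resolve_schedule("daily_at_4pm"))
    print(resolve_schedule("manual_only"))
```

Rejecting unknown names up front means a mistyped argument to deploy_prefect.sh would fail at deploy time rather than silently producing an unscheduled deployment.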
Step 2 — Check services and connectivity¶
Use the management script to confirm everything is healthy:
./production/prefect_manage.sh status
This will:
- Show the status of Docker services (postgres, prefect-server, prefect-worker, prefect-deployer).
- Check that http://localhost:4200/api/health responds correctly.
If something is wrong, see docs/PREFECT_INTEGRATION.md (Troubleshooting section) for details.
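The same health check can be scripted, for example in a CI step that waits for the server before applying deployments. This is a rough sketch using only the standard library: it treats HTTP 200 from the health URL as healthy, and the function names, attempt count, and delay are illustrative choices rather than anything taken from prefect_manage.sh.

```python
import time
import urllib.error
import urllib.request
from typing import Callable


def http_status(url: str, timeout: float = 5.0) -> int:
    """Return the HTTP status code of a GET request, or 0 if unreachable."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status
    except urllib.error.HTTPError as exc:
        return exc.code
    except (urllib.error.URLError, OSError):
        return 0


def wait_until_healthy(
    url: str,
    attempts: int = 10,
    delay: float = 2.0,
    fetch: Callable[[str], int] = http_status,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll url until it answers 200 or the attempts run out."""
    for attempt in range(attempts):
        if fetch(url) == 200:
            return True
        if attempt < attempts - 1:
            sleep(delay)
    return False


if __name__ == "__main__":
    ok = wait_until_healthy("http://localhost:4200/api/health")
    print("healthy" if ok else "not reachable")
```

The `fetch` and `sleep` parameters exist so the polling logic can be exercised without a running server.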
Step 3 — Open Prefect UI¶
To access the Prefect web UI:
./production/prefect_manage.sh ui
This opens http://localhost:4200 in your default browser. If it cannot open automatically, copy the URL into the browser manually.
Step 4 — Understand what you see in the UI¶
In the Prefect UI you will mainly use:
- Deployments:
  - Shows each pipeline configured in Prefect (for example, the TPL/GENKEY deployment).
  - For each deployment you can see:
    - Name (flow + deployment name).
    - Schedule (when it runs automatically).
    - Parameters (e.g. config_path used for the ETL).
  - You can trigger manual runs directly from this screen.
- Flow Runs:
  - Shows all executions (past, running and scheduled).
  - For each run you can open:
    - Logs (including progress of the ETL, metrics, errors).
    - Parameters (e.g. which config file was used).
    - Status (Running, Completed, Failed) and timestamps.
- Work Pools:
  - Shows the work pools and workers.
  - You can verify that the worker is online and pulling work from the correct pool.
This gives you full visibility of:
- Which pipelines are configured (Deployments).
- When they run and when they will run next (Schedule + Flow Runs).
- How each run behaved (Logs, metrics, status).
Step 5 — Trigger a manual run of the ETL pipeline¶
You can start a run in two equivalent ways:
- From the UI:
  - Go to Deployments.
  - Select the ETL deployment (e.g. TPL/GENKEY deployment).
  - Click Run and confirm parameters (e.g. config_path).
- From the CLI using the management script:
  ./production/prefect_manage.sh run
This runs the default deployment configured in prefect_manage.sh (by default, the TPL/GENKEY deployment named tpl-genkey-pipeline-flow/tpl-genkey-every-5-minutes).
You can follow the execution:
- In the UI → Flow Runs → select the latest run → see logs and status.
- Or via CLI logs: ./production/prefect_manage.sh logs worker
Changing schedules and configs at runtime (hot changes)¶
Once the Prefect stack is running (Server + Worker + deployments), you can change behavior without stopping services, both from the UI and from code.
Change a deployment from manual to scheduled (and back)¶
From the Prefect UI (hot change, without redeploying):
- Go to Deployments and select the deployment (e.g. tpl-genkey-pipeline-manual).
- Click Edit (pencil icon).
- In the Schedule section:
  - To make it manual only: remove/disable the schedule (No schedule).
  - To run every X seconds/minutes/hours: add an Interval schedule.
  - To run at specific hours/days: add a Cron schedule.
- Save changes. The worker will start following the new schedule immediately.
This requires neither restarting Docker nor re-running deploy_prefect.sh.
From code (production/prefect.yaml, infra-as-code):
- Add a schedule block to the deployment definition, for example:
deployments:
  - name: tpl-genkey-pipeline-manual
    entrypoint: mlops/prefect/flows/pipeline_scripts_flow.py:run_pipeline_script_flow
    work_pool_name: default-agent-pool
    parameters:
      script_name: "run_tpl_genkey_pipeline.py"
      config_path: "configs/pipelines_config.yml"
    schedule:
      interval: 3600  # every hour
    pull_steps:
      - prefect.deployments.steps.set_working_directory:
          directory: /app
- Then reapply deployments (see the deploy command under Manage Prefect).
Services keep running; only deployments are updated inside Prefect.
Use a different pipelines config file (new config_path)¶
You can keep multiple configuration files, for example:
- configs/pipelines_config.yml (default / shared).
- configs/pipelines_config_prod.yml (production tuned).
- configs/pipelines_config_experiments.yml (experiments).
To point a deployment to a different config file:
- Option 1: Hot override from the UI (per run or per deployment):
  - In Deployments → select a deployment (e.g. features-pipeline-manual) → Run: in the Parameters panel, change config_path to e.g. configs/pipelines_config_prod.yml for that run.
  - Or Edit the deployment and change the default config_path at deployment level.
- Option 2: Update production/prefect.yaml (recommended for reproducibility):
  - name: features-pipeline-manual
    entrypoint: mlops/prefect/flows/pipeline_scripts_flow.py:run_pipeline_script_flow
    work_pool_name: default-agent-pool
    parameters:
      script_name: "run_features_pipeline.py"
      config_path: "configs/pipelines_config_prod.yml"
    pull_steps:
      - prefect.deployments.steps.set_working_directory:
          directory: /app
Then reapply deployments (see the deploy command under Manage Prefect).
Note: Editing the contents of the YAML files in configs/ does not require restarting Prefect; the next run of the script/flow reads the new content directly from disk.
Run the same pipeline with two different configs (multiple deployments)¶
You can have two deployments for the same pipeline script, each with its own config_path and schedule.
Example: run TPL/GENKEY for two scenarios (SS and SUPE) with different configs.
In production/prefect.yaml:
deployments:
  - name: tpl-genkey-SS-manual
    entrypoint: mlops/prefect/flows/pipeline_scripts_flow.py:run_pipeline_script_flow
    work_pool_name: default-agent-pool
    parameters:
      script_name: "run_tpl_genkey_pipeline.py"
      config_path: "configs/pipelines_config_SS.yml"
    pull_steps:
      - prefect.deployments.steps.set_working_directory:
          directory: /app
  - name: tpl-genkey-SUPE-daily-2am
    entrypoint: mlops/prefect/flows/pipeline_scripts_flow.py:run_pipeline_script_flow
    work_pool_name: default-agent-pool
    parameters:
      script_name: "run_tpl_genkey_pipeline.py"
      config_path: "configs/pipelines_config_SUPE.yml"
    schedule:
      cron: "0 2 * * *"  # every day at 02:00
    pull_steps:
      - prefect.deployments.steps.set_working_directory:
          directory: /app
Then:
- Apply deployments (see the deploy command under Manage Prefect).
- In the UI you will see two deployments:
  - run-pipeline-script-flow/tpl-genkey-SS-manual → manual only (SS config).
  - run-pipeline-script-flow/tpl-genkey-SUPE-daily-2am → scheduled daily at 2 AM (SUPE config).
Each one:
- Uses the same script (run_tpl_genkey_pipeline.py).
- Reads a different YAML config (multiple scenarios).
- Can have its own schedule (manual, interval, cron) without touching the pipeline code.
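The two entries differ only in name, config_path, and schedule. The sketch below builds both from one helper to make that explicit. It produces plain dictionaries that mirror the YAML keys shown for this project; `make_deployment` is a hypothetical helper, and pull_steps is left out for brevity.

```python
from typing import Optional

ENTRYPOINT = "mlops/prefect/flows/pipeline_scripts_flow.py:run_pipeline_script_flow"


def make_deployment(name: str, config_path: str,
                    schedule: Optional[dict] = None) -> dict:
    """Build one deployment entry; only name, config and schedule vary."""
    entry = {
        "name": name,
        "entrypoint": ENTRYPOINT,
        "work_pool_name": "default-agent-pool",
        "parameters": {
            "script_name": "run_tpl_genkey_pipeline.py",
            "config_path": config_path,
        },
    }
    if schedule is not None:
        entry["schedule"] = schedule  # omitted entirely for manual-only
    return entry


deployments = [
    make_deployment("tpl-genkey-SS-manual", "configs/pipelines_config_SS.yml"),
    make_deployment(
        "tpl-genkey-SUPE-daily-2am",
        "configs/pipelines_config_SUPE.yml",
        schedule={"cron": "0 2 * * *"},
    ),
]

if __name__ == "__main__":
    for d in deployments:
        print(d["name"], d["parameters"]["config_path"], d.get("schedule"))
```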
Manage Prefect¶
Using prefect_manage.sh (from the production/ folder or via ./production/prefect_manage.sh):
| Command | Description |
|---|---|
| ./production/prefect_manage.sh status | Service status and Prefect API connectivity |
| ./production/prefect_manage.sh logs [worker] | Tail logs |
| ./production/prefect_manage.sh deploy <schedule> | Deploy with schedule |
| ./production/prefect_manage.sh run | Trigger a manual run of the deployment |
| ./production/prefect_manage.sh ui | Open Prefect UI in browser |
| ./production/prefect_manage.sh stop | Stop all services |
| ./production/prefect_manage.sh clean | Stop and remove volumes |
Flow behavior¶
- genkey_pipeline_flow: Loads config from the path passed as parameter (e.g. configs/etl_scheduler_config.yaml), uses the pipeline section (or tpl_genkey_pipeline), and runs the TPL/GENKEY pipeline with PrefectProgressMonitor so progress is visible in Prefect logs.
- Config path: Typically set in the deployment or when triggering a run (e.g. config_path='configs/etl_scheduler_config.yaml').
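The "pipeline section (or tpl_genkey_pipeline)" lookup can be sketched as a small helper over the parsed config. The precedence used here (pipeline first) is an assumption, since the description only says "or". The function name is hypothetical and is not taken from genkey_flow.py.

```python
from typing import Any


def select_pipeline_section(config: dict[str, Any]) -> dict[str, Any]:
    """Return the pipeline settings from a parsed config file."""
    # Assumed precedence: "pipeline" wins if both keys are present.
    for key in ("pipeline", "tpl_genkey_pipeline"):
        section = config.get(key)
        if isinstance(section, dict):
            return section
    raise KeyError("config has neither a 'pipeline' nor a 'tpl_genkey_pipeline' section")


if __name__ == "__main__":
    scheduler_style = {"pipeline": {"batch_size": 10}, "scheduler": {"mode": "interval"}}
    pipelines_style = {"tpl_genkey_pipeline": {"batch_size": 20}}
    print(select_pipeline_section(scheduler_style))
    print(select_pipeline_section(pipelines_style))
```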
How to add a new ETL pipeline as a Prefect deployment¶
Once Prefect Server and the worker are running, adding a new pipeline (new ETL flow) follows this pattern.
1. Create a new flow in mlops/prefect/flows/¶
Create a new file such as:
# mlops/prefect/flows/my_new_pipeline_flow.py
from __future__ import annotations

from prefect import flow, task, get_run_logger


@task
def run_my_pipeline(config_path: str) -> None:
    logger = get_run_logger()
    logger.info(f"Running my pipeline with config: {config_path}")
    # Call your existing ETL code here, using config_path


@flow(name="my-new-pipeline-flow")
def my_new_pipeline_flow(config_path: str = "configs/my_new_pipeline.yaml") -> None:
    """
    Prefect flow for the new ETL pipeline.

    Args:
        config_path: Path to the ETL config file.
    """
    run_my_pipeline(config_path)
Key requirements:
- The flow must be decorated with @flow (e.g. my_new_pipeline_flow).
- It should accept parameters (typically at least config_path) so deployments can be reused.
2. Register the flow in prefect.yaml as a new deployment¶
In prefect.yaml, add a new deployment entry pointing to your new flow entrypoint, for example:
deployments:
  - name: tpl-genkey-every-5-minutes
    entrypoint: mlops/prefect/flows/genkey_flow.py:genkey_pipeline_flow
    # ...
  - name: my-new-pipeline-every-hour
    entrypoint: mlops/prefect/flows/my_new_pipeline_flow.py:my_new_pipeline_flow
    parameters:
      config_path: "configs/my_new_pipeline.yaml"
    schedule: "every_hour"  # must exist in schedule_configs.py, or use manual_only
You can reuse existing schedules (e.g. every_hour, daily_at_4pm, manual_only) or add a new one in schedule_configs.py.
3. (Optional) Add or update a named schedule in schedule_configs.py¶
If you need a new schedule, add it in schedule_configs.py and refer to it by name (e.g. my_custom_schedule). Then you can deploy using:
./production/deploy_prefect.sh my_custom_schedule
4. Apply deployments so Prefect knows about the new pipeline¶
With Prefect already running, re-deploy deployments so that the new flow appears:
- Option 1 (full deploy, recommended when adding new pipelines):
  ./production/deploy_prefect.sh
  This:
  - Rebuilds and restarts services if needed.
  - Reapplies all deployments defined in prefect.yaml.
- Option 2 (lighter, keep services and only refresh deployments/schedule):
  ./production/prefect_manage.sh deploy <schedule>
Now the new pipeline appears as a new deployment in the Prefect UI under Deployments.
5. Verify and run the new pipeline¶
- From CLI:
- From the UI:
  - Go to Deployments and verify a deployment named similar to my-new-pipeline-flow/my-new-pipeline-every-hour.
  - Trigger a manual run and confirm it executes your ETL.
  - Check Flow Runs to see status, logs and parameters.
Prefect config (prefect.yaml)¶
The repo includes a prefect.yaml that defines:
- Deployment: entrypoint to the flow (e.g. mlops/prefect/flows/genkey_flow.py:genkey_pipeline_flow).
- Schedule: Can be parameterized (e.g. from env PREFECT_SCHEDULE).
- Steps: e.g. set working directory so relative paths in config resolve correctly.
Build and run are often done from the worker container so paths and dependencies match production.
Comparison¶
| Feature | APScheduler | Prefect 3 |
|---|---|---|
| Setup | Single script + YAML | Server + worker + Docker (or self-hosted) |
| Schedules | Interval, cron, daily | Same, via Prefect schedules |
| Retries | Optional in pipeline | Built-in retry policies |
| UI | No | Yes (Prefect UI) |
| Logs | File + console | Prefect logs + file |
| Multi-worker | Single process | Multiple workers possible |
Use APScheduler for simple, single-machine scheduling; use Prefect when you need observability, retries, and a central UI for production.
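On the APScheduler side, "optional in pipeline" retries mean the pipeline code has to wrap its own calls. A minimal sketch of such a wrapper follows: the function name, retry count, and delay are illustrative and are not taken from the ETL code. In Prefect the equivalent is declared on the task itself, e.g. `@task(retries=3, retry_delay_seconds=10)`.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(
    job: Callable[[], T],
    retries: int = 3,
    delay_seconds: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call job(); on an exception retry up to `retries` more times, then re-raise."""
    for attempt in range(retries + 1):
        try:
            return job()
        except Exception:
            if attempt == retries:
                raise  # out of retries: surface the last error
            sleep(delay_seconds)
    raise AssertionError("unreachable")
```

Hand-rolled retries like this leave no per-attempt record outside the log file, whereas Prefect records each attempt in the run history.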