Patterns¶
Common patterns used throughout the examples.
Subflows for composition¶
Break large pipelines into smaller, independently testable flows. A parent flow calls child flows like regular functions:
@flow
def extract() -> list[dict]:
    return fetch_records()

@flow
def transform(raw: list[dict]) -> list[dict]:
    return [clean(r) for r in raw]

@flow
def pipeline():
    raw = extract()
    transformed = transform(raw)
    load(transformed)
Each subflow gets its own flow run in the Prefect UI, with independent state tracking and retry behaviour.
See: Subflows, Flow of Flows, Complex Pipeline
Dynamic mapping with .map()¶
Fan out work over a list of items at runtime. The number of items does not need to be known at definition time:
@task
def process_item(item: str) -> str:
    return f"processed({item})"

@flow
def pipeline():
    items = generate_items()  # returns ["a", "b", "c", ...]
    processed = process_item.map(items)
    summarize(processed)
.map() creates one task run per item. Results are collected as a list of
futures.
You can chain .map() calls to build multi-step fan-out pipelines.
See: Dynamic Tasks, Complex Pipeline, WorldPop Country Comparison
Error handling with allow_failure and retries¶
Retries¶
Add retries and retry_delay_seconds to any task.
Prefect automatically retries the task up to the specified number of times, with a delay between attempts.
allow_failure¶
When a task may fail but downstream work should still run, wrap its future with
allow_failure:
from prefect import allow_failure

@flow
def pipeline():
    risky = risky_task.submit()
    cleanup_task(wait_for=[allow_failure(risky)])
See: State Handlers, Retries and Hooks, WorldPop Dataset Catalog
Polling with while loops¶
Replace Airflow sensors with a simple polling loop inside a task:
import time
from pathlib import Path

@task
def wait_for_file(path: str, interval: float = 5.0, timeout: float = 300.0) -> str:
    start = time.monotonic()
    while True:
        if Path(path).exists():
            return path
        if time.monotonic() - start > timeout:
            raise TimeoutError(f"Timed out waiting for {path}")
        time.sleep(interval)
Use .submit() to run multiple polls in parallel:
future_a = poll_condition.submit(name="sensor-A")
future_b = poll_condition.submit(name="sensor-B")
process([future_a.result(), future_b.result()])
See: Polling Tasks, WorldPop Population Stats
Concurrency limits¶
Throttle how many tasks run simultaneously using the concurrency() context
manager:
from prefect.concurrency.sync import concurrency

@task
def rate_limited_call(item: str) -> str:
    with concurrency("api-limit", occupy=1):
        return call_external_api(item)
The limit name is global -- all tasks sharing the same name compete for the same slots.
See: Concurrency Limits
Variables and configuration¶
Store runtime configuration in Prefect Variables:
import json

from prefect.variables import Variable

# Write
Variable.set("my_config", '{"batch_size": 100}', overwrite=True)

# Read
raw = Variable.get("my_config", default="{}")
config = json.loads(raw)
Combine with typed flow parameters for flexible, environment-specific behaviour.
See: Variables and Params
State hooks for observability¶
Attach hook functions to tasks or flows to react to state transitions:
def on_failure(task, task_run, state):
    send_alert(f"Task {task_run.name} failed: {state}")

@task(on_failure=[on_failure])
def important_task():
    ...

def on_completion(flow, flow_run, state):
    log_metric(f"Flow {flow_run.name} finished: {state.name}")

@flow(on_completion=[on_completion])
def pipeline():
    ...
Hooks are plain functions, not tasks. They receive the task/flow object, the run metadata, and the final state.
See: State Handlers, Retries and Hooks
Task caching¶
Cache task results to avoid redundant computation. Prefect offers three caching strategies:
from datetime import timedelta

from prefect.cache_policies import INPUTS, TASK_SOURCE

# Cache by inputs -- same arguments return the cached result
@task(cache_policy=INPUTS, cache_expiration=timedelta(seconds=300))
def expensive_computation(x: int, y: int) -> int:
    return x * y

# Cache by source + inputs -- invalidated when the code changes
@task(cache_policy=TASK_SOURCE + INPUTS)
def source_aware_task(data: str) -> str:
    return data.upper()

# Custom cache key function
def my_cache_key(context, parameters):
    return f"{parameters['category']}:{parameters['item_id']}"

@task(cache_key_fn=my_cache_key, cache_expiration=timedelta(seconds=600))
def cached_lookup(category: str, item_id: int) -> dict: ...
Cache hits are only visible during Prefect runtime. Calling .fn() always
executes the underlying function.
See: Task Caching
Async patterns¶
Use async def for tasks and flows when working with I/O-bound operations.
Combine with asyncio.gather() for concurrent execution:
import asyncio

@task
async def fetch_endpoint(name: str) -> dict:
    await asyncio.sleep(0.5)
    return {"endpoint": name, "records": 100}

@flow
async def concurrent_flow() -> None:
    # Concurrent -- total time ≈ max(delays)
    results = await asyncio.gather(
        fetch_endpoint("users"),
        fetch_endpoint("orders"),
        fetch_endpoint("products"),
    )
Sync and async tasks can be mixed in an async flow. Sync tasks are called normally; async tasks are awaited.
See: Async Tasks, Concurrent Async, Async Flow Patterns, Async Map and Submit
Artifacts¶
Publish rich content to the Prefect UI with artifact functions:
from prefect.artifacts import create_markdown_artifact, create_table_artifact, create_link_artifact
# Markdown reports
create_markdown_artifact(key="report", markdown="# Report\n...", description="Daily report")
# Structured tables
create_table_artifact(key="inventory", table=[{"item": "A", "qty": 10}])
# Reference links
create_link_artifact(key="dashboard", link="https://example.com/dashboard")
Without a Prefect server, artifact functions silently no-op — tests pass locally.
See: Markdown Artifacts, Table and Link Artifacts
Blocks and secrets¶
Blocks provide typed, reusable configuration. The built-in Secret block
handles encrypted credentials:
from prefect.blocks.system import Secret

# Load with graceful fallback
try:
    api_key = Secret.load("my-api-key").get()
except ValueError:
    api_key = "dev-fallback"
Define custom blocks for typed configuration with SecretStr for credentials.
The block stores connection details; get_client() returns a dedicated API
client class:
import httpx
from prefect.blocks.core import Block
from pydantic import Field, SecretStr

class Dhis2Client:
    """Authenticated DHIS2 API client."""

    def __init__(self, base_url: str, username: str, password: str) -> None:
        self._http = httpx.Client(
            base_url=f"{base_url}/api",
            auth=(username, password),
            timeout=60,
        )

    def fetch_metadata(self, endpoint: str) -> list[dict]:
        resp = self._http.get(f"/{endpoint}", params={"paging": "false"})
        resp.raise_for_status()
        return resp.json()[endpoint]

class Dhis2Credentials(Block):
    base_url: str = "https://play.im.dhis2.org/dev"
    username: str = "admin"
    password: SecretStr = Field(default=SecretStr("district"))

    def get_client(self) -> Dhis2Client:
        return Dhis2Client(
            self.base_url,
            self.username,
            self.password.get_secret_value(),
        )
The get_client() pattern is used by official Prefect integrations (prefect-aws,
prefect-gcp). It returns an authenticated client, keeping credential storage
(Block) separate from API logic (Client).
Blocks can be saved (block.save("name")) and loaded (Block.load("name"))
from the Prefect server. SecretStr fields are encrypted at rest.
Graceful fallback for development¶
Load from the server when available, fall back to inline defaults when developing without a server:
def get_dhis2_credentials() -> Dhis2Credentials:
    try:
        return Dhis2Credentials.load("dhis2")
    except Exception:
        return Dhis2Credentials()  # uses inline defaults
Testing blocks (mock patterns)¶
Mock at the Dhis2Client method level with @patch.object to avoid
interfering with Prefect's internal httpx usage:
from unittest.mock import MagicMock, patch

@patch.object(Dhis2Client, "fetch_metadata")
def test_fetch(mock_fetch):
    mock_fetch.return_value = [{"id": "OU1"}, {"id": "OU2"}]
    client = MagicMock(spec=Dhis2Client)
    client.fetch_metadata = mock_fetch
    result = some_task.fn(client)
For full flow integration tests, patch Dhis2Credentials.get_client to return
a mock Dhis2Client:
@patch.object(Dhis2Credentials, "get_client")
def test_flow(mock_get_client):
    mock_client = MagicMock(spec=Dhis2Client)
    mock_client.fetch_metadata.return_value = [{"id": "OU1"}]
    mock_get_client.return_value = mock_client
    state = my_flow(return_state=True)
    assert state.is_completed()
See: Secret Block, Custom Blocks
Notification blocks¶
Prefect ships with NotificationBlock subclasses that provide a unified
notify(body, subject) interface for pipeline alerting. The two most common
are SlackWebhook and CustomWebhookNotificationBlock:
import os

from prefect.blocks.notifications import CustomWebhookNotificationBlock, SlackWebhook
from pydantic import SecretStr

# SlackWebhook -- read the URL from the environment, never hardcode it
slack = SlackWebhook(url=SecretStr(os.environ["SLACK_WEBHOOK_URL"]))
slack.notify(body="Pipeline completed: 150 records processed", subject="Pipeline Alert")

# CustomWebhookNotificationBlock -- flexible for any HTTP endpoint
custom = CustomWebhookNotificationBlock(
    name="ops-webhook",
    url="https://monitoring.example.com/alerts",
    method="POST",
    json_data={"text": "{{subject}}: {{body}}"},
    secrets={"api_token": "my-secret-token"},
)
custom.notify(body="All checks passed", subject="Quality Report")
Both blocks call notify(body, subject) -- the only difference is how
they build the HTTP request.
Template resolution¶
CustomWebhookNotificationBlock resolves {{subject}}, {{body}},
{{name}}, and any key from secrets in both the URL and JSON payload:
block = CustomWebhookNotificationBlock(
    name="template-demo",
    url="https://api.example.com/notify?token={{api_token}}",
    method="POST",
    json_data={
        "title": "{{subject}}",
        "message": "{{body}}",
        "source": "{{name}}",
        "auth": "Bearer {{api_token}}",
    },
    secrets={"api_token": "secret-xyz-789"},
)
When notify() is called, every {{placeholder}} is replaced with its
value before the HTTP request is sent.
Flow hooks for production notifications¶
Attach notification blocks to flow hooks for automatic alerting:
def on_completion_notify(flow, flow_run, state):
    SlackWebhook.load("prod-slack").notify(
        body=f"Flow {flow_run.name!r} completed.",
        subject="Flow Completed",
    )

def on_failure_notify(flow, flow_run, state):
    SlackWebhook.load("prod-slack").notify(
        body=f"CRITICAL: Flow {flow_run.name!r} failed: {state.message}",
        subject="Flow Failed",
    )

@flow(on_completion=[on_completion_notify], on_failure=[on_failure_notify])
def my_pipeline():
    ...
Save a block once with slack.save("prod-slack", overwrite=True), then
load it in any hook or task with SlackWebhook.load("prod-slack").
See: Notification Blocks
Deployment basics¶
Serve vs deploy decision guide¶
Use flow.serve() when:
- You want the simplest possible deployment
- The flow runs on a single machine (dev laptop, cron server, VM)
- No Docker or Kubernetes infrastructure is available
Use flow.deploy() when:
- You need infrastructure-level isolation (Docker, K8s)
- Multiple team members trigger runs from the UI
- You want auto-scaling via work pool configuration
Use prefect.yaml when:
- You manage multiple deployments declaratively
- You want deployment config in version control
- You follow a GitOps workflow
flow.serve()¶
The simplest deployment -- runs in-process with optional scheduling.
flow.serve() blocks the process and polls for scheduled runs. It is the
Prefect equivalent of placing a DAG file in Airflow's dags/ folder.
flow.deploy()¶
Production deployment -- sends runs to a work pool.
Work pools define where work runs (process, docker, kubernetes). Workers poll pools for scheduled runs.
Parameterized deployments¶
Pass default parameters at deployment time. These can be overridden per run from the UI, API, or CLI:
# In code
my_flow.serve(
    name="dhis2-sync",
    cron="0 6 * * *",
    parameters={"endpoints": ["organisationUnits", "dataElements"]},
)

# In prefect.yaml
# deployments:
#   - name: dhis2-ou
#     entrypoint: deployments/dhis2_ou/flow.py:dhis2_ou_flow
Override at run time from the UI, or via the CLI with prefect deployment run <name> -p key=value.
prefect.yaml pattern¶
Define multiple deployments in a single YAML file at the project root:
deployments:
  - name: dhis2-ou
    entrypoint: flow.py:dhis2_ou_flow
    schedules:
      - cron: "0 6 * * *"
        timezone: "UTC"
    work_pool:
      name: default
  - name: etl-every-5m
    entrypoint: flows/core/core_flow_serve.py:flow_serve_flow
    schedules:
      - cron: "*/5 * * * *"
    work_pool:
      name: default
Deploy all at once with prefect deploy --all or individually with
prefect deploy -n dhis2-ou.
Deployment-aware flows with prefect.runtime¶
Use prefect.runtime to access deployment context inside a running flow:
from prefect.runtime import deployment, flow_run
deployment_name = deployment.name # None for local runs
flow_run_name = flow_run.name # auto-generated name
scheduled_start = flow_run.scheduled_start_time
This lets a single flow produce different output depending on whether it is running locally or as a scheduled deployment.
Work pool setup walkthrough¶
# 1. Create a process-based work pool
prefect work-pool create my-pool --type process
# 2. Start a worker that polls the pool
prefect worker start --pool my-pool
# 3. Deploy a flow to the pool
prefect deploy -n dhis2-ou
# 4. Trigger a run manually
prefect deployment run dhis2_ou/dhis2-ou
Managing deployments with the CLI¶
# List and inspect
prefect deployment ls
prefect deployment inspect <flow/deployment>
# Schedule management
prefect deployment set-schedule <name> --cron "0 8 * * *"
prefect deployment set-schedule <name> --interval 3600
prefect deployment set-schedule <name> --rrule "FREQ=WEEKLY;BYDAY=MO,WE,FR"
prefect deployment clear-schedule <name>
# Pause and resume
prefect deployment pause <name>
prefect deployment resume <name>
# Trigger runs with parameter overrides
prefect deployment run <name> -p key=value
# Cleanup
prefect deployment delete <name>
Dev/prod deployment pattern¶
Use flow.serve() during development for fast iteration, then switch to
flow.deploy() (or prefect.yaml) for production:
if __name__ == "__main__":
    # Dev: run directly
    # dhis2_ou_flow()

    # Dev: serve with schedule
    # dhis2_ou_flow.serve(name="dev-sync", cron="*/5 * * * *")

    # Prod: deploy to work pool (or use prefect.yaml)
    # dhis2_ou_flow.deploy(
    #     name="dhis2-ou",
    #     work_pool_name="my-pool",
    #     cron="0 6 * * *",
    # )

    dhis2_ou_flow()
Deploying DHIS2 flows¶
The deployments/ directory contains production deployment examples, each
with its own flow.py, prefect.yaml, and deploy.py:
- dhis2_connection/ -- connection check and server status artifact
- dhis2_ou/ -- org unit listing with markdown artifact
See: Flow Serve, Schedules, Work Pools, Deployments directory
Pydantic models for type-safe pipelines¶
Use Pydantic BaseModel as task parameters and return types for automatic
validation and serialisation:
from pydantic import BaseModel

class UserRecord(BaseModel):
    name: str
    email: str
    age: int

class ProcessingResult(BaseModel):
    records: list[dict]
    errors: list[str]
    summary: str

@task
def validate_users(users: list[UserRecord]) -> ProcessingResult:
    valid, errors = [], []
    for user in users:
        if user.age < 0:
            errors.append(f"Invalid age for {user.name}")
        else:
            valid.append(user.model_dump())
    return ProcessingResult(records=valid, errors=errors, summary=f"{len(valid)} valid")
Pydantic replaces XCom serialisation pain with automatic validation, type safety,
and clean .model_dump() for dict conversion.
Using Pydantic models as flow parameters (not just task parameters) lets
Prefect auto-generate rich parameter schemas. The deployment UI renders typed
form fields instead of a freeform JSON editor, and
enforce_parameter_schema validates inputs at runtime:
@flow(name="config_driven_pipeline", log_prints=True)
def config_driven_pipeline_flow(config: PipelineConfig | None = None) -> PipelineResult:
    ...
See: Pydantic Models, Pydantic Validation
Shell and HTTP tasks¶
Replace Airflow's BashOperator and HttpOperator with plain Python in @task
functions:
import subprocess

import httpx

# Shell: subprocess.run() replaces BashOperator
@task
def run_command(cmd: str) -> str:
    result = subprocess.run(cmd, shell=True, capture_output=True, text=True, check=True)
    return result.stdout.strip()

# HTTP: httpx replaces HttpOperator
@task
def http_get(url: str) -> dict:
    response = httpx.get(url, timeout=10.0)
    response.raise_for_status()
    return response.json()
No special operators needed. Standard Python libraries inside tasks are the Prefect way.
See: Shell Tasks, HTTP Tasks
Error handling with quarantine pattern¶
Separate good and bad records during processing, capturing error reasons:
from pydantic import BaseModel

class QuarantineResult(BaseModel):
    good_records: list[dict]
    bad_records: list[dict]
    errors: list[str]

@task
def process_with_quarantine(records: list[dict]) -> QuarantineResult:
    good, bad, errors = [], [], []
    for record in records:
        try:
            validate(record)
            good.append(record)
        except ValueError as e:
            bad.append(record)
            errors.append(str(e))
    return QuarantineResult(good_records=good, bad_records=bad, errors=errors)
The quarantine pattern prevents a few bad records from failing the entire pipeline.
See: Error Handling ETL
Transactions for atomic operations¶
Group tasks atomically with transaction() -- if any task fails, the group
is treated as a unit:
from prefect.transactions import transaction

@flow
def atomic_pipeline():
    with transaction():
        step_a()
        step_b()
        step_c()
Transactions are a Prefect-specific feature with no direct Airflow equivalent.
See: Transactions
Task runners for concurrent execution¶
Choose the right task runner for your workload:
from prefect.task_runners import ThreadPoolTaskRunner

# I/O-bound: ThreadPoolTaskRunner for concurrent execution
@flow(task_runner=ThreadPoolTaskRunner(max_workers=3))
def io_flow():
    futures = fetch_data.map(urls)
    return [f.result() for f in futures]

# CPU-bound: the default runner (threads will not speed up CPU-bound work;
# consider a process-based runner for true parallelism)
@flow
def cpu_flow():
    futures = compute.map(inputs)
    return [f.result() for f in futures]
ThreadPoolTaskRunner provides concurrent execution for I/O-bound tasks like
API calls and file reads.
See: Task Runners
File I/O patterns¶
Use stdlib csv and json for file-based pipelines. tempfile.mkdtemp()
provides isolated working directories, and pathlib.Path keeps paths clean:
import csv
from pathlib import Path

@task
def read_csv(path: Path) -> list[dict]:
    with open(path, newline="") as f:
        return list(csv.DictReader(f))

@task
def write_csv(directory: str, filename: str, rows: list[dict]) -> Path:
    path = Path(directory) / filename
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)
    return path
For mixed file types, dispatch on suffix:
@task
def read_file(path: Path) -> list[dict]:
    if path.suffix == ".csv":
        return list(csv.DictReader(open(path, newline="")))
    elif path.suffix == ".json":
        return json.loads(path.read_text())
See: CSV File Processing, JSON Event Ingestion, Multi-File Batch Processing
Data quality rules engine¶
Define quality rules as configuration and dispatch them to check functions:
class QualityRule(BaseModel):
    name: str
    rule_type: str
    column: str = ""
    params: dict = {}

@task
def execute_rule(data: list[dict], rule: QualityRule) -> RuleResult:
    if rule.rule_type == "not_null":
        return run_not_null_check.fn(data, rule.column)
    elif rule.rule_type == "range":
        return run_range_check.fn(data, rule.column, ...)
Compute an overall score and classify as green (>= 0.9), amber (>= 0.7), or red (< 0.7).
See: Quality Rules Engine, Cross-Dataset Validation
Configuration-driven pipelines¶
Control pipeline behaviour entirely through configuration dicts:
class PipelineConfig(BaseModel):
    name: str
    stages: list[StageConfig]
    fail_fast: bool = True

@flow
def pipeline(raw_config: dict):
    config = parse_config(raw_config)
    for stage in config.stages:
        if stage.enabled:
            dispatch_stage(stage, context)
Different configs produce different pipeline runs through the same flow code. Disabled stages are skipped automatically.
Producer-consumer pattern¶
Decouple data production from consumption using file-based data contracts:
@flow
def producer_consumer_flow():
    producer_flow(data_dir, producer_id="alpha")
    producer_flow(data_dir, producer_id="beta")
    results = consumer_flow(data_dir, consumer_id="main")
Producers write JSON data + metadata files. Consumers discover packages by scanning for metadata files. Each flow is independently testable.
See: Producer-Consumer
Circuit breaker pattern¶
Prevent cascading failures by opening a circuit after consecutive failures:
@task
def call_with_circuit(circuit: CircuitState, should_succeed: bool):
    if circuit.state == "open":
        circuit = circuit.model_copy(update={"state": "half_open"})
    # Execute call, track failures, trip if threshold reached
States: closed (normal) -> open (fail-fast) -> half_open (probe) -> closed (recovery). Deterministic boolean outcomes make testing straightforward.
See: Circuit Breaker
Incremental processing with manifests¶
Track processed files in a JSON manifest to avoid reprocessing:
@task
def identify_new_files(all_files: list[Path], manifest: ProcessingManifest) -> list[Path]:
    return [f for f in all_files if f.name not in manifest.processed_files]
Run the flow twice: the second run processes zero files. This is the foundation for idempotent file pipelines.
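A companion update step might look like this (ProcessingManifest here is a minimal stand-in for the example's model):

```python
from pathlib import Path

from pydantic import BaseModel

class ProcessingManifest(BaseModel):
    processed_files: list[str] = []

def update_manifest(manifest: ProcessingManifest, done: list[Path]) -> ProcessingManifest:
    # Record newly processed files so the next run skips them.
    return manifest.model_copy(
        update={"processed_files": manifest.processed_files + [f.name for f in done]}
    )
```

Persist the updated manifest as JSON at the end of each run, and load it at the start of the next.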
See: Incremental Processing, Idempotent Operations
Checkpoint-based recovery¶
Save progress after each stage. On re-run, skip completed stages:
for stage in stages:
    if not should_run_stage.fn(store, stage):
        recovered += 1
        continue
    result = execute_stage.fn(stage, context)
    save_checkpoint.fn(store, stage, "completed", result, path)
Fail at stage X, fix the issue, re-run, and stages before X are automatically skipped.
See: Error Recovery
Application-level API caching¶
Cache API responses in a dict with hashlib-based keys and TTL expiry:
@task
def fetch_with_cache(endpoint, params, cache, ttl_seconds=300):
    key = make_cache_key.fn(endpoint, params)
    entry = check_cache.fn(cache, key, ttl_seconds)
    if entry is not None:
        return entry.value, True  # cache hit
    data = simulate_api_call.fn(endpoint, params)
    cache[key] = {"value": data, "cached_at": time.time()}
    return data, False  # cache miss
Track hit/miss rates to measure cache effectiveness.
See: Response Caching
Threshold classification and advisories (flow 081)¶
Classify values against an ordered list of thresholds and generate advisories:
AQI_THRESHOLDS: list[tuple[float, str, str]] = [
    (50.0, "Good", "green"),
    (100.0, "Moderate", "yellow"),
    (150.0, "Unhealthy for Sensitive Groups", "orange"),
    (200.0, "Unhealthy", "red"),
    (300.0, "Very Unhealthy", "purple"),
    (float("inf"), "Hazardous", "maroon"),
]

for threshold, cat, col in AQI_THRESHOLDS:
    if mean_val <= threshold:
        category = cat
        color = col
        break
Walk the thresholds in order and stop at the first match. Use a separate severity ordering list to rank worst outcomes.
See: Air Quality Index
Composite risk scoring (flow 082)¶
Normalize risk factors to a common scale, then compute a weighted composite:
marine_avg = sum(f.normalized_score for f in marine_factors) / len(marine_factors)
flood_avg = sum(f.normalized_score for f in flood_factors) / len(flood_factors)
weighted = marine_avg * marine_weight + flood_avg * flood_weight
Configurable weights allow tuning the relative importance of each risk source.
See: Composite Risk Assessment
Pearson correlation (flows 086, 088)¶
Manual Pearson correlation using only math and statistics:
import math
import statistics

def _pearson(x: list[float], y: list[float]) -> float:
    mean_x = statistics.mean(x)
    mean_y = statistics.mean(y)
    num = sum((xi - mean_x) * (yi - mean_y) for xi, yi in zip(x, y, strict=True))
    den_x = math.sqrt(sum((xi - mean_x) ** 2 for xi in x))
    den_y = math.sqrt(sum((yi - mean_y) ** 2 for yi in y))
    return num / (den_x * den_y)
This pattern appears in flows 083, 086, 087, and 088. No numpy or scipy required.
See: Multi-Indicator Correlation, Hypothesis Testing
Log-linear regression (flow 089)¶
Manual OLS regression with R-squared and residual-based ranking:
cov_xy = sum((xi - mean_x) * (yi - mean_y) for xi, yi in zip(x, y, strict=True)) / n
var_x = sum((xi - mean_x) ** 2 for xi in x) / n
slope = cov_xy / var_x
intercept = mean_y - slope * mean_x
ss_res = sum((yi - (slope * xi + intercept)) ** 2 for xi, yi in zip(x, y, strict=True))
ss_tot = sum((yi - mean_y) ** 2 for yi in y)
r_squared = 1.0 - (ss_res / ss_tot)
Log-transform skewed data before regression. Rank entities by residual: negative residual means better-than-predicted performance.
See: Regression Analysis
Dimensional modeling (Star schema) (flow 090)¶
Build fact and dimension tables with surrogate keys and composite index:
@task
def build_country_dimension(data: list[dict]) -> list[DimCountry]:
    dims = []
    for i, d in enumerate(data, 1):
        dims.append(DimCountry(key=i, name=d["name"], region=d["region"], ...))
    return dims

@task
def min_max_normalize(values: list[float], higher_is_better: bool) -> list[float]:
    min_v, max_v = min(values), max(values)
    normalized = [(v - min_v) / (max_v - min_v) for v in values]
    if not higher_is_better:
        normalized = [1.0 - n for n in normalized]
    return normalized
Surrogate keys are assigned sequentially. Min-max normalization respects the
higher_is_better flag. Weighted normalized indicators produce a composite
ranking.
See: Star Schema
Simulated SQL ETL (flow 091)¶
Three-layer ETL: staging, production, summary:
raw = generate_raw_data()
staging = load_staging(raw) # raw + timestamp
production = validate_and_transform(staging) # parsed + is_valid flag
valid = filter_valid(production)
summary = compute_summary(valid, "category") # grouped stats
Each layer adds metadata. Invalid records carry is_valid=False through the
production layer without being dropped.
See: Staged ETL Pipeline
Regex expression parsing (flow 094)¶
Count operands and operators to score expression complexity:
import re

OPERAND_PATTERN = re.compile(r"#\{[^}]+\}")

num_operands = len(OPERAND_PATTERN.findall(expression))
num_operators = sum(1 for c in expression if c in "+-*/")
total = num_operands + num_operators
Bin scores into trivial/simple/moderate/complex categories for reporting.
See: Expression Complexity Scoring
Data lineage tracking (flow 097)¶
Hash-based provenance through pipeline stages:
import hashlib

@task
def compute_data_hash(records: list[DataRecord]) -> str:
    raw = str(sorted(str(r.model_dump()) for r in records))
    return hashlib.sha256(raw.encode()).hexdigest()[:16]
Each transformation records input_hash and output_hash. The lineage graph shows which stages modified data (input_hash != output_hash) and which were passthrough.
Pipeline templates (flow 098)¶
Factory pattern for reusable pipeline configurations:
etl_basic = create_template("etl_basic", [
    StageTemplate(name="extract", stage_type="extract", default_params={"batch_size": 100}),
    StageTemplate(name="load", stage_type="load", default_params={"batch_size": 100}),
])

basic_small = instantiate_template(etl_basic, {"batch_size": 50})
basic_large = instantiate_template(etl_basic, {"batch_size": 500})
Define a template once, instantiate with different overrides. Stage execution merges overrides into default parameters.
See: Pipeline Template Factory
Custom blocks for API integration¶
Subclass Block to create typed connection objects for external APIs. This is
the Prefect equivalent of Airflow's BaseHook.get_connection(). The credentials
block stores connection details; get_client() returns a dedicated API client:
import httpx
from prefect.blocks.core import Block
from pydantic import Field, SecretStr

class Dhis2Client:
    """Authenticated DHIS2 API client."""

    def __init__(self, base_url: str, username: str, password: str) -> None:
        self._http = httpx.Client(
            base_url=f"{base_url}/api",
            auth=(username, password),
            timeout=60,
        )

    def get_server_info(self) -> dict:
        resp = self._http.get("/system/info")
        resp.raise_for_status()
        return resp.json()

    def fetch_metadata(self, endpoint: str, fields: str = ":owner") -> list[dict]:
        resp = self._http.get(f"/{endpoint}", params={"paging": "false", "fields": fields})
        resp.raise_for_status()
        return resp.json()[endpoint]

class Dhis2Credentials(Block):
    _block_type_name = "dhis2-credentials"

    base_url: str = Field(default="https://play.im.dhis2.org/dev")
    username: str = Field(default="admin")
    password: SecretStr = Field(default=SecretStr("district"))

    def get_client(self) -> Dhis2Client:
        return Dhis2Client(
            self.base_url,
            self.username,
            self.password.get_secret_value(),
        )

# Load from the server with fallback to inline defaults
def get_dhis2_credentials() -> Dhis2Credentials:
    try:
        return Dhis2Credentials.load("dhis2")
    except Exception:
        return Dhis2Credentials()
Usage in flows is clean -- the block provides credentials, the client provides API methods:
creds = get_dhis2_credentials()
client = creds.get_client()
info = client.get_server_info() # authenticated API call
units = client.fetch_metadata("organisationUnits") # returns list[dict]
Register a block once via the Prefect UI or Dhis2Credentials(...).save("dhis2").
All flows that need the connection call Dhis2Credentials.load("dhis2").
For generic authenticated APIs, use a pluggable auth block:
class ApiAuthConfig(Block):
    auth_type: str  # "api_key", "bearer", "basic"
    base_url: str

def build_auth_header(config: ApiAuthConfig, credentials: str) -> AuthHeader:
    if config.auth_type == "api_key":
        return AuthHeader(header_name="X-API-Key", header_value=credentials)
    elif config.auth_type == "bearer":
        return AuthHeader(
            header_name="Authorization",
            header_value=f"Bearer {credentials}",
        )
    ...
See: DHIS2 Connection Block, Authenticated API Pipeline
Secret management strategies¶
Prefect provides multiple ways to manage secrets. Choose based on your environment:
# 1. SecretStr on a Block (recommended -- keeps credentials with config)
from pydantic import SecretStr

class MyConnection(Block):
    password: SecretStr = Field(default=SecretStr("dev-password"))

# 2. Secret block (standalone credential)
from prefect.blocks.system import Secret

password = Secret.load("dhis2-password").get()

# 3. Environment variable (simple, works everywhere)
import os

password = os.environ.get("DHIS2_PASSWORD", "fallback")

# 4. JSON block (for structured config)
from prefect.blocks.system import JSON

config = JSON.load("dhis2-config").value
The preferred pattern is SecretStr directly on the connection block. This
keeps credentials co-located with the connection config they belong to:
creds = Dhis2Credentials.load("dhis2") # password included, encrypted at rest
client = creds.get_client()
client.fetch_metadata("organisationUnits") # no separate password argument
Always use graceful fallbacks so flows work in development (no server) and production (with server):
def get_dhis2_credentials() -> Dhis2Credentials:
    try:
        return Dhis2Credentials.load("dhis2")
    except Exception:
        return Dhis2Credentials()  # uses inline defaults
See: Environment-Based Configuration, DHIS2 Connection Block