Airflow Operations
Use af commands to query, manage, and troubleshoot Airflow workflows.
Running the CLI
Run all af commands using uvx (no installation required):
uvx --from astro-airflow-mcp af <command>
Throughout this document, af is shorthand for uvx --from astro-airflow-mcp af.
Instance Configuration
Manage multiple Airflow instances with persistent configuration:
# Add a new instance af instance add prod --url https://airflow.example.com --token "$API_TOKEN" af instance add staging --url https://staging.example.com --username admin --password admin # List and switch instances af instance list # Shows all instances in a table af instance use prod # Switch to prod instance af instance current # Show current instance af instance delete old-instance # Auto-discover instances (use --dry-run to preview first) af instance discover --dry-run # Preview all discoverable instances af instance discover # Discover from all backends (astro, local) af instance discover astro # Discover Astro deployments only af instance discover astro --all-workspaces # Include all accessible workspaces af instance discover local # Scan common local Airflow ports af instance discover local --scan # Deep scan all ports 1024-65535 # IMPORTANT: Always run with --dry-run first and ask for user consent before # running discover without it. The non-dry-run mode creates API tokens in # Astro Cloud, which is a sensitive action that requires explicit approval. # Override instance for a single command af --instance staging dags list
Config file: ~/.af/config.yaml (override with --config or AF_CONFIG env var)
Tokens in config can reference environment variables using ${VAR} syntax:
instances:
- name: prod
url: https://airflow.example.com
auth:
token: ${AIRFLOW_API_TOKEN}
Or use environment variables directly (no config file needed):
export AIRFLOW_API_URL=http://localhost:8080 export AIRFLOW_AUTH_TOKEN=your-token-here # Or username/password: export AIRFLOW_USERNAME=admin export AIRFLOW_PASSWORD=admin
Or CLI flags: af --airflow-url http://localhost:8080 --token "$TOKEN" <command>
Quick Reference
| Command | Description |
|---|---|
af health | System health check |
af dags list | List all DAGs |
af dags get <dag_id> | Get DAG details |
af dags explore <dag_id> | Full DAG investigation |
af dags source <dag_id> | Get DAG source code |
af dags pause <dag_id> | Pause DAG scheduling |
af dags unpause <dag_id> | Resume DAG scheduling |
af dags errors | List import errors |
af dags warnings | List DAG warnings |
af dags stats | DAG run statistics |
af runs list | List DAG runs |
af runs get <dag_id> <run_id> | Get run details |
af runs trigger <dag_id> | Trigger a DAG run |
af runs trigger-wait <dag_id> | Trigger and wait for completion |
af runs diagnose <dag_id> <run_id> | Diagnose failed run |
af tasks list <dag_id> | List tasks in DAG |
af tasks get <dag_id> <task_id> | Get task definition |
af tasks instance <dag_id> <run_id> <task_id> | Get task instance |
af tasks logs <dag_id> <run_id> <task_id> | Get task logs |
af config version | Airflow version |
af config show | Full configuration |
af config connections | List connections |
af config variables | List variables |
af config variable <key> | Get specific variable |
af config pools | List pools |
af config pool <name> | Get pool details |
af config plugins | List plugins |
af config providers | List providers |
af config assets | List assets/datasets |
af api <endpoint> | Direct REST API access |
af api ls | List available API endpoints |
af api ls --filter X | List endpoints matching pattern |
User Intent Patterns
DAG Operations
- •"What DAGs exist?" / "List all DAGs" ->
af dags list - •"Tell me about DAG X" / "What is DAG Y?" ->
af dags explore <dag_id> - •"What's the schedule for DAG X?" ->
af dags get <dag_id> - •"Show me the code for DAG X" ->
af dags source <dag_id> - •"Stop DAG X" / "Pause this workflow" ->
af dags pause <dag_id> - •"Resume DAG X" ->
af dags unpause <dag_id> - •"Are there any DAG errors?" ->
af dags errors
Run Operations
- •"What runs have executed?" ->
af runs list - •"Run DAG X" / "Trigger the pipeline" ->
af runs trigger <dag_id> - •"Run DAG X and wait" ->
af runs trigger-wait <dag_id> - •"Why did this run fail?" ->
af runs diagnose <dag_id> <run_id>
Task Operations
- •"What tasks are in DAG X?" ->
af tasks list <dag_id> - •"Get task logs" / "Why did task fail?" ->
af tasks logs <dag_id> <run_id> <task_id>
System Operations
- •"What version of Airflow?" ->
af config version - •"What connections exist?" ->
af config connections - •"Are pools full?" ->
af config pools - •"Is Airflow healthy?" ->
af health
API Exploration
- •"What API endpoints are available?" ->
af api ls - •"Find variable endpoints" ->
af api ls --filter variable - •"Access XCom values" / "Get XCom" ->
af api xcom-entries -F dag_id=X -F task_id=Y - •"Get event logs" / "Audit trail" ->
af api event-logs -F dag_id=X - •"Create connection via API" ->
af api connections -X POST --body '{...}' - •"Create variable via API" ->
af api variables -X POST -F key=name -f value=val
Common Workflows
Investigate a Failed Run
# 1. List recent runs to find failure af runs list --dag-id my_dag # 2. Diagnose the specific run af runs diagnose my_dag manual__2024-01-15T10:00:00+00:00 # 3. Get logs for failed task (from diagnose output) af tasks logs my_dag manual__2024-01-15T10:00:00+00:00 extract_data
Morning Health Check
# 1. Overall system health af health # 2. Check for broken DAGs af dags errors # 3. Check pool utilization af config pools
Understand a DAG
# Get comprehensive overview (metadata + tasks + source) af dags explore my_dag
Check Why DAG Isn't Running
# Check if paused af dags get my_dag # Check for import errors af dags errors # Check recent runs af runs list --dag-id my_dag
Trigger and Monitor
# Option 1: Trigger and wait (blocking) af runs trigger-wait my_dag --timeout 1800 # Option 2: Trigger and check later af runs trigger my_dag af runs get my_dag <run_id>
Output Format
All commands output JSON (except instance commands which use human-readable tables):
af dags list
# {
# "total_dags": 5,
# "returned_count": 5,
# "dags": [...]
# }
Use jq for filtering:
# Find failed runs af runs list | jq '.dag_runs[] | select(.state == "failed")' # Get DAG IDs only af dags list | jq '.dags[].dag_id' # Find paused DAGs af dags list | jq '[.dags[] | select(.is_paused == true)]'
Task Logs Options
# Get logs for specific retry attempt af tasks logs my_dag run_id task_id --try 2 # Get logs for mapped task index af tasks logs my_dag run_id task_id --map-index 5
Direct API Access with af api
Use af api for endpoints not covered by high-level commands (XCom, event-logs, backfills, etc).
# Discover available endpoints af api ls af api ls --filter variable # Basic usage af api dags af api dags -F limit=10 -F only_active=true af api variables -X POST -F key=my_var -f value="my value" af api variables/old_var -X DELETE
Field syntax: -F key=value auto-converts types, -f key=value keeps as string.
Full reference: See api-reference.md for all options, common endpoints (XCom, event-logs, backfills), and examples.
Related Skills
- •
testing-dags- Test DAGs with debugging and fixing cycles - •
debugging-dags- Comprehensive DAG failure diagnosis and root cause analysis - •
authoring-dags- Creating and editing DAG files with best practices - •
managing-astro-local-env- Starting/stopping local Airflow environment