AgentSkillsCN

airflow

Orchestrate data pipelines with Apache Airflow: trigger DAGs, monitor run status, manage connections, and inspect task logs. Uses the Airflow CLI.

SKILL.md
---
name: airflow
description: >
  Orchestrate data pipelines with Apache Airflow — trigger DAGs, monitor runs,
  manage connections, and inspect task logs. Uses the airflow CLI.
metadata:
  openclaw:
    requires:
      bins: [airflow, clawdata]
    primaryEnv: AIRFLOW_DAGS_FOLDER
    tags: [orchestration, airflow, pipeline, scheduling, workflow]
---

# Airflow

You can manage and orchestrate data pipelines using the airflow CLI. Use this when the user asks about scheduling, DAG runs, pipeline monitoring, or workflow management.

For data queries, use the duckdb or snowflake skill. For transformations, use the dbt skill.

## Commands

### DAGs

| Task | Command |
| --- | --- |
| List all DAGs | `airflow dags list` |
| Show DAG details | `airflow dags show <dag_id>` |
| Trigger a DAG | `airflow dags trigger <dag_id>` |
| Trigger with config | `airflow dags trigger <dag_id> --conf '{"key":"value"}'` |
| Pause a DAG | `airflow dags pause <dag_id>` |
| Unpause a DAG | `airflow dags unpause <dag_id>` |
| List DAG runs | `airflow dags list-runs -d <dag_id>` |
| Backfill | `airflow dags backfill <dag_id> -s <start> -e <end>` |
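The `--conf` JSON payload is easy to mis-quote by hand in a shell. As a sketch, it can be built with `json.dumps` and `shlex.quote` (the `run_date`/`full_refresh` keys below are illustrative, not a real pipeline contract):

```python
import json
import shlex

# Hypothetical conf payload; the keys are made up for illustration.
conf = {"run_date": "2026-02-19", "full_refresh": True}

# json.dumps guarantees valid JSON; shlex.quote protects it from shell word-splitting.
cmd = f"airflow dags trigger my_etl_pipeline --conf {shlex.quote(json.dumps(conf))}"
print(cmd)
```

This sidesteps the classic failure mode of nesting double quotes inside a single-quoted JSON blob.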

### Tasks

| Task | Command |
| --- | --- |
| List tasks in DAG | `airflow tasks list <dag_id>` |
| Task state | `airflow tasks state <dag_id> <task_id> <execution_date>` |
| Task logs | `airflow tasks log <dag_id> <task_id> <execution_date>` |
| Test a task | `airflow tasks test <dag_id> <task_id> <execution_date>` |
| Clear task | `airflow tasks clear <dag_id> -t <task_id> -s <start> -e <end>` |

### Connections & Variables

| Task | Command |
| --- | --- |
| List connections | `airflow connections list` |
| Add connection | `airflow connections add <conn_id> --conn-uri <uri>` |
| Delete connection | `airflow connections delete <conn_id>` |
| List variables | `airflow variables list` |
| Get variable | `airflow variables get <key>` |
| Set variable | `airflow variables set <key> <value>` |

### System

| Task | Command |
| --- | --- |
| Check health | `airflow db check` |
| Show config | `airflow config list` |
| Show version | `airflow version` |
| List providers | `airflow providers list` |
| Show pools | `airflow pools list` |

Add `-o json` to most commands for JSON output.
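The JSON output can then be consumed programmatically. A minimal sketch, using a hand-written sample that mimics the shape of `airflow dags list -o json` (the exact field names are an assumption; check the output of your Airflow version):

```python
import json

# Hand-written sample mimicking `airflow dags list -o json`; the field
# names here are assumed, not guaranteed by any particular Airflow version.
raw = '[{"dag_id": "clawdata_etl", "paused": "False"}, {"dag_id": "clawdata_dbt_only", "paused": "True"}]'

dags = json.loads(raw)
active = [d["dag_id"] for d in dags if d["paused"] == "False"]
print(active)
```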

## When to use

- User asks about pipelines or DAGs → `airflow dags list`
- User wants to run a pipeline → `airflow dags trigger <dag_id>`
- User asks about a failed run → `airflow dags list-runs`, then `airflow tasks log`
- User wants to schedule something → create a DAG file, then `airflow dags unpause`
- User asks about connections → `airflow connections list`

## Typical workflow

1. See available pipelines:

   ```bash
   airflow dags list -o json
   ```

2. Trigger a run:

   ```bash
   airflow dags trigger my_etl_pipeline
   ```

3. Monitor progress:

   ```bash
   airflow dags list-runs -d my_etl_pipeline -o json
   ```

4. Check task logs on failure:

   ```bash
   airflow tasks list my_etl_pipeline
   airflow tasks log my_etl_pipeline load_data 2026-02-19
   ```

5. Clear and retry failed tasks:

   ```bash
   airflow tasks clear my_etl_pipeline -t load_data -s 2026-02-19 -e 2026-02-19
   ```
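The monitoring step is usually a poll loop: check the run state, wait, repeat until it settles. A minimal sketch with the Airflow lookup stubbed out (the `poll` helper and the state sequence are hypothetical, for illustration only):

```python
import time

def poll(get_state, timeout_s=300, interval_s=10):
    """Poll a state-returning callable until the run leaves queued/running."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        state = get_state()
        if state not in ("queued", "running"):
            return state
        time.sleep(interval_s)
    return "timed_out"

# Stub standing in for a real `airflow dags list-runs` state lookup.
states = iter(["running", "running", "success"])
result = poll(lambda: next(states), timeout_s=10, interval_s=0)
print(result)
```

In practice `get_state` would shell out to `airflow dags list-runs -o json` and read the latest run's state field.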

## Creating a DAG

DAG files live in `apps/airflow/dags/` (configured via `AIRFLOW_DAGS_FOLDER`).

The project ships with two sample DAGs:

| DAG | Schedule | Description |
| --- | --- | --- |
| `clawdata_etl` | `@daily` | Full pipeline: ingest → dbt run → dbt test |
| `clawdata_dbt_only` | `@hourly` | Lightweight: dbt run → dbt test |

Example (`apps/airflow/dags/daily_etl.py`):

```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

with DAG(
    "daily_etl",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    default_args={"retries": 1, "retry_delay": timedelta(minutes=5)},
) as dag:

    ingest = BashOperator(
        task_id="ingest_data",
        bash_command="clawdata data ingest-all",
    )

    transform = BashOperator(
        task_id="run_dbt",
        bash_command="clawdata dbt run",
    )

    test = BashOperator(
        task_id="test_dbt",
        bash_command="clawdata dbt test",
    )

    ingest >> transform >> test
```

After creating, unpause with `airflow dags unpause daily_etl`.
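One scheduling subtlety worth remembering: with `schedule="@daily"`, Airflow launches a run once its data interval closes, so the run logically dated for a given day fires at the following midnight. Sketched with plain `datetime`:

```python
from datetime import datetime, timedelta

# With schedule="@daily", the run covering 2026-02-19 owns a one-day data
# interval and is launched once that interval closes, at 2026-02-20 00:00.
interval_start = datetime(2026, 2, 19)
interval_end = interval_start + timedelta(days=1)
print(interval_start.date(), "->", interval_end)
```

This is why a freshly unpaused daily DAG may not run immediately: its current interval has not closed yet (use `airflow dags trigger` for an immediate run).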

## Error recovery

| Problem | Fix |
| --- | --- |
| `airflow: command not found` | `pip install apache-airflow` |
| DB not initialised | `airflow db init` (first time) or `airflow db migrate` |
| DAG not appearing | Check `$AIRFLOW_HOME/dags/` and `airflow dags list` |
| Task failed | `airflow tasks log <dag> <task> <date>` to see the error |
| Scheduler not running | `airflow scheduler -D` (daemon) or run in foreground |
| Webserver not running | `airflow webserver -p 8080 -D` |

## Environment

| Env var | Default | Purpose |
| --- | --- | --- |
| `AIRFLOW_HOME` | `~/airflow` | Airflow config and DAGs directory |
| `AIRFLOW_DAGS_FOLDER` | `apps/airflow/dags` | OpenClaw DAGs directory |
| `AIRFLOW__CORE__DAGS_FOLDER` | `$AIRFLOW_HOME/dags` | Override DAG location |
| `AIRFLOW__DATABASE__SQL_ALCHEMY_CONN` | SQLite | Metadata database connection |
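To point a shell session at the project's DAG folder, the standard `AIRFLOW__SECTION__KEY` environment override applies. A sketch, assuming the repo layout above:

```bash
# Point this session at the project's DAG folder (paths assume the
# apps/airflow/dags layout described above).
export AIRFLOW_HOME="$HOME/airflow"
export AIRFLOW__CORE__DAGS_FOLDER="$PWD/apps/airflow/dags"
```

After exporting, `airflow dags list` should pick up DAGs from the project folder.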

## Notes

- Airflow's scheduler must be running for scheduled DAGs to execute
- Use `airflow tasks test` to run a task without recording it in the DB (useful for debugging)
- The example DAG above integrates with `clawdata` — tying Airflow orchestration to dbt transforms
- Add `-o json` for machine-readable output on most commands