kestra

SKILL.md
---
name: kestra
description: |
  Generate, debug, and optimize Kestra workflow orchestration YAML files. Kestra is an open-source, declarative data orchestration platform using YAML syntax.

  Use this skill when:
  - Creating new Kestra flows (workflows)
  - Debugging or fixing existing Kestra YAML
  - Adding triggers (schedule, webhook, flow-based)
  - Implementing error handling, retries, or timeouts
  - Writing script tasks (Python, Shell, Node.js, etc.)
  - Building parallel or conditional workflows
  - Converting from Airflow/Prefect to Kestra
  - Any question about Kestra syntax, plugins, or best practices
---

Kestra Workflow Generator

Kestra uses declarative YAML to define workflows. Every flow requires id, namespace, and tasks.

Core YAML Structure

yaml
id: flow-name              # Required: unique within namespace, lowercase with hyphens
namespace: company.team    # Required: dot-separated hierarchy
description: |             # Optional: supports Markdown
  Flow description
labels:                    # Optional: key-value pairs for organization
  env: prod
  team: data-engineering
inputs:                    # Optional: typed parameters
  - id: my_input
    type: STRING
    required: false
    defaults: "default value"
variables:                 # Optional: reusable values
  base_url: "https://api.example.com"
tasks:                     # Required: list of tasks to execute
  - id: task-name
    type: io.kestra.plugin...
triggers:                  # Optional: automatic execution
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 9 * * *"
errors:                    # Optional: error handling tasks
  - id: notify
    type: io.kestra.plugin.core.log.Log
    message: "Flow failed!"
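
The annotated skeleton above leaves the task type elided. As a minimal sketch of the three required fields only, the flow below declares `id`, `namespace`, and a single `Log` task; the `id`, `namespace`, and task name are placeholders chosen for illustration.

```yaml
id: hello-world                 # placeholder flow id
namespace: company.team         # placeholder namespace
tasks:
  - id: say_hello               # placeholder task id
    type: io.kestra.plugin.core.log.Log
    message: "Hello from Kestra"
```

Everything else (`inputs`, `variables`, `triggers`, `errors`) can be layered on once this skeleton validates.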

Task Generation Workflow

  1. Identify the task type needed (script, HTTP, flowable, etc.)
  2. Use fully qualified plugin type: io.kestra.plugin.[category].[TaskName]
  3. Include all required properties for the task type
  4. Use Pebble templating for dynamic values: {{ inputs.name }}, {{ outputs.taskId.property }}
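
The four steps above can be sketched as a small two-task flow. It assumes a placeholder endpoint (`https://api.example.com/data`) and hypothetical flow and task ids; the second task reads the `code` output of the HTTP task through a Pebble expression.

```yaml
id: fetch-and-report            # hypothetical flow id
namespace: company.team
inputs:
  - id: name
    type: STRING
    defaults: "world"
tasks:
  # Steps 1-3: pick the task type, fully qualify it, set its required properties
  - id: api_call
    type: io.kestra.plugin.core.http.Request
    uri: "https://api.example.com/data"   # placeholder endpoint
    method: GET
  # Step 4: Pebble expressions pull in the input and the previous task's output
  - id: report
    type: io.kestra.plugin.core.log.Log
    message: "Hello {{ inputs.name }}, the API answered with status {{ outputs.api_call.code }}"
```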

Input Types

| Type | Description | Example |
| --- | --- | --- |
| STRING | Text value | `defaults: "hello"` |
| INT | Integer | `defaults: 42` |
| FLOAT | Decimal | `defaults: 3.14` |
| BOOLEAN | True/false | `defaults: true` |
| DATETIME | ISO 8601 datetime | `defaults: "2024-01-01T00:00:00Z"` |
| DATE | ISO 8601 date | `defaults: "2024-01-01"` |
| TIME | ISO 8601 time | `defaults: "09:00:00"` |
| DURATION | ISO 8601 duration | `defaults: "PT1H"` |
| FILE | Uploaded file | (stored in internal storage) |
| JSON | JSON object | `defaults: '{"key": "value"}'` |
| ARRAY | List of items | `itemType: STRING` (`itemType` is required) |
| SELECT | Dropdown | `values: [a, b, c]` |
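
To show how several of these types look side by side, here is an illustrative `inputs` block. The input ids and the `SELECT` choices are hypothetical; only the property names (`type`, `defaults`, `required`, `values`, `itemType`) come from the table above.

```yaml
inputs:
  - id: run_date
    type: DATE
    defaults: "2024-01-01"
  - id: max_wait
    type: DURATION
    defaults: "PT1H"
  - id: environment
    type: SELECT
    values: [dev, staging, prod]   # hypothetical choices
    defaults: dev
  - id: tables
    type: ARRAY
    itemType: STRING               # ARRAY inputs must declare itemType
  - id: source_file
    type: FILE
    required: false                # optional upload, kept in internal storage
```

Values are then read in tasks as `{{ inputs.run_date }}`, `{{ inputs.environment }}`, and so on.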

Common Task Types

Logging & Debug

yaml
- id: log_message
  type: io.kestra.plugin.core.log.Log
  message: "Hello {{ inputs.name }}!"
  level: INFO  # DEBUG, INFO, WARN, ERROR

HTTP Requests

yaml
- id: api_call
  type: io.kestra.plugin.core.http.Request
  uri: "https://api.example.com/data"
  method: GET
  headers:
    Authorization: "Bearer {{ secret('API_TOKEN') }}"

Python Scripts

yaml
- id: python_task
  type: io.kestra.plugin.scripts.python.Script
  containerImage: python:3.11-slim
  beforeCommands:
    - pip install pandas requests kestra  # the kestra package provides Kestra.outputs
  script: |
    import pandas as pd
    from kestra import Kestra
    
    data = {"result": "success"}
    Kestra.outputs(data)  # Pass outputs to next tasks
  outputFiles:
    - "*.csv"  # Files to capture
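
As a sketch of how a later task might consume those values: entries passed to `Kestra.outputs` are read under the `vars` key of the script task's outputs. The `use_result` task id is hypothetical.

```yaml
- id: use_result
  type: io.kestra.plugin.core.log.Log
  # "result" is the key set by Kestra.outputs in python_task
  message: "Python task returned {{ outputs.python_task.vars.result }}"
```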

Shell Commands

yaml
- id: shell_task
  type: io.kestra.plugin.scripts.shell.Commands
  commands:
    - echo "Processing {{ inputs.filename }}"
    - cat {{ outputs.download.uri }}
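
The shell example refers to `outputs.download.uri`, so it presupposes an earlier task with the id `download`. A minimal sketch of such a pairing follows, assuming a placeholder URL and a hypothetical `count_lines` task; it maps the stored file to a local name through `inputFiles` rather than passing the URI straight to the command.

```yaml
- id: download
  type: io.kestra.plugin.core.http.Download
  uri: "https://example.com/data.csv"     # placeholder URL
- id: count_lines
  type: io.kestra.plugin.scripts.shell.Commands
  inputFiles:
    data.csv: "{{ outputs.download.uri }}"  # copy the stored file into the working directory
  commands:
    - wc -l data.csv
```

Mapping through `inputFiles` keeps the command itself free of internal-storage URIs, which tends to make it easier to test outside Kestra.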

Detailed References

Core Workflow Components

Plugins

Architecture & Patterns

Additional Resources

  • Ask Kestra AI API: See references/ask-kestra-ai.md - Query Kestra's documentation AI when you need more specific answers or the latest documentation

Tip: If a Kestra MCP server is configured, use it to validate flows, list executions, or trigger runs directly. This skill focuses on generating correct YAML; the MCP server handles runtime interaction.

Tip: If you can't find an answer in the static references, use the Ask Kestra AI API to query their documentation assistant. It's especially useful for:

  • Edge cases not covered in common patterns
  • Latest plugin features or changes
  • Complex output structures (e.g., ForEachItem merging)
  • Troubleshooting specific error messages

Quick Debugging Checklist

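  • Indentation: Spaces only, never tabs; keep each nesting level consistent (2 spaces by convention)
  • Task IDs: Unique within flow; use lowercase with hyphens or underscores. A hyphenated ID needs bracket notation in expressions ({{ outputs['task-id'].property }}), because Pebble reads the hyphen in dot notation as a minus sign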
  • Plugin types: Fully qualified (io.kestra.plugin.core.log.Log)
  • Expressions: Use {{ }} for Pebble templating
  • Secrets: Use {{ secret('SECRET_NAME') }}
  • Outputs: Reference as {{ outputs.taskId.property }}
  • Inputs: Reference as {{ inputs.inputId }}
  • Strings with colons: Quote them (message: "Time: 10:00")
  • Multi-line scripts: Use | for literal block scalar
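
Several checklist items can be seen together in one small flow. This is an illustrative sketch with hypothetical ids, not a canonical template: it shows a quoted string containing colons, an input reference, and a literal block scalar for a multi-line script.

```yaml
id: checklist-demo              # hypothetical flow id
namespace: company.team
inputs:
  - id: name
    type: STRING
    defaults: "world"
tasks:
  - id: greet
    type: io.kestra.plugin.core.log.Log
    message: "Time: 10:00, hello {{ inputs.name }}"   # quoted because it contains colons
  - id: multi_line
    type: io.kestra.plugin.scripts.shell.Script
    script: |                   # literal block scalar keeps the line breaks
      echo "first line"
      echo "second line"
```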

Common Errors & Fixes

| Error | Cause | Fix |
| --- | --- | --- |
| "Task type not found" | Plugin not available, or typo in the type | Verify the exact plugin type string |
| "Invalid YAML syntax" | Bad indentation or special characters | Indent with spaces (2 per level), quote strings containing `:` |
| "Variable not found" | Wrong expression syntax | Check the `{{ outputs.taskId.prop }}` format |
| "Required property missing" | Missing required task field | Check the plugin docs for required fields |
| "Cannot coerce" | Type mismatch in expression | Verify that input/output types match |
| Broken JSON body / 400 errors | User data contains `"` characters | Pipe the value through the `toJson` filter (named `json` in older releases), with no surrounding quotes |
| Fix deployed but restart still fails | Restart uses the original revision | Use replay with `latest_revision: true` instead |
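
The JSON-body row is easiest to see in a concrete request. The sketch below assumes a Kestra release in which the filter is named `toJson` (older releases call it `json`), a placeholder endpoint, and hypothetical ids. The filter emits a fully quoted and escaped JSON string, so the expression is not wrapped in quotes.

```yaml
id: post-comment                # hypothetical flow id
namespace: company.team
inputs:
  - id: comment
    type: STRING
    defaults: 'She said "hello"'   # embedded quotes would break a hand-built body
tasks:
  - id: send
    type: io.kestra.plugin.core.http.Request
    uri: "https://api.example.com/comments"   # placeholder endpoint
    method: POST
    contentType: application/json
    body: |
      {"comment": {{ inputs.comment | toJson }}}
```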