AgentSkillsCN

cloudflare-queues

Cloudflare Queues消息队列剧本:生产者、消费者、与Workers集成、消息批处理、重试机制、死信队列、延迟投递、并发控制、拉取式消费者、HTTP API。关键词:Cloudflare Queues、消息队列、生产者、消费者、Workers绑定、批量处理、重试机制、DLQ、死信队列、拉取式消费者、至少一次投递。

SKILL.md
--- frontmatter
name: cloudflare-queues
description: "Cloudflare Queues message queue playbook: producers, consumers, Workers integration, message batching, retries, dead letter queues, delivery delay, concurrency, pull consumers, HTTP API. Keywords: Cloudflare Queues, message queue, producer, consumer, Workers binding, batch, retry, DLQ, dead letter queue, pull consumer, at-least-once delivery."

Cloudflare Queues

Queues is a message queue for Workers. Supports push (Worker consumer) and pull (HTTP API) patterns. At-least-once delivery.


Quick Start

Create queue

bash
npx wrangler queues create my-queue

Producer binding

jsonc
// wrangler.jsonc
{
  "queues": {
    "producers": [
      {
        "queue": "my-queue",
        "binding": "MY_QUEUE"
      }
    ]
  }
}

Consumer binding

jsonc
// wrangler.jsonc
{
  "queues": {
    "consumers": [
      {
        "queue": "my-queue",
        "max_batch_size": 10,
        "max_batch_timeout": 5
      }
    ]
  }
}

Producer Worker

typescript
export interface Env {
  MY_QUEUE: Queue;
}

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    await env.MY_QUEUE.send({ url: request.url, method: request.method });
    return new Response("Message sent");
  },
};

Consumer Worker

typescript
export interface Env {}

export default {
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    for (const msg of batch.messages) {
      console.log(msg.body);
      msg.ack();
    }
  },
};

Producer API

send(body, options?)

typescript
await env.MY_QUEUE.send({ action: "process", id: 123 });

// With delay
await env.MY_QUEUE.send(message, { delaySeconds: 600 }); // 10 min delay

// With content type
await env.MY_QUEUE.send(message, { contentType: "json" });

sendBatch(messages, options?)

typescript
await env.MY_QUEUE.sendBatch([{ body: { id: 1 } }, { body: { id: 2 }, options: { delaySeconds: 300 } }, { body: { id: 3 } }]);

// Global delay for batch
await env.MY_QUEUE.sendBatch(messages, { delaySeconds: 600 });

Limits:

  • Max 100 messages per batch
  • Max 128 KB per message
  • Total batch ≤ 256 KB

Content Types

TypeDescription
jsonJSON serialized (default)
textPlain text
bytesRaw binary
v8V8 serialization (Workers only)

Note: Pull consumers cannot decode v8 content type.

See api.md for type definitions.


Consumer API

MessageBatch

typescript
interface MessageBatch<Body = unknown> {
  readonly queue: string;
  readonly messages: Message<Body>[];
  ackAll(): void;
  retryAll(options?: { delaySeconds?: number }): void;
}

Message

typescript
interface Message<Body = unknown> {
  readonly id: string;
  readonly timestamp: Date;
  readonly body: Body;
  readonly attempts: number;
  ack(): void;
  retry(options?: { delaySeconds?: number }): void;
}

Acknowledgment Patterns

typescript
export default {
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    for (const msg of batch.messages) {
      try {
        await processMessage(msg.body);
        msg.ack(); // Explicit success
      } catch (error) {
        msg.retry({ delaySeconds: 60 }); // Retry with delay
      }
    }
  },
};

Batch-level operations

typescript
export default {
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    try {
      await processAll(batch.messages);
      batch.ackAll(); // All succeeded
    } catch (error) {
      batch.retryAll({ delaySeconds: 300 }); // Retry all
    }
  },
};

Precedence: Per-message calls override batch-level.


Consumer Configuration

jsonc
{
  "queues": {
    "consumers": [
      {
        "queue": "my-queue",
        "max_batch_size": 10, // 1-100, default 10
        "max_batch_timeout": 5, // 0-60 seconds, default 5
        "max_retries": 3, // default 3
        "max_concurrency": 10, // default: auto-scale
        "dead_letter_queue": "dlq", // optional DLQ
        "retry_delay": 60 // default retry delay (seconds)
      }
    ]
  }
}
SettingDefaultMaxDescription
max_batch_size10100Messages per batch
max_batch_timeout560Seconds to wait for batch
max_retries3100Retries before DLQ/delete
max_concurrencyauto250Concurrent invocations
retry_delay043200Default retry delay (12h)

See consumer.md for details.


Dead Letter Queues

Messages that fail after max_retries go to DLQ.

jsonc
{
  "queues": {
    "consumers": [
      {
        "queue": "my-queue",
        "max_retries": 5,
        "dead_letter_queue": "my-dlq"
      }
    ]
  }
}

Create DLQ:

bash
npx wrangler queues create my-dlq

DLQ retention: 4 days without consumer.

Process DLQ:

jsonc
{
  "queues": {
    "consumers": [
      {
        "queue": "my-dlq",
        "max_batch_size": 1
      }
    ]
  }
}

Delivery Delay

On send

typescript
await env.MY_QUEUE.send(message, { delaySeconds: 600 }); // 10 min

On retry

typescript
msg.retry({ delaySeconds: 3600 }); // 1 hour

Queue-level default

bash
npx wrangler queues create my-queue --delivery-delay-secs=300

Exponential backoff

typescript
const backoff = (attempts: number, base = 10) => base ** attempts;

msg.retry({ delaySeconds: Math.min(backoff(msg.attempts), 43200) });

Maximum delay: 12 hours (43200 seconds).


Concurrency

Consumers auto-scale based on backlog. Set max:

jsonc
{
  "queues": {
    "consumers": [
      {
        "queue": "my-queue",
        "max_concurrency": 5
      }
    ]
  }
}

max_concurrency: 1 = sequential processing.

Scaling factors:

  • Backlog size and growth
  • Success/failure ratio
  • max_concurrency limit

Note: retry() calls don't count as failures for scaling.


Pull Consumers (HTTP API)

For consuming outside Workers.

Enable pull consumer

jsonc
{
  "queues": {
    "consumers": [
      {
        "queue": "my-queue",
        "type": "http_pull",
        "visibility_timeout_ms": 5000,
        "max_retries": 5
      }
    ]
  }
}

Pull messages

bash
curl -X POST "https://api.cloudflare.com/client/v4/accounts/$ACCOUNT_ID/queues/$QUEUE_ID/messages/pull" \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"batch_size": 10, "visibility_timeout_ms": 30000}'

Acknowledge messages

bash
curl -X POST "https://api.cloudflare.com/client/v4/accounts/$ACCOUNT_ID/queues/$QUEUE_ID/messages/ack" \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "acks": [{"lease_id": "..."}],
    "retries": [{"lease_id": "...", "delay_seconds": 60}]
  }'

See pull-consumer.md for details.


Wrangler Commands

bash
# Queue management
wrangler queues create <name> [--delivery-delay-secs=N]
wrangler queues delete <name>
wrangler queues list
wrangler queues info <name>

# Pause/resume
wrangler queues pause-delivery <name>
wrangler queues resume-delivery <name>

# Purge all messages
wrangler queues purge <name>

# Consumer management
wrangler queues consumer add <queue> <script> [options]
wrangler queues consumer remove <queue> <script>
wrangler queues consumer http add <queue> [options]
wrangler queues consumer http remove <queue>

Limits

ParameterLimit
Queues per account10,000
Message size128 KB
Messages per sendBatch100
Batch size (consumer)100
Per-queue throughput5,000 msg/sec
Per-queue backlog25 GB
Message retention4 days (max 14)
Concurrent consumers250
Consumer duration15 min wall clock
Consumer CPU30 sec (max 5 min)
Delay (send/retry)12 hours
Max retries100

Increase CPU limit

jsonc
{
  "limits": {
    "cpu_ms": 300000 // 5 minutes
  }
}

Pricing

Workers Paid: 1M operations/month included, then $0.40/million.

Operation = 64 KB chunk written, read, or deleted.

ActionOperations
Send 1 message1 write
Consume 1 message1 read
Delete 1 message1 delete (on ack)
Retry1 additional read
DLQ write1 write

Formula: (Messages × 3 - 1M) / 1M × $0.40

No egress fees.

See pricing.md for examples.


Delivery Guarantees

At-least-once delivery: Messages delivered at least once, possibly duplicated.

Handle duplicates:

typescript
export default {
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    for (const msg of batch.messages) {
      const key = `processed:${msg.id}`;
      if (await env.KV.get(key)) {
        msg.ack(); // Already processed
        continue;
      }
      await processMessage(msg.body);
      await env.KV.put(key, "1", { expirationTtl: 86400 });
      msg.ack();
    }
  },
};

Event Notifications

R2 and other services can send events to Queues.

bash
# R2 → Queue
wrangler r2 bucket notification create my-bucket \
  --event-type object-create \
  --queue my-queue

See cloudflare-r2 skill for event notification setup.


Prohibitions

  • ❌ Do not use v8 content type with pull consumers
  • ❌ Do not exceed 128 KB per message
  • ❌ Do not rely on exactly-once delivery (use idempotency)
  • ❌ Do not ignore DLQ — process failed messages
  • ❌ Do not set excessive concurrency without testing

References

Related Skills

  • cloudflare-workers — Worker development
  • cloudflare-r2 — R2 event notifications
  • cloudflare-durable-objects — Queue producer from DO
  • cloudflare-kv — Idempotency tracking