Building Streaky: Distributed Queue System with Service Bindings (Part 3)

Part 3: Scaling to 1000+ Users

In Part 1, I shared the journey from sequential processing to distributed architecture. In Part 2, I explained how I solved IP blocking with a Rust proxy.

Now, let’s dive deep into the distributed queue system…


This content originally appeared on DEV Community and was authored by Allen Elzayn

Part 3: Scaling to 1000+ Users

In Part 1, I shared the journey from sequential processing to distributed architecture. In Part 2, I explained how I solved IP blocking with a Rust proxy.

Now, let's dive deep into the distributed queue system that makes it all work.

The Scalability Problem

After solving the IP blocking issue, I still had the CPU time limit problem. Processing 10 users sequentially took 30+ seconds. Cloudflare Workers have a 30-second CPU time limit.

The constraint:

  • Each user takes ~3 seconds to process (GitHub API + notifications)
  • 10 users × 3 seconds = 30 seconds
  • Add any overhead = Time Limit Exceeded (TLE)

The realization:
I can't process users sequentially in a single Worker. I need to distribute the work across multiple Workers.

The Solution: Service Bindings + Queue

Core idea: Instead of one Worker processing N users, spawn N Workers each processing 1 user.

Architecture:

Scheduler Worker
    |
    |-- Dispatch Worker 1 (User A)
    |-- Dispatch Worker 2 (User B)
    |-- Dispatch Worker 3 (User C)
    |-- ...
    |-- Dispatch Worker N (User N)

Each Worker:
- Fresh CPU budget (30 seconds)
- Isolated execution context
- Parallel processing

Result:

  • 10 users processed in ~10 seconds (parallel)
  • Each Worker uses <5 seconds CPU time
  • No TLE errors
  • Scales to 1000+ users

Key Components

1. Queue Table (D1 SQLite)

The queue table tracks which users need processing and their status.

CREATE TABLE cron_queue (
  id TEXT PRIMARY KEY,
  user_id TEXT NOT NULL,
  batch_id TEXT NOT NULL,
  status TEXT NOT NULL CHECK(status IN ('pending', 'processing', 'completed', 'failed')),
  created_at TEXT NOT NULL DEFAULT (datetime('now')),
  started_at TEXT,
  completed_at TEXT,
  error_message TEXT,
  retry_count INTEGER NOT NULL DEFAULT 0
);

CREATE INDEX idx_cron_queue_status ON cron_queue(status);
CREATE INDEX idx_cron_queue_batch ON cron_queue(batch_id);
CREATE INDEX idx_cron_queue_user ON cron_queue(user_id);

Why D1?

  • Already part of the stack (no external dependencies)
  • Fast enough for job queues (< 10ms queries)
  • Supports atomic operations (CTE + UPDATE + RETURNING)
  • Free tier: 50,000 writes/day (plenty for this use case)

2. Service Bindings

Service Bindings allow a Worker to call itself, creating new Worker instances.

Configuration (wrangler.toml):

[[services]]
binding = "SELF"
service = "streaky"

Usage:

// Each fetch creates a NEW Worker instance
env.SELF.fetch('http://internal/api/cron/process-user', {
  method: 'POST',
  headers: {
    'X-Cron-Secret': env.SERVER_SECRET,
    'Content-Type': 'application/json',
  },
  body: JSON.stringify({
    queueId: queueItem.id,
    userId: queueItem.user_id,
  }),
})

Why Service Bindings?

  • Each env.SELF.fetch() = new Worker instance
  • Fresh CPU budget per instance
  • Automatic load balancing by Cloudflare
  • No external queue service needed (Redis, SQS, etc.)

Implementation

Step 1: Initialize Batch

When the cron trigger fires, create a batch of queue items.

// web/backend/src/services/queue.ts

export async function initializeBatch(
  env: Env,
  userIds: string[]
): Promise<string> {
  const batchId = crypto.randomUUID();

  // Bulk insert users to queue
  for (const userId of userIds) {
    const queueId = crypto.randomUUID();
    await env.DB.prepare(
      `INSERT INTO cron_queue (id, user_id, batch_id, status)
       VALUES (?, ?, ?, 'pending')`
    )
      .bind(queueId, userId, batchId)
      .run();
  }

  return batchId;
}

Step 2: Atomic Queue Claiming

The critical part: prevent race conditions when multiple Workers try to claim the same user.

// web/backend/src/services/queue.ts

export async function claimNextPendingUserAtomic(
  env: Env
): Promise<QueueItem | null> {
  const result = await env.DB.prepare(`
    WITH next AS (
      SELECT id FROM cron_queue
      WHERE status = 'pending'
      ORDER BY created_at ASC
      LIMIT 1
    )
    UPDATE cron_queue
    SET status = 'processing', started_at = datetime('now')
    WHERE id IN (SELECT id FROM next)
    RETURNING id, user_id, batch_id
  `).all<QueueItem>();

  return result.results[0] ?? null;
}

Why atomic?

  • CTE (WITH) + UPDATE + RETURNING in single transaction
  • No gap between SELECT and UPDATE
  • Prevents duplicate processing
  • D1 SQLite guarantees atomicity

Without atomic claiming:

Worker 1: SELECT id WHERE status='pending' → Gets user A
Worker 2: SELECT id WHERE status='pending' → Gets user A (race!)
Worker 1: UPDATE status='processing' WHERE id=A
Worker 2: UPDATE status='processing' WHERE id=A
Result: Both workers process user A (duplicate!)

With atomic claiming:

Worker 1: CTE + UPDATE + RETURNING → Gets user A, marks processing
Worker 2: CTE + UPDATE + RETURNING → Gets user B, marks processing
Result: No duplicates, each worker gets unique user

Step 3: Scheduler (Main Worker)

The scheduler initializes the batch and dispatches Workers.

// web/backend/src/index.ts

export default {
  async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext) {
    console.log('[Scheduled] Cron trigger fired:', event.cron);

    // Query active users
    const usersResult = await env.DB.prepare(
      `SELECT id FROM users WHERE is_active = 1 AND github_pat IS NOT NULL`
    ).all();

    const userIds = usersResult.results.map((row: any) => row.id as string);

    if (userIds.length === 0) {
      console.log('[Scheduled] No active users to process');
      return;
    }

    // Initialize batch
    const batchId = await initializeBatch(env, userIds);
    console.log(`[Scheduled] Batch ${batchId} initialized with ${userIds.length} users`);

    // Dispatch Workers via Service Bindings
    for (let i = 0; i < userIds.length; i++) {
      const queueItem = await claimNextPendingUserAtomic(env);

      if (!queueItem) {
        console.log('[Scheduled] No more pending users in queue');
        break;
      }

      ctx.waitUntil(
        env.SELF.fetch('http://internal/api/cron/process-user', {
          method: 'POST',
          headers: {
            'X-Cron-Secret': env.SERVER_SECRET,
            'Content-Type': 'application/json',
          },
          body: JSON.stringify({
            queueId: queueItem.id,
            userId: queueItem.user_id,
          }),
        })
          .then((res) => {
            console.log(`[Scheduled] User ${queueItem.user_id} dispatched: ${res.status}`);
          })
          .catch((error: Error) => {
            console.error(`[Scheduled] User ${queueItem.user_id} dispatch failed:`, error);
          })
      );
    }

    console.log(`[Scheduled] All ${userIds.length} users dispatched for batch ${batchId}`);
  }
}

Key points:

  • ctx.waitUntil() ensures async operations complete
  • Each env.SELF.fetch() creates new Worker instance
  • Errors in one Worker don't affect others

Step 4: Worker Instance (Process Single User)

Each Worker instance processes one user.

// web/backend/src/routes/cron.ts

app.post('/process-user', async (c) => {
  // Auth check
  const secret = c.req.header('X-Cron-Secret');
  if (!c.env.SERVER_SECRET || secret !== c.env.SERVER_SECRET) {
    return c.json({ error: 'Unauthorized' }, 401);
  }

  try {
    const body = await c.req.json<{ queueId: string; userId: string }>();
    const { queueId, userId } = body;

    if (!queueId || !userId) {
      return c.json({ error: 'Missing queueId or userId' }, 400);
    }

    // Idempotency check
    const status = await getQueueItemStatus(c.env, queueId);

    if (status === 'completed') {
      return c.json({ 
        success: true, 
        queueId, 
        userId, 
        skipped: true, 
        reason: 'Already completed' 
      });
    }

    if (status === 'failed') {
      return c.json({ 
        success: false, 
        queueId, 
        userId, 
        skipped: true, 
        reason: 'Already failed' 
      });
    }

    // Process user
    try {
      await processSingleUser(c.env, userId);
      await markCompleted(c.env, queueId);

      return c.json({ success: true, queueId, userId });
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : 'Unknown error';
      await markFailed(c.env, queueId, errorMessage);

      console.error(`[ProcessUser] Failed for user ${userId}:`, error);

      // Return 200 (not 500) so scheduler continues with other users
      return c.json({ success: false, queueId, userId, error: errorMessage });
    }
  } catch (error) {
    console.error('[ProcessUser] Error:', error);
    return c.json({ 
      error: 'Process user failed', 
      message: error instanceof Error ? error.message : 'Unknown error' 
    }, 500);
  }
});

Key points:

  • Idempotency protection (check status before processing)
  • Return 200 even on failure (don't block other Workers)
  • Mark completed/failed in queue

Step 5: Process Single User

The actual user processing logic.

// web/backend/src/cron/process-single-user.ts

export async function processSingleUser(env: Env, userId: string): Promise<void> {
  // Fetch user from D1
  const user = await env.DB.prepare(
    `SELECT id, github_username, github_pat, discord_webhook, telegram_token, telegram_chat_id
     FROM users
     WHERE id = ? AND is_active = 1`
  )
    .bind(userId)
    .first<User>();

  if (!user) {
    throw new Error(`User ${userId} not found or inactive`);
  }

  if (!user.github_pat) {
    throw new Error(`User ${user.github_username} has no GitHub PAT configured`);
  }

  // Initialize services
  const encryptionService = await createEncryptionService(env.ENCRYPTION_KEY);
  const notificationService = createNotificationService(env);

  // Decrypt GitHub PAT
  const decryptedPat = await encryptionService.decrypt(user.github_pat);

  // Create GitHub service
  const githubService = createCachedGitHubService(decryptedPat, 5);

  // Check contributions
  const contributionsToday = await githubService.getContributionsToday(user.github_username);
  const currentStreak = await githubService.getCurrentStreak(user.github_username);

  // Prepare notification message
  const notificationMessage = {
    username: user.github_username,
    currentStreak,
    contributionsToday,
    message: contributionsToday > 0
      ? `Great job! You made ${contributionsToday} contribution${contributionsToday > 1 ? 's' : ''} today! Your ${currentStreak}-day streak is safe. Keep it up!`
      : `You have not made any contributions today! Your ${currentStreak}-day streak is at risk. Make a commit to keep it alive!`,
  };

  // Send Discord notification if configured
  if (user.discord_webhook) {
    try {
      const discordResult = await notificationService.sendDiscordNotification(
        user.discord_webhook,
        notificationMessage
      );

      await logNotification(env, user.id, 'discord', discordResult.success ? 'sent' : 'failed', discordResult.error);

      if (discordResult.success) {
        console.log(`[Process] Discord notification sent to ${user.github_username}`);
      } else {
        console.error(`[Process] Discord notification failed for ${user.github_username}:`, discordResult.error);
      }
    } catch (error) {
      console.error(`[Process] Error sending Discord notification to ${user.github_username}:`, error);
    }
  }

  // Send Telegram notification if configured
  if (user.telegram_token && user.telegram_chat_id) {
    try {
      const telegramResult = await notificationService.sendTelegramNotification(
        user.telegram_token,
        user.telegram_chat_id,
        notificationMessage
      );

      await logNotification(env, user.id, 'telegram', telegramResult.success ? 'sent' : 'failed', telegramResult.error);

      if (telegramResult.success) {
        console.log(`[Process] Telegram notification sent to ${user.github_username}`);
      } else {
        console.error(`[Process] Telegram notification failed for ${user.github_username}:`, telegramResult.error);
      }
    } catch (error) {
      console.error(`[Process] Error sending Telegram notification to ${user.github_username}:`, error);
    }
  }

  console.log(`[Process] User ${user.github_username} processed successfully`);
}

Advanced Features

1. Stale Item Requeuing

What if a Worker crashes? Items stuck in "processing" need to be requeued.

// web/backend/src/services/queue.ts

export async function requeueStaleProcessing(
  env: Env,
  minutes: number = 10
): Promise<number> {
  const result = await env.DB.prepare(`
    UPDATE cron_queue
    SET status = 'pending', started_at = NULL
    WHERE status = 'processing'
      AND started_at < datetime('now', '-' || ? || ' minutes')
  `)
    .bind(minutes)
    .run();

  return result.meta.changes;
}

Usage in scheduler:

// Reaper for stale processing items (10+ minutes)
ctx.waitUntil(
  requeueStaleProcessing(env, 10)
    .then((requeued) => {
      if (requeued > 0) {
        console.log(`[Scheduled] Requeued ${requeued} stale processing items`);
      }
    })
    .catch((error: Error) => {
      console.error('[Scheduled] Error requeuing stale items:', error);
    })
);

2. Batch Cleanup

Delete old batches to prevent database bloat.

// web/backend/src/services/queue.ts

export async function cleanupOldBatches(
  env: Env,
  daysOld: number = 7
): Promise<number> {
  const result = await env.DB.prepare(`
    DELETE FROM cron_queue
    WHERE created_at < datetime('now', '-' || ? || ' days')
  `)
    .bind(daysOld)
    .run();

  return result.meta.changes;
}

Usage in scheduler:

// Cleanup old batches (7+ days)
ctx.waitUntil(
  cleanupOldBatches(env, 7)
    .then((deleted) => {
      if (deleted > 0) {
        console.log(`[Scheduled] Cleaned up ${deleted} old queue items`);
      }
    })
    .catch((error: Error) => {
      console.error('[Scheduled] Error cleaning up old batches:', error);
    })
);

3. Batch Progress Tracking

Monitor batch progress in real-time.

// web/backend/src/services/queue.ts

export interface BatchProgress {
  pending: number;
  processing: number;
  completed: number;
  failed: number;
  total: number;
}

export async function getBatchProgress(
  env: Env,
  batchId: string
): Promise<BatchProgress> {
  const results = await env.DB.prepare(`
    SELECT status, COUNT(*) as count
    FROM cron_queue
    WHERE batch_id = ?
    GROUP BY status
  `)
    .bind(batchId)
    .all();

  const progress: BatchProgress = {
    pending: 0,
    processing: 0,
    completed: 0,
    failed: 0,
    total: 0,
  };

  for (const row of results.results as Array<{ status: string; count: number }>) {
    const status = row.status as keyof Omit<BatchProgress, 'total'>;
    progress[status] = row.count;
    progress.total += row.count;
  }

  return progress;
}

API endpoint:

// web/backend/src/routes/cron.ts

app.get('/batch/:batchId', async (c) => {
  const secret = c.req.header('X-Cron-Secret');
  if (!c.env.SERVER_SECRET || secret !== c.env.SERVER_SECRET) {
    return c.json({ error: 'Unauthorized' }, 401);
  }

  try {
    const batchId = c.req.param('batchId');
    const progress = await getBatchProgress(c.env, batchId);

    return c.json({
      batchId,
      progress,
      percentage: progress.total > 0 
        ? Math.round(((progress.completed + progress.failed) / progress.total) * 100) 
        : 0,
    });
  } catch (error) {
    console.error('[BatchProgress] Error:', error);
    return c.json({ error: 'Failed to get batch progress' }, 500);
  }
});

Performance Analysis

Before (Sequential Processing)

10 users × 3 seconds = 30 seconds
CPU time: 30 seconds (at limit!)
Wall time: 30 seconds
Success rate: 0% (TLE errors)

After (Distributed Processing)

10 users / 10 Workers = 1 user per Worker
CPU time per Worker: 3 seconds
Wall time: ~10 seconds (parallel)
Success rate: 100%

Scalability

Current load:

  • 10 users/day
  • 10 Workers dispatched
  • ~10 seconds total processing time

Theoretical capacity:

  • Cloudflare Workers: 100,000 requests/day (free tier)
  • D1 writes: 50,000/day (free tier)
  • Bottleneck: D1 writes (2 writes per user = 25,000 users/day)

Headroom: 2500x current load

Cost Analysis

Free tier limits:

  • Cloudflare Workers: 100k req/day
  • D1 database: 50k writes/day
  • Koyeb VPS: 512MB RAM

Current usage:

  • Workers: ~20 req/day (10 users × 2 endpoints)
  • D1 writes: ~40 writes/day (queue + notifications)
  • VPS: ~20MB RAM

Cost: $0/month

Lessons Learned

1. Service Bindings Are Powerful

Each env.SELF.fetch() creates a new Worker instance with fresh CPU budget. This is the key to scaling beyond single-Worker limits.

2. D1 Is Fast Enough for Queues

No need for Redis or SQS. D1 SQLite handles job queues perfectly:

  • Atomic operations with CTE + UPDATE + RETURNING
  • Fast queries (< 10ms)
  • Built-in indexes
  • Free tier generous enough

3. Atomic Operations Prevent Races

Without atomic claiming, multiple Workers would process the same user. CTE + UPDATE + RETURNING in single statement solves this.

4. Idempotency Is Critical

Check status before processing. Safe retries, no duplicate notifications.

5. Stale Item Requeuing Is Essential

Workers crash. Items get stuck. Reaper process requeues them after 10 minutes.

6. Return 200 Even on Failure

If a Worker fails processing one user, return 200 (not 500). Don't block other Workers.

What's Next?

This completes the 3-part series on building Streaky:

  • Part 1: The journey from sequential to distributed processing
  • Part 2: Solving IP blocking with Rust VPS
  • Part 3: Distributed queue system with Service Bindings

Try It Out

Live App: streakyy.vercel.app

GitHub: github.com/0xReLogic/Streaky

Queue Code: web/backend/src/services/queue.ts

Let's Connect

Building distributed systems on Cloudflare? Have questions about Service Bindings or D1? Drop a comment!

GitHub: @0xReLogic

Project: Streaky


This content originally appeared on DEV Community and was authored by Allen Elzayn


Print Share Comment Cite Upload Translate Updates
APA

Allen Elzayn | Sciencx (2025-10-24T17:40:30+00:00) Building Streaky: Distributed Queue System with Service Bindings (Part 3). Retrieved from https://www.scien.cx/2025/10/24/building-streaky-distributed-queue-system-with-service-bindings-part-3/

MLA
" » Building Streaky: Distributed Queue System with Service Bindings (Part 3)." Allen Elzayn | Sciencx - Friday October 24, 2025, https://www.scien.cx/2025/10/24/building-streaky-distributed-queue-system-with-service-bindings-part-3/
HARVARD
Allen Elzayn | Sciencx Friday October 24, 2025 » Building Streaky: Distributed Queue System with Service Bindings (Part 3)., viewed ,<https://www.scien.cx/2025/10/24/building-streaky-distributed-queue-system-with-service-bindings-part-3/>
VANCOUVER
Allen Elzayn | Sciencx - » Building Streaky: Distributed Queue System with Service Bindings (Part 3). [Internet]. [Accessed ]. Available from: https://www.scien.cx/2025/10/24/building-streaky-distributed-queue-system-with-service-bindings-part-3/
CHICAGO
" » Building Streaky: Distributed Queue System with Service Bindings (Part 3)." Allen Elzayn | Sciencx - Accessed . https://www.scien.cx/2025/10/24/building-streaky-distributed-queue-system-with-service-bindings-part-3/
IEEE
" » Building Streaky: Distributed Queue System with Service Bindings (Part 3)." Allen Elzayn | Sciencx [Online]. Available: https://www.scien.cx/2025/10/24/building-streaky-distributed-queue-system-with-service-bindings-part-3/. [Accessed: ]
rf:citation
» Building Streaky: Distributed Queue System with Service Bindings (Part 3) | Allen Elzayn | Sciencx | https://www.scien.cx/2025/10/24/building-streaky-distributed-queue-system-with-service-bindings-part-3/ |

Please log in to upload a file.




There are no updates yet.
Click the Upload button above to add an update.

You must be logged in to translate posts. Please log in or register.