Milestone 6: Email pipeline and worker

Purpose: Inbound email creates or updates tickets and messages; agent reply from UI is sent as email; attachments in S3; spam/bounce/OOO and deduplication handled.

Exit state: Worker fetches per connection type (polling); parses and dedupes; creates tickets/threads; sends replies; stores attachments in S3; edge cases (spam, bounce, OOO) handled.

Spec reference: §2.3 Email Ingestion & Processing, §4.7 Email pipeline, §4.8 Async job architecture, §4.9 Rate limits, §4.10 Ingestion concurrency and idempotency, §5.6 Message, §6.7 Attachment limits, §8.1–8.6 Sync configuration, §10.3 Messages.

Prerequisites: M05 (Inbox management).


6.1 packages/email — clients and send

Tasks

  1. IMAP fetch (imapflow)

    • Connect using credentials from DB (decrypt in worker); list messages since last UID (imap_last_uid); fetch new messages; parse with mailparser. Return list of parsed messages; update imap_last_uid on inbox after successful process.
    • Handle connection errors; set inbox last_error and connected_at = null on auth failure (and enqueue notify-admins job — M08).
  2. Gmail API fetch

    • Use stored OAuth tokens (decrypt); refresh if expired; list messages (history.list or messages.list); fetch full message; parse. Store gmail_history_id (or equivalent) on inbox for incremental sync. On token refresh failure, set last_error, clear connected_at, notify admins.
  3. Microsoft Graph fetch

    • Use stored tokens; refresh if expired; delta or list messages; parse. Store graph_delta_token (or equivalent). On token failure, same as Gmail.
  4. MIME parsing

    • mailparser: parse headers (Message-ID, In-Reply-To, References, From, To, Subject, Date), body (html + text), attachments. Extract message_id_header for dedupe and thread match; extract bounce/spam/OOO headers per Section 2.3.
  5. Send (outbound)

    • Build MIME (nodemailer or similar): From (inbox display_name + email_address), To, Subject (include ticket number in subject per Section 5.5), body (html + plain), In-Reply-To, References. For IMAP: send via SMTP using inbox credentials. For Gmail/Graph: send via API. On success, update message send_status = 'sent', sent_at = now(). On failure, set send_status = 'failed', send_error = error message.
  6. Credential and token helpers

    • Load inbox by id; decrypt credentials_encrypted or token blob; return for use by fetch/send. Centralize in packages/email.
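
The incremental UID bookkeeping in task 1 can be sketched as below. This is a minimal illustration, assuming imap_last_uid is stored as an integer (0 before the first sync); the function names are hypothetical and not part of packages/email. Stopping at the first failed message is one possible policy, chosen here so that failed messages are fetched again on the next run.

```typescript
// Hypothetical helpers for illustration only.

/** UID range to request for messages newer than the stored imap_last_uid. */
function nextUidRange(lastUid: number): string {
  return `${lastUid + 1}:*`;
}

/**
 * Drop UIDs at or below lastUid. This guard is needed because an IMAP UID
 * range "N:*" always matches the highest UID in the mailbox, even when that
 * UID is lower than N (RFC 3501), so an idle mailbox returns its last message.
 */
function filterNewUids(lastUid: number, fetched: number[]): number[] {
  return fetched.filter((uid) => uid > lastUid).sort((a, b) => a - b);
}

/**
 * Compute the new imap_last_uid. Advances only through messages that were
 * processed successfully and stops at the first failure (assumed policy).
 */
function advanceLastUid(
  lastUid: number,
  results: { uid: number; ok: boolean }[],
): number {
  let next = lastUid;
  const ordered = [...results].sort((a, b) => a.uid - b.uid);
  for (const r of ordered) {
    if (r.uid <= next) continue; // already covered by the stored value
    if (!r.ok) break; // leave this and later UIDs for the next run
    next = r.uid;
  }
  return next;
}
```

With imap_last_uid = 41, the worker would request the range returned by nextUidRange(41), filter the result with filterNewUids, and persist the value from advanceLastUid only after processing.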

Acceptance criteria

Criterion | Status
IMAP fetch returns new messages; Gmail and Graph fetch return new messages; parsing extracts headers and body.
Send builds valid MIME and delivers via SMTP or provider API; message row updated.
Token refresh failure sets inbox error state and clears connected_at; no crash.

6.2 Object storage (S3)

Tasks

  1. Upload

    • On inbound: for each attachment, upload to S3 key {tenant_id}/{message_id}/{filename} (or safe filename). Insert attachments row (message_id, tenant_id, file_name, storage_key, content_type, size_bytes). Section 5.21b, 3.14.
    • On outbound: agent uploads file (via pre-signed URL or API); worker or API uploads to S3; same key pattern; attach to message before send.
  2. Download

    • Pre-signed URL generation: expiry 15 minutes; no public bucket access. Section 7.5.
    • tRPC or API: get attachment by id (with tenant check); return pre-signed URL for download.
  3. Limits

    • Enforce per Section 6.7: 25 MB per file, 10 files per message, 25 MB total per message. Reject upload or send if over.
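
The key pattern and the limits above can be sketched as follows. This is a minimal illustration under stated assumptions: 1 MB is taken as 1024 × 1024 bytes, the filename sanitisation rules are invented for the example, and all names (safeFileName, storageKey, checkAttachmentLimits) are hypothetical. Section 6.7 remains the authority for the actual limits.

```typescript
const MB = 1024 * 1024; // assumption: binary megabytes
const MAX_FILE_BYTES = 25 * MB;
const MAX_FILES_PER_MESSAGE = 10;
const MAX_TOTAL_BYTES = 25 * MB;

interface AttachmentInput {
  fileName: string;
  sizeBytes: number;
}

/** Reduce a client-supplied name to a single safe S3 key segment. */
function safeFileName(name: string): string {
  const base = name.split(/[\\/]/).pop() ?? ""; // drop any directory part
  const cleaned = base
    .replace(/[^A-Za-z0-9._-]+/g, "_") // collapse unusual characters
    .replace(/^\.+/, ""); // no leading dots ("..", hidden files)
  return cleaned.length > 0 ? cleaned.slice(0, 200) : "attachment";
}

/** Build the key {tenant_id}/{message_id}/{filename}. */
function storageKey(tenantId: string, messageId: string, fileName: string): string {
  return `${tenantId}/${messageId}/${safeFileName(fileName)}`;
}

/** Return null when the set is within limits, otherwise a rejection reason. */
function checkAttachmentLimits(files: AttachmentInput[]): string | null {
  if (files.length > MAX_FILES_PER_MESSAGE) {
    return `too many files: ${files.length} (max ${MAX_FILES_PER_MESSAGE})`;
  }
  let total = 0;
  for (const f of files) {
    if (f.sizeBytes > MAX_FILE_BYTES) return `"${f.fileName}" exceeds 25 MB`;
    total += f.sizeBytes;
  }
  if (total > MAX_TOTAL_BYTES) return "attachments exceed 25 MB in total";
  return null;
}
```

Two attachments with the same filename on one message would map to the same key under this pattern, so a real implementation would probably need a disambiguating suffix or an attachment id in the key.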

Acceptance criteria

Criterion | Status
Inbound attachments stored in S3; attachment rows created; download via pre-signed URL works.
Outbound attachments uploaded and attached to sent message; limits enforced.
Bucket private; only pre-signed URLs used for read.

6.3 Inbound pipeline (worker)

Tasks

  1. Fetch job (Inngest)

    • Function: triggered by cron per inbox (e.g. every 5 min for IMAP; 5 min fallback for Gmail/Graph). Payload: inbox_id, tenant_id. Concurrency: one per inbox (key: inbox/{inbox_id}). Load inbox; skip if connected_at is null. Fetch new messages (6.1); for each message:
      • Check spam: X-Spam-Status or provider labels; if flagged as spam, skip (no ticket). Section 2.3.
      • Check bounce: Content-Type message/delivery-status or Auto-Submitted; try to link to the outbound message (e.g. by In-Reply-To or custom header); set messages.bounce_received_at on that message; skip creating a ticket. Section 2.3, 5.6.
      • Check OOO/auto-reply: Auto-Submitted, Precedence, X-Auto-Response-Suppress; skip if auto-reply (for later: do not send auto-response to these).
      • Dedupe: if message_id_header already exists for this inbox_id, skip (UNIQUE constraint). Section 4.10.
      • Thread match: lookup by message_id_header or In-Reply-To/References in messages; if found, get ticket_id; insert new message (inbound), update ticket updated_at; persist attachments to S3 and attachments table; enqueue notifications/webhooks as in M08.
      • Else: create new ticket (insert tickets with ticket_number from tenant seq, then insert message); apply auto-response if inbox.auto_response_enabled and not OOO (send job); persist attachments; enqueue notifications/webhooks.
    • Transaction: ticket + message + attachments in one transaction where possible; dedupe by ON CONFLICT (inbox_id, message_id_header) DO NOTHING if needed.
    • Update inbox sync state: imap_last_uid, gmail_history_id, or graph_delta_token.
  2. Cron schedule

    • Inngest cron: every 5 minutes (or configurable) emit event per inbox that has connected_at set and connection_type. Worker function subscribes and runs fetch.
  3. Rate limits

    • Per Section 4.9: max N syncs per hour per inbox (e.g. 60); enforce in Inngest or in function (e.g. check last run time).
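
The spam, bounce and auto-reply checks in the fetch job can be sketched as a single classifier over parsed headers. This is a rough illustration only: the exact header rules belong to Section 2.3, and the ones below (for example, treating any Auto-Submitted value other than "no" as an auto-reply, and matching "delivery-status" anywhere in Content-Type) are assumptions. The function name and the lowercase-keyed header map are hypothetical.

```typescript
type InboundKind = "spam" | "bounce" | "auto_reply" | "normal";

/** Parsed headers keyed by lowercase header name (assumed shape). */
type HeaderMap = Record<string, string | undefined>;

function headerValue(headers: HeaderMap, name: string): string {
  return (headers[name.toLowerCase()] ?? "").trim().toLowerCase();
}

function classifyInbound(headers: HeaderMap, providerLabels: string[] = []): InboundKind {
  // Spam: SpamAssassin-style "Yes, score=..." or a provider label such as Gmail's SPAM.
  if (headerValue(headers, "X-Spam-Status").startsWith("yes") || providerLabels.includes("SPAM")) {
    return "spam";
  }

  // Bounce: delivery status notifications, either message/delivery-status
  // or multipart/report with report-type=delivery-status.
  if (headerValue(headers, "Content-Type").includes("delivery-status")) {
    return "bounce";
  }

  // Auto-reply / out-of-office heuristics.
  const autoSubmitted = headerValue(headers, "Auto-Submitted");
  const precedence = headerValue(headers, "Precedence");
  const suppress = headerValue(headers, "X-Auto-Response-Suppress");
  if (
    (autoSubmitted !== "" && autoSubmitted !== "no") ||
    ["bulk", "junk", "auto_reply"].includes(precedence) ||
    suppress !== ""
  ) {
    return "auto_reply";
  }

  return "normal";
}
```

The checks run in the same order as the task list (spam, then bounce, then auto-reply), so a bounce that also carries Auto-Submitted is still reported as a bounce and can be linked to its outbound message.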

Acceptance criteria

Criterion | Status
New email to connected inbox creates ticket with one message; thread reply appends to existing ticket.
Duplicate message_id is ignored (idempotent).
Spam and bounce messages do not create tickets; bounce sets bounce_received_at on linked outbound message.
Attachments stored in S3 and linked in DB; ticket_number assigned correctly.
Worker publishes to Redis (ticket.updated, ticket.message_added) so M07 SSE clients receive real-time updates when tickets/messages are created or updated. | Pending
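
The dedupe and thread-match decisions behind the first two criteria can be modelled in memory as below. This is a sketch, not the worker's implementation: in the real pipeline the UNIQUE constraint and ON CONFLICT clause enforce dedupe in the database, and scoping the thread lookup to the same inbox is an assumption made here. All type and function names are hypothetical.

```typescript
interface StoredMessage {
  inboxId: string;
  messageIdHeader: string;
  ticketId: string;
}

interface InboundHeaders {
  messageId: string;
  inReplyTo?: string;
  references?: string[];
}

type Decision =
  | { action: "skip_duplicate" }
  | { action: "append"; ticketId: string }
  | { action: "new_ticket" };

function decide(inboxId: string, msg: InboundHeaders, stored: StoredMessage[]): Decision {
  // Index known Message-IDs for this inbox (stands in for a DB lookup).
  const byId = new Map<string, string>();
  for (const s of stored) {
    if (s.inboxId === inboxId) byId.set(s.messageIdHeader, s.ticketId);
  }

  // Dedupe: the same Message-ID was already ingested for this inbox.
  if (byId.has(msg.messageId)) return { action: "skip_duplicate" };

  // Thread match: In-Reply-To first, then References from newest to oldest.
  const candidates = [msg.inReplyTo, ...[...(msg.references ?? [])].reverse()];
  for (const id of candidates) {
    const ticketId = id === undefined ? undefined : byId.get(id);
    if (ticketId !== undefined) return { action: "append", ticketId };
  }

  return { action: "new_ticket" };
}
```

Running the fetch job twice over the same mailbox would then yield skip_duplicate for every message already stored, which is the idempotency the criteria ask for.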

6.4 Outbound pipeline (send reply)

Tasks

  1. tRPC mutation (called from M07)

    • Input: ticket_id, body_html (and/or body_text), direction outbound, type email; optional attachments (already uploaded or IDs). Create message row with send_status = 'pending'; enqueue Inngest "send email" event with message_id, ticket_id, inbox_id, tenant_id.
  2. Send job (Inngest)

    • Function: load message, ticket, inbox; build MIME (In-Reply-To, References from thread); add attachments from S3; send via inbox connection (6.1). On success: update message send_status = 'sent', sent_at = now(), ticket updated_at. On failure: send_status = 'failed', send_error = error message; optionally enqueue notification to agent (M08).
    • Concurrency: per message (e.g. key: message/{message_id}) to avoid double-send.
  3. Retry

    • Agent can retry failed message from UI (same mutation or dedicated "retry send" that re-enqueues job). Section 5.6.
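
Building In-Reply-To and References for the send job can be sketched as below, following the usual email convention that a reply's References is the parent's References followed by the parent's Message-ID. Treating the latest message in the thread as the parent is an assumption, and the type and function names are hypothetical.

```typescript
interface ThreadMessage {
  /** Message-ID header including angle brackets, e.g. "<abc@example.com>". */
  messageIdHeader: string;
  /** Parsed References of that message, oldest first. */
  references: string[];
}

interface ReplyHeaders {
  inReplyTo: string;
  references: string;
}

function replyHeaders(parent: ThreadMessage): ReplyHeaders {
  const chain = [...parent.references, parent.messageIdHeader];
  // Remove repeats while keeping the first occurrence and original order.
  const unique = chain.filter((id, index) => chain.indexOf(id) === index);
  return {
    inReplyTo: parent.messageIdHeader,
    references: unique.join(" "),
  };
}
```

The resulting strings can be passed to the MIME builder as the In-Reply-To and References header values so that the customer's email client places the reply in the existing thread.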

Acceptance criteria

Criterion | Status
Reply from UI creates pending message and sends via worker; message appears in thread as sent.
Failure sets send_error and allows retry from UI.
In-Reply-To and References set so email client threads correctly.
After send success, worker publishes to Redis (ticket.message_added, ticket.updated) so SSE clients see new message in real time. | Pending

6.5 Auto-response

Tasks

  1. Trigger

    • After creating new ticket from inbound (in fetch job), if inbox.auto_response_enabled and message not OOO/auto-reply: enqueue "send auto-response" job with ticket_id, inbox_id, tenant_id.
  2. Auto-response job

    • Load ticket (first message from customer), inbox; resolve template variables (ticket_number, org_name, inbox_name, sender_name) in inbox.auto_response_body; build MIME; send via inbox; insert outbound message with type email, send_status 'sent', label "Auto-response" in UI if needed.
    • Idempotent: only one auto-response per ticket (e.g. flag on ticket or check existing outbound count).
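
Template substitution and the one-per-ticket guard can be sketched as below. The {{name}} placeholder syntax is an assumption (the plan does not specify how variables appear in inbox.auto_response_body), and the function names are hypothetical.

```typescript
interface TemplateVars {
  ticket_number: string;
  org_name: string;
  inbox_name: string;
  sender_name: string;
}

/** Replace {{var}} placeholders; unknown placeholders are left untouched. */
function renderAutoResponse(template: string, vars: TemplateVars): string {
  return template.replace(/\{\{\s*(\w+)\s*\}\}/g, (match: string, name: string) =>
    Object.prototype.hasOwnProperty.call(vars, name)
      ? vars[name as keyof TemplateVars]
      : match,
  );
}

/** Decide whether the auto-response job should send anything. */
function shouldAutoRespond(opts: {
  autoResponseEnabled: boolean;
  inboundIsAutoReply: boolean;
  existingAutoResponses: number;
}): boolean {
  return (
    opts.autoResponseEnabled &&
    !opts.inboundIsAutoReply &&
    opts.existingAutoResponses === 0
  );
}
```

If the template body is HTML, values that come from the sender (such as sender_name) would need HTML escaping before substitution; that step is omitted here for brevity.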

Acceptance criteria

Criterion | Status
New ticket from inbound triggers auto-response when enabled; template variables substituted; one per ticket.
OOO/auto-reply inbound does not trigger auto-response.

6.6 Gmail watch and Graph subscriptions

Spec reference: §8.3 (Gmail Push), §8.4 (Graph change notifications). Implemented as part of M06.

Tasks

  1. packages/email
    • registerGmailWatch(credentials, options) with topicName; returns { expiration } (ms). Gmail API users/me/watch.
    • createGraphSubscription(credentials, options) with notificationUrl, optional clientState; resource me/mailFolders('Inbox')/messages, 2-day expiry; returns { subscriptionId, expirationDateTime }.
    • renewGraphSubscription(credentials, subscriptionId, options) PATCH to extend expiry.
  2. Worker webhooks (apps/worker/src/webhooks.ts)
    • POST /api/webhooks/gmail-push: parse Pub/Sub message.data (base64 JSON with emailAddress, historyId); find inbox by email + connectionType: 'gmail'; send inboxops/email.fetch.
    • POST /api/webhooks/graph-push: if ?validationToken return it (Graph validation); else parse value[], decode clientState as { inbox_id, tenant_id }, dedupe and send inboxops/email.fetch per inbox; 202 Accepted.
  3. Worker Inngest (apps/worker/src/gmail-graph-push.ts)
    • inboxops/gmail.watch.register: load inbox, decrypt creds, call registerGmailWatch (when GOOGLE_PUBSUB_TOPIC set), update gmail_watch_expires_at; on token error clear connected_at.
    • inboxops/graph.subscription.create: load inbox, call createGraphSubscription (when GRAPH_PUSH_NOTIFICATION_URL set), store graph_subscription_id and graph_subscription_expires_at.
    • Gmail watch renewal cron (daily): inboxes with connectionType: 'gmail', gmail_watch_expires_at set and < now+24h; emit gmail.watch.register per inbox.
    • Graph subscription renewal cron (every 12h): inboxes with connectionType: 'microsoft', existing subscription expiring < now+24h; renew via PATCH, update expiry.
  4. Web OAuth callbacks
    • Google callback: after DB update, send inboxops/gmail.watch.register with inbox_id, tenant_id.
    • Microsoft callback: after DB update, send inboxops/graph.subscription.create with inbox_id, tenant_id.
  5. Config (packages/config): GOOGLE_PUBSUB_TOPIC, GRAPH_PUSH_NOTIFICATION_URL optional. When unset, register/create functions no-op.
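
The payload handling in the two push webhooks can be sketched as pure parsing functions. This is an illustration under assumptions: clientState is taken to be a plain JSON string (the plan only says it decodes to { inbox_id, tenant_id }), the base64 decoder accepts both standard and URL-safe alphabets, and the function names are hypothetical. Looking up the inbox and sending inboxops/email.fetch are left to the caller.

```typescript
interface GmailPush {
  emailAddress: string;
  historyId: string;
}

interface InboxRef {
  inbox_id: string;
  tenant_id: string;
}

function decodeBase64Json(data: string): unknown {
  const normalized = data.replace(/-/g, "+").replace(/_/g, "/");
  const bytes = Uint8Array.from(atob(normalized), (c) => c.charCodeAt(0));
  return JSON.parse(new TextDecoder().decode(bytes));
}

/** Extract emailAddress and historyId from a Pub/Sub push body, or null if malformed. */
function parseGmailPush(body: unknown): GmailPush | null {
  const data = (body as { message?: { data?: unknown } } | null)?.message?.data;
  if (typeof data !== "string") return null;
  let decoded: unknown;
  try {
    decoded = decodeBase64Json(data);
  } catch {
    return null;
  }
  const d = decoded as { emailAddress?: unknown; historyId?: unknown } | null;
  if (!d || typeof d.emailAddress !== "string") return null;
  if (typeof d.historyId !== "string" && typeof d.historyId !== "number") return null;
  return { emailAddress: d.emailAddress, historyId: String(d.historyId) };
}

/** Collect one InboxRef per inbox from a Graph notification body (value[]). */
function inboxesFromGraphPush(body: unknown): InboxRef[] {
  const value = (body as { value?: unknown } | null)?.value;
  if (!Array.isArray(value)) return [];
  const seen = new Map<string, InboxRef>();
  for (const notification of value) {
    const clientState = (notification as { clientState?: unknown } | null)?.clientState;
    if (typeof clientState !== "string") continue;
    try {
      const ref = JSON.parse(clientState) as Partial<InboxRef> | null;
      if (ref && typeof ref.inbox_id === "string" && typeof ref.tenant_id === "string") {
        seen.set(ref.inbox_id, { inbox_id: ref.inbox_id, tenant_id: ref.tenant_id });
      }
    } catch {
      // Ignore notifications whose clientState is not valid JSON.
    }
  }
  return Array.from(seen.values());
}
```

A handler would call parseGmailPush or inboxesFromGraphPush on the request body and emit one fetch event per result, which gives the per-inbox dedupe described for the Graph webhook.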

Acceptance criteria

Criterion | Status
Gmail watch registered on connect when GOOGLE_PUBSUB_TOPIC set; renewal cron runs daily.
Graph subscription created on connect when GRAPH_PUSH_NOTIFICATION_URL set; renewal cron every 12h.
Push webhooks enqueue inboxops/email.fetch; Graph validation returns validationToken.
Polling-based fetch remains for all types when push not configured or as fallback.
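
The "expiring < now+24h" selection used by both renewal crons can be sketched as a small predicate. This is an illustrative helper with a hypothetical name; in practice the same condition would more likely be expressed in the database query that selects inboxes.

```typescript
const DAY_MS = 24 * 60 * 60 * 1000;

/**
 * True when a watch or subscription has an expiry recorded and that expiry
 * falls within the renewal window (default: the next 24 hours).
 */
function needsRenewal(expiresAt: Date | null, now: Date, windowMs: number = DAY_MS): boolean {
  return expiresAt !== null && expiresAt.getTime() < now.getTime() + windowMs;
}
```

Because the Graph cron runs every 12 hours against a 24-hour window, each subscription gets more than one renewal attempt before it lapses.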

Milestone 6 sign-off

Criterion | Status
All tasks in 6.1–6.6 complete.
All acceptance criteria met.
End-to-end: send email to connected inbox → ticket created; reply from UI → email received; attachments work; spam/bounce handled.
E2E: Tickets page loads (empty state and with seeded ticket); reply and ticket-in-list flows require M07 UI (see INDEX — Testing strategy).
Ready for M07 (Ticket list and detail).