Milestone 6: Email pipeline and worker
Purpose: Inbound email creates or updates tickets and messages; agent reply from UI is sent as email; attachments in S3; spam/bounce/OOO and deduplication handled.
Exit state: Worker fetches per connection type (polling); parses and dedupes; creates tickets/threads; sends replies; stores attachments in S3; edge cases (spam, bounce, OOO) handled.
Spec reference: §2.3 Email Ingestion & Processing, §4.7 Email pipeline, §4.8 Async job architecture, §4.9 Rate limits, §4.10 Ingestion concurrency and idempotency, §5.6 Message, §6.7 Attachment limits, §8.1–8.6 Sync configuration, §10.3 Messages.
Prerequisites: M05 (Inbox management).
6.1 packages/email — clients and send
Tasks
- IMAP fetch (imapflow)
  - Connect using credentials from DB (decrypt in worker); list messages since last UID (imap_last_uid); fetch new messages; parse with mailparser. Return list of parsed messages; update imap_last_uid on inbox only after successful processing.
  - Handle connection errors; on auth failure set inbox last_error and connected_at = null (and enqueue notify-admins job, M08).
- Gmail API fetch
  - Use stored OAuth tokens (decrypt); refresh if expired; list messages (history or messages.list); fetch full message; parse. Store gmail_history_id (or equivalent) on inbox for incremental sync. On token refresh failure, set last_error, clear connected_at, notify admins.
- Microsoft Graph fetch
  - Use stored tokens; refresh if expired; delta or list messages; parse. Store graph_delta_token (or equivalent). On token failure, same as Gmail.
- MIME parsing
  - mailparser: parse headers (Message-ID, In-Reply-To, References, From, To, Subject, Date), body (html + text), attachments. Extract message_id_header for dedupe and thread match; extract bounce/spam/OOO headers per Section 2.3.
- Send (outbound)
  - Build MIME (nodemailer or similar): From (inbox display_name + email_address), To, Subject (include ticket number in subject per Section 5.5), body (html + plain), In-Reply-To, References. For IMAP: send via SMTP using inbox credentials. For Gmail/Graph: send via API. On success update message send_status = 'sent', sent_at = now(). On failure set send_status = 'failed', send_error = error message.
- Credential and token helpers
  - Load inbox by id; decrypt credentials_encrypted or token blob; return for use by fetch/send. Centralize in packages/email.
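
The IMAP task above says imap_last_uid is updated only after messages are processed. The sketch below illustrates one way to read that rule: the cursor advances message by message and stops at the first failure. The `FetchedMessage` shape and the `advanceImapCursor` name are hypothetical, and processing is shown synchronously for brevity, whereas a real worker would await each step.

```typescript
// Hypothetical minimal shape of a fetched message; a real worker would carry
// the full mailparser output alongside the UID.
interface FetchedMessage {
  uid: number;
  messageIdHeader: string;
}

// Processes messages in ascending UID order and returns the new cursor value.
// The cursor stops at the first failure, so the failed message and everything
// after it are fetched again on the next run.
function advanceImapCursor(
  lastUid: number,
  fetched: FetchedMessage[],
  processOne: (message: FetchedMessage) => void
): number {
  const ordered = fetched
    .filter((m) => m.uid > lastUid)
    .sort((a, b) => a.uid - b.uid);

  let cursor = lastUid;
  for (const message of ordered) {
    try {
      processOne(message);
      cursor = message.uid; // advance only after successful processing
    } catch (err) {
      break; // leave the cursor before the failed message
    }
  }
  return cursor;
}
```

Messages that were processed before a failure may be seen again on a later run if the cursor write itself fails; the dedupe on message_id_header in 6.3 is what makes that refetch harmless.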
Acceptance criteria
| Criterion | Status |
|---|---|
| IMAP fetch returns new messages; Gmail and Graph fetch return new messages; parsing extracts headers and body. | ✅ |
| Send builds valid MIME and delivers via SMTP or provider API; message row updated. | ✅ |
| Token refresh failure sets inbox error state and clears connected_at; no crash. | ✅ |
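
The error-state criterion above can be pictured as a small mapping from failure kind to inbox update. This is a sketch only: the `FetchFailure` type and the `transient` category are assumptions, and the choice to keep connected_at for transient errors is not stated in the plan.

```typescript
type FetchFailure =
  | { kind: "auth"; message: string } // bad credentials or token refresh failure
  | { kind: "transient"; message: string }; // timeouts, network errors (assumed category)

interface InboxFailureUpdate {
  patch: { last_error: string; connected_at?: null };
  notifyAdmins: boolean;
}

// Auth failures disconnect the inbox and notify admins; other failures only
// record the error so the next scheduled run can try again (assumption).
function inboxUpdateForFailure(failure: FetchFailure): InboxFailureUpdate {
  if (failure.kind === "auth") {
    return {
      patch: { last_error: failure.message, connected_at: null },
      notifyAdmins: true,
    };
  }
  return { patch: { last_error: failure.message }, notifyAdmins: false };
}
```

Returning a plain patch object keeps the decision separate from the database write, so the same logic can serve the IMAP, Gmail, and Graph fetchers.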
6.2 Object storage (S3)
Tasks
- Upload
  - On inbound: for each attachment, upload to S3 key `{tenant_id}/{message_id}/{filename}` (or safe filename). Insert attachments row (message_id, tenant_id, file_name, storage_key, content_type, size_bytes). Section 5.21b, 3.14.
  - On outbound: agent uploads file (via presigned URL or API); worker or API uploads to S3; same key pattern; attach to message before send.
- Download
  - Pre-signed URL generation: expiry 15 minutes; no public bucket access. Section 7.5.
  - tRPC or API: get attachment by id (with tenant check); return pre-signed URL for download.
- Limits
  - Enforce per Section 6.7: 25 MB per file, 10 files per message, 25 MB total per message. Reject upload or send if over.
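
The three limits above can be checked in a single pass over the attachment sizes. The sketch below is illustrative: the function name and reason codes are hypothetical, and it assumes "MB" means 1024 × 1024 bytes, which Section 6.7 may define differently.

```typescript
const MB = 1024 * 1024; // assumption: binary megabytes
const MAX_FILE_BYTES = 25 * MB;
const MAX_FILES = 10;
const MAX_TOTAL_BYTES = 25 * MB;

interface LimitCheck {
  ok: boolean;
  reason?: "too_many_files" | "file_too_large" | "total_too_large";
}

// Validates the attachment sizes (in bytes) of one message against the limits.
function checkAttachmentLimits(sizesInBytes: number[]): LimitCheck {
  if (sizesInBytes.length > MAX_FILES) {
    return { ok: false, reason: "too_many_files" };
  }
  let total = 0;
  for (const size of sizesInBytes) {
    if (size > MAX_FILE_BYTES) {
      return { ok: false, reason: "file_too_large" };
    }
    total += size;
  }
  if (total > MAX_TOTAL_BYTES) {
    return { ok: false, reason: "total_too_large" };
  }
  return { ok: true };
}
```

Because the per-file and per-message caps are both 25 MB, the total check is the one that usually fires for multi-file messages; for example, a 20 MB file plus a 10 MB file passes the per-file check but fails the total.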
Acceptance criteria
| Criterion | Status |
|---|---|
| Inbound attachments stored in S3; attachment rows created; download via pre-signed URL works. | ✅ |
| Outbound attachments uploaded and attached to sent message; limits enforced. | ✅ |
| Bucket private; only pre-signed URLs used for read. | ✅ |
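
The upload task mentions a "safe filename" inside the `{tenant_id}/{message_id}/{filename}` key without defining it. One possible interpretation is sketched below; the sanitizing rules (basename only, ASCII allow-list, `attachment` fallback) and both function names are assumptions.

```typescript
// Reduces an arbitrary attachment name to a single safe path segment.
function safeFileName(fileName: string): string {
  const parts = fileName.split(/[\\/]/); // drop any directory components
  const base = parts[parts.length - 1] || "";
  const cleaned = base
    .replace(/[^A-Za-z0-9._-]+/g, "_") // collapse runs of unsafe characters
    .replace(/^\.+/, ""); // no leading dots, so ".." cannot survive
  return cleaned.length > 0 ? cleaned : "attachment";
}

// Builds the storage key in the {tenant_id}/{message_id}/{filename} pattern.
function buildStorageKey(tenantId: string, messageId: string, fileName: string): string {
  return tenantId + "/" + messageId + "/" + safeFileName(fileName);
}
```

The original name would still be kept in the attachments row (file_name), so the lossy key does not affect what the agent sees. Two attachments with the same name in one message would map to the same key under this sketch, so a real implementation may need a suffix or an attachment id in the key.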
6.3 Inbound pipeline (worker)
Tasks
- Fetch job (Inngest)
  - Function: triggered by cron per inbox (e.g. every 5 min for IMAP; 5 min fallback for Gmail/Graph). Payload: inbox_id, tenant_id. Concurrency: one per inbox (key: `inbox/{inbox_id}`). Load inbox; skip if connected_at is null. Fetch new messages (6.1); for each message:
    - Check spam: X-Spam-Status or provider labels; skip (no ticket). Section 2.3.
    - Check bounce: Content-Type message/delivery-status or Auto-Submitted; try link to outbound message (e.g. by In-Reply-To or custom header); set messages.bounce_received_at on that message; skip creating ticket. Section 2.3, 5.6.
    - Check OOO/auto-reply: Auto-Submitted, Precedence, X-Auto-Response-Suppress; skip if auto-reply (for later: do not send auto-response to these).
    - Dedupe: if message_id_header already exists for this inbox_id, skip (UNIQUE constraint). Section 4.10.
    - Thread match: lookup by message_id_header or In-Reply-To/References in messages; if found, get ticket_id; insert new message (inbound), update ticket updated_at; persist attachments to S3 and attachments table; enqueue notifications/webhooks as in M08.
    - Else: create new ticket (insert tickets with ticket_number from tenant seq, then insert message); apply auto-response if inbox.auto_response_enabled and not OOO (send job); persist attachments; enqueue notifications/webhooks.
    - Transaction: ticket + message + attachments in one transaction where possible; dedupe by ON CONFLICT (inbox_id, message_id_header) DO NOTHING if needed.
    - Update inbox sync state: imap_last_uid, gmail_history_id, or graph_delta_token.
- Cron schedule
  - Inngest cron: every 5 minutes (or configurable) emit event per inbox that has connected_at set and connection_type. Worker function subscribes and runs fetch.
- Rate limits
  - Per Section 4.9: max N syncs per hour per inbox (e.g. 60); enforce in Inngest or in function (e.g. check last run time).
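
The spam, bounce, and OOO checks in the fetch job are header tests that run before any ticket logic. A minimal sketch follows, assuming header names arrive lower-cased from the parser. The exact values matched here (for example `Precedence: auto_reply`, or treating any `X-Auto-Response-Suppress` header as an automated sender) are assumptions; Section 2.3 is the authority on the real rules, including provider spam labels, which are not modeled.

```typescript
type InboundClass = "spam" | "bounce" | "auto_reply" | "normal";

// Classifies one inbound message from its headers. Checks run in the same
// order as the fetch job: spam, then bounce, then OOO/auto-reply.
function classifyInbound(headers: Record<string, string>): InboundClass {
  const get = (name: string): string => (headers[name] || "").trim().toLowerCase();

  // e.g. "X-Spam-Status: Yes, score=9.1"
  if (get("x-spam-status").indexOf("yes") === 0) return "spam";

  // Covers "message/delivery-status" and
  // "multipart/report; report-type=delivery-status".
  if (get("content-type").indexOf("delivery-status") !== -1) return "bounce";

  const autoSubmitted = get("auto-submitted");
  const isAutoReply =
    (autoSubmitted !== "" && autoSubmitted !== "no") ||
    get("precedence") === "auto_reply" ||
    get("x-auto-response-suppress") !== "";
  return isAutoReply ? "auto_reply" : "normal";
}
```

Only messages classified as `normal` would continue to dedupe and thread matching; a `bounce` would additionally be linked to its outbound message to set bounce_received_at.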
Acceptance criteria
| Criterion | Status |
|---|---|
| New email to connected inbox creates ticket with one message; thread reply appends to existing ticket. | ✅ |
| Duplicate message_id is ignored (idempotent). | ✅ |
| Spam and bounce messages do not create tickets; bounce sets bounce_received_at on linked outbound message. | ✅ |
| Attachments stored in S3 and linked in DB; ticket_number assigned correctly. | ✅ |
| Worker publishes to Redis (ticket.updated, ticket.message_added) so M07 SSE clients receive real-time updates when tickets/messages are created or updated. | Pending |
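
The first two criteria above (thread reply appends, duplicate is ignored) come down to one decision per message. The sketch below models it against an in-memory list rather than the database. All type and function names are hypothetical, and two details are assumptions: thread lookup is scoped to the same inbox, and In-Reply-To is tried before References (newest first).

```typescript
interface StoredMessage {
  inboxId: string;
  messageIdHeader: string;
  ticketId: number;
}

interface InboundMessage {
  inboxId: string;
  messageIdHeader: string;
  inReplyTo?: string;
  references: string[]; // oldest first, as in the References header
}

interface InboundDecision {
  action: "skip_duplicate" | "append_to_ticket" | "create_ticket";
  ticketId?: number;
}

function decideInbound(existing: StoredMessage[], incoming: InboundMessage): InboundDecision {
  const sameInbox = existing.filter((m) => m.inboxId === incoming.inboxId);

  // Dedupe: mirrors the UNIQUE (inbox_id, message_id_header) constraint.
  if (sameInbox.some((m) => m.messageIdHeader === incoming.messageIdHeader)) {
    return { action: "skip_duplicate" };
  }

  // Thread match: In-Reply-To first, then References from newest to oldest.
  const candidates = (incoming.inReplyTo ? [incoming.inReplyTo] : []).concat(
    incoming.references.slice().reverse()
  );
  for (const candidate of candidates) {
    for (const stored of sameInbox) {
      if (stored.messageIdHeader === candidate) {
        return { action: "append_to_ticket", ticketId: stored.ticketId };
      }
    }
  }
  return { action: "create_ticket" };
}
```

In the worker the duplicate check would still rely on the database constraint (ON CONFLICT ... DO NOTHING), since an in-process check alone cannot protect against two concurrent runs.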
6.4 Outbound pipeline (send reply)
Tasks
- tRPC mutation (called from M07)
  - Input: ticket_id, body_html (and/or body_text), direction outbound, type email; optional attachments (already uploaded or IDs). Create message row with send_status = 'pending'; enqueue Inngest "send email" event with message_id, ticket_id, inbox_id, tenant_id.
- Send job (Inngest)
  - Function: load message, ticket, inbox; build MIME (In-Reply-To, References from thread); add attachments from S3; send via inbox connection (6.1). On success: update message send_status = 'sent', sent_at = now(), ticket updated_at. On failure: send_status = 'failed', send_error = error message; optionally enqueue notification to agent (M08).
  - Concurrency: per message (e.g. key: `message/{message_id}`) to avoid double-send.
- Retry
  - Agent can retry failed message from UI (same mutation or dedicated "retry send" that re-enqueues job). Section 5.6.
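
The send job builds In-Reply-To and References "from thread". A common way to do that, sketched below, is to reply to the most recent message in the thread and list every earlier Message-ID in References. The function names are hypothetical, and replying to the newest message (rather than, say, the last customer message) is an assumption.

```typescript
interface ThreadingHeaders {
  inReplyTo?: string;
  references?: string;
}

// Normalizes a Message-ID to the "<id@host>" form used in headers.
function toAngleForm(id: string): string {
  const bare = id.trim().replace(/^<|>$/g, "");
  return "<" + bare + ">";
}

// threadMessageIds: message_id_header values of the ticket's messages,
// oldest first. Returns empty headers for a thread with no known ids.
function buildThreadingHeaders(threadMessageIds: string[]): ThreadingHeaders {
  const ids = threadMessageIds
    .filter((id) => id.trim() !== "")
    .map(toAngleForm);
  if (ids.length === 0) return {};
  return {
    inReplyTo: ids[ids.length - 1],
    references: ids.join(" "),
  };
}
```

The resulting strings can be passed as the `inReplyTo` and `references` fields of the outgoing message in nodemailer or set as raw headers when sending through a provider API.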
Acceptance criteria
| Criterion | Status |
|---|---|
| Reply from UI creates pending message and sends via worker; message appears in thread as sent. | ✅ |
| Failure sets send_error and allows retry from UI. | ✅ |
| In-Reply-To and References set so email client threads correctly. | ✅ |
| After send success, worker publishes to Redis (ticket.message_added, ticket.updated) so SSE clients see new message in real time. | Pending |
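
The send_status lifecycle implied by these criteria (pending, then sent or failed, with retry from failed) can be written as a small transition function. This is a sketch: the event names are hypothetical, and clearing send_error on retry is an assumption.

```typescript
type SendStatus = "pending" | "sent" | "failed";

interface SendState {
  send_status: SendStatus;
  send_error: string | null;
  sent_at: Date | null;
}

type SendEvent =
  | { type: "delivered"; at: Date }
  | { type: "errored"; error: string }
  | { type: "retry" };

// Returns the next state, or the same state object when the event does not
// apply (for example a second "delivered" for an already sent message).
function applySendEvent(state: SendState, event: SendEvent): SendState {
  if (event.type === "delivered" && state.send_status === "pending") {
    return { send_status: "sent", send_error: null, sent_at: event.at };
  }
  if (event.type === "errored" && state.send_status === "pending") {
    return { send_status: "failed", send_error: event.error, sent_at: null };
  }
  if (event.type === "retry" && state.send_status === "failed") {
    return { send_status: "pending", send_error: null, sent_at: null };
  }
  return state;
}
```

Treating `sent` as terminal complements the per-message concurrency key: even if a job is delivered twice, a worker that checks the status before sending has nothing left to do.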
6.5 Auto-response
Tasks
- Trigger
  - After creating new ticket from inbound (in fetch job), if inbox.auto_response_enabled and message not OOO/auto-reply: enqueue "send auto-response" job with ticket_id, inbox_id, tenant_id.
- Auto-response job
  - Load ticket (first message from customer), inbox; resolve template variables (ticket_number, org_name, inbox_name, sender_name) in inbox.auto_response_body; build MIME; send via inbox; insert outbound message with type email, send_status 'sent', label "Auto-response" in UI if needed.
  - Idempotent: only one auto-response per ticket (e.g. flag on ticket or check existing outbound count).
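
Resolving the four template variables is a plain string substitution. The plan does not specify the placeholder syntax, so the `{{variable}}` form below is an assumption, as are the type and function names. Unknown placeholders are left untouched rather than replaced with an empty string.

```typescript
interface AutoResponseVars {
  ticket_number: string;
  org_name: string;
  inbox_name: string;
  sender_name: string;
}

// Replaces {{name}} placeholders (optional inner whitespace) with values.
function renderAutoResponse(template: string, vars: AutoResponseVars): string {
  const lookup: Record<string, unknown> = {
    ticket_number: vars.ticket_number,
    org_name: vars.org_name,
    inbox_name: vars.inbox_name,
    sender_name: vars.sender_name,
  };
  return template.replace(/\{\{\s*([a-z_]+)\s*\}\}/g, (match: string, key: string) => {
    const value = lookup[key];
    return typeof value === "string" ? value : match; // keep unknown placeholders
  });
}
```

If inbox.auto_response_body is HTML, values such as sender_name come from the inbound email and would need HTML escaping before substitution; that step is omitted here.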
Acceptance criteria
| Criterion | Status |
|---|---|
| New ticket from inbound triggers auto-response when enabled; template variables substituted; one per ticket. | ✅ |
| OOO/auto-reply inbound does not trigger auto-response. | ✅ |
6.6 Gmail watch and Graph subscriptions
§8.3 (Gmail Push), §8.4 (Graph change notifications). Implemented for M06.
Tasks
- packages/email
  - `registerGmailWatch(credentials, options)` with `topicName`; returns `{ expiration }` (ms). Gmail API `users/me/watch`.
  - `createGraphSubscription(credentials, options)` with `notificationUrl`, optional `clientState`; resource `me/mailFolders('Inbox')/messages`, 2-day expiry; returns `{ subscriptionId, expirationDateTime }`.
  - `renewGraphSubscription(credentials, subscriptionId, options)`: PATCH to extend expiry.
- Worker webhooks (`apps/worker/src/webhooks.ts`)
  - `POST /api/webhooks/gmail-push`: parse Pub/Sub `message.data` (base64 JSON with `emailAddress`, `historyId`); find inbox by email + `connectionType: 'gmail'`; send `inboxops/email.fetch`.
  - `POST /api/webhooks/graph-push`: if `?validationToken` is present, return it (Graph validation); else parse `value[]`, decode `clientState` as `{ inbox_id, tenant_id }`, dedupe and send `inboxops/email.fetch` per inbox; 202 Accepted.
- Worker Inngest (`apps/worker/src/gmail-graph-push.ts`)
  - `inboxops/gmail.watch.register`: load inbox, decrypt creds, call `registerGmailWatch` (when `GOOGLE_PUBSUB_TOPIC` set), update `gmail_watch_expires_at`; on token error clear `connected_at`.
  - `inboxops/graph.subscription.create`: load inbox, call `createGraphSubscription` (when `GRAPH_PUSH_NOTIFICATION_URL` set), store `graph_subscription_id` and `graph_subscription_expires_at`.
  - Gmail watch renewal cron (daily): inboxes with `connectionType: 'gmail'`, `gmail_watch_expires_at` set and < now+24h; emit `gmail.watch.register` per inbox.
  - Graph subscription renewal cron (every 12h): inboxes with `connectionType: 'microsoft'`, existing subscription expiring < now+24h; renew via PATCH, update expiry.
- Web OAuth callbacks
  - Google callback: after DB update, send `inboxops/gmail.watch.register` with `inbox_id`, `tenant_id`.
  - Microsoft callback: after DB update, send `inboxops/graph.subscription.create` with `inbox_id`, `tenant_id`.
- Config (`packages/config`): `GOOGLE_PUBSUB_TOPIC`, `GRAPH_PUSH_NOTIFICATION_URL` optional. When unset, register/create functions no-op.
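
The two webhook handlers above mostly consist of payload decoding. The sketch below shows that part only, without the HTTP layer or the Inngest send. The function names are hypothetical, and it assumes `clientState` carries a JSON string, which the task list does not spell out.

```typescript
interface GmailPushNotice {
  emailAddress: string;
  historyId: string;
}

interface InboxRef {
  inbox_id: string;
  tenant_id: string;
}

// Decodes a Pub/Sub push body: message.data is base64-encoded JSON.
function decodeGmailPush(body: any): GmailPushNotice | null {
  const data = body && body.message ? body.message.data : undefined;
  if (typeof data !== "string") return null;
  try {
    const parsed = JSON.parse(atob(data));
    if (!parsed || typeof parsed.emailAddress !== "string" || parsed.historyId == null) {
      return null;
    }
    return { emailAddress: parsed.emailAddress, historyId: String(parsed.historyId) };
  } catch (err) {
    return null; // invalid base64 or invalid JSON
  }
}

// Collects the distinct inboxes referenced by a Graph notification batch.
function inboxesFromGraphPush(body: any): InboxRef[] {
  const items: any[] = body && Array.isArray(body.value) ? body.value : [];
  const seen: Record<string, boolean> = {};
  const result: InboxRef[] = [];
  for (const item of items) {
    try {
      const state = JSON.parse(String(item.clientState)); // assumed JSON encoding
      if (typeof state.inbox_id !== "string" || typeof state.tenant_id !== "string") continue;
      if (seen[state.inbox_id]) continue; // one fetch event per inbox
      seen[state.inbox_id] = true;
      result.push({ inbox_id: state.inbox_id, tenant_id: state.tenant_id });
    } catch (err) {
      // malformed notification: skip it, keep processing the rest
    }
  }
  return result;
}
```

Both functions return an empty result instead of throwing, so a malformed push can still be acknowledged and the polling fallback picks up anything that was missed.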
Acceptance criteria
| Criterion | Status |
|---|---|
| Gmail watch registered on connect when GOOGLE_PUBSUB_TOPIC set; renewal cron runs daily. | ✅ |
| Graph subscription created on connect when GRAPH_PUSH_NOTIFICATION_URL set; renewal cron every 12h. | ✅ |
| Push webhooks enqueue inboxops/email.fetch; Graph validation returns validationToken. | ✅ |
| Polling-based fetch remains for all types when push not configured or as fallback. | ✅ |
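
Both renewal crons select inboxes whose watch or subscription expires within the next 24 hours. A minimal sketch of that filter follows, with hypothetical names; `expiresAt` stands in for gmail_watch_expires_at or graph_subscription_expires_at.

```typescript
const RENEWAL_WINDOW_MS = 24 * 60 * 60 * 1000;

interface PushInbox {
  id: string;
  expiresAt: Date | null; // null: push was never registered for this inbox
}

// Returns inboxes whose push registration expires before now + 24h,
// including ones that have already expired.
function inboxesDueForRenewal(inboxes: PushInbox[], now: Date): PushInbox[] {
  const cutoff = now.getTime() + RENEWAL_WINDOW_MS;
  return inboxes.filter(
    (inbox) => inbox.expiresAt !== null && inbox.expiresAt.getTime() < cutoff
  );
}
```

With a 2-day Graph expiry, a 12-hour cron, and a 24-hour window, each subscription becomes eligible at least twice before it lapses, which leaves room for one failed renewal attempt.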
Milestone 6 sign-off
| Criterion | Status |
|---|---|
| All tasks in 6.1–6.6 complete. | ✅ |
| All acceptance criteria met, except the Redis publish criteria in 6.3 and 6.4 (Pending). | ✅ |
| End-to-end: send email to connected inbox → ticket created; reply from UI → email received; attachments work; spam/bounce handled. | ✅ |
| E2E: Tickets page loads (empty state and with seeded ticket); reply and ticket-in-list flows require M07 UI (see INDEX — Testing strategy). | ✅ |
| Ready for M07 (Ticket list and detail). | ✅ |