Build an Email-to-Agent Pipeline: From Gmail to Auto-Response in 7 Steps
TL;DR
- Seven-stage pipeline turns raw Gmail messages into agent-drafted, human-approved replies — no manual routing required.
- LLM-based intent classification replaces brittle subject-line rules, with a confidence threshold that routes uncertain messages to triage.
- Every outbound draft passes through an approval queue, giving you full control while your cyborgenic organization handles the volume.
Your inbox is the hardest integration test an AI agent will ever face. Zero schema. Wildly varying intent. The implicit expectation that someone will respond quickly and correctly. If your agents can handle email, they can handle just about anything — and in a cyborgenic organization with 11 agent roles, that capability is table stakes.
In a cyborgenic organization, email is the boundary where the outside world meets your agent fleet. Handling it well, with accurate classification, appropriate routing, and human oversight, proves that AI agents can own real communication channels, not just internal tooling.
I shipped an email-to-agent pipeline at GenBrain that reads Gmail, classifies intent, routes messages to the right agent, drafts a response, and queues it for human approval before sending. The email configuration lives right in agent_hub_mcp.py — here is the actual production config:
# From conductor/src/mcp_servers/agent_hub_mcp.py
# Email configuration
GMAIL_CREDENTIALS_PATH = os.environ.get("GMAIL_CREDENTIALS_PATH", "/agent-data/gmail/credentials.json")
GMAIL_TOKEN_PATH = os.environ.get("GMAIL_TOKEN_PATH", "/agent-data/gmail/token.json")
GMAIL_OAUTH_ENABLED = os.path.exists(GMAIL_TOKEN_PATH)
# SMTP fallback (for sending real emails)
SMTP_HOST = os.environ.get("SMTP_HOST", "smtp.gmail.com")
SMTP_PORT = int(os.environ.get("SMTP_PORT", "587"))
SMTP_USER = os.environ.get("SMTP_USER", "")
SMTP_PASSWORD = os.environ.get("SMTP_PASSWORD", "") # App password (not regular password)
EMAIL_ENABLED = GMAIL_OAUTH_ENABLED or os.environ.get("EMAIL_ENABLED", "false").lower() == "true"
OAuth is the preferred path; SMTP is the fallback. This dual-path approach means email keeps working even when OAuth tokens expire at 3 AM. Here is exactly how I built it.
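The preference order can be sketched as a small selection function. This is an illustration only, not code from agent_hub_mcp.py: the name `choose_transport` and the returned labels are hypothetical, and it assumes SMTP is usable only when both a user and an app password are configured.

```python
def choose_transport(oauth_enabled: bool, smtp_user: str, smtp_password: str) -> str:
    """Pick a send path: OAuth first, SMTP if credentials exist, else disabled."""
    if oauth_enabled:
        return "gmail_oauth"
    if smtp_user and smtp_password:
        # Assumption: an empty user or password means SMTP is not configured.
        return "smtp"
    return "disabled"
```

Returning an explicit "disabled" label, rather than raising, lets the caller log a skipped send without crashing the pipeline.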
The Architecture: 7 Steps from Inbox to Reply
Here is the full flow:
Gmail → Google Workspace MCP → Intent Classifier → NATS Router → Agent → Draft Response → Approval Queue → Human Review → Send
Every email passes through seven distinct stages. No stage is optional. Even auto-acknowledged messages still flow through classification so we have a record of what came in and what category it fell into.
Let's break each one down.
Step 1: Google Workspace MCP Integration
Email ingestion runs through the Google Workspace MCP integration — an OAuth-based connection that receives messages via push notification, not polling. When a new message arrives, Google pushes it to our MCP server, which deduplicates and publishes the raw payload into the pipeline.
The dedup is critical. I learned this the hard way when a Gmail API retry caused the same email to get processed three times, generating three different agent responses. Now every message ID gets checked against a seen-collection before processing:
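@mcp_tool("gmail_on_message")
async def handle_inbound(message: GmailMessage, seen_collection):
    msg_id = message.id
    # Await the snapshot before reading .exists; the attribute lives on the
    # awaited result, not on the coroutine returned by get().
    snapshot = await seen_collection.document(msg_id).get()
    if snapshot.exists:
        return  # already processed, e.g. after a Gmail API retry
    await publish_to_nats("email.inbound.raw", message.to_json())
    await seen_collection.document(msg_id).set({"processed_at": now()})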
The key design decision: the MCP integration does nothing except receive and publish. It doesn't parse, classify, or respond. Single responsibility keeps it reliable. And because it's push-based via OAuth, there's no polling interval to tune — messages arrive in near real-time. The inbox_listener.py uses the same pattern — a ProcessedEventTracker with a 10,000-event dedup cache and 300-second TTL to prevent duplicate processing on JetStream redelivery.
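A dedup cache bounded by both size and age can be sketched in a few lines. This is not the ProcessedEventTracker from inbox_listener.py; the class name `TtlDedupCache`, its method names, and the injectable clock are assumptions made for illustration. Only the 10,000-entry limit and 300-second TTL come from the description above.

```python
import time
from collections import OrderedDict


class TtlDedupCache:
    """Remember recently seen event IDs, bounded by size and by age."""

    def __init__(self, max_size=10_000, ttl_seconds=300.0, clock=time.monotonic):
        self._max_size = max_size
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so tests can control time
        self._seen = OrderedDict()  # event_id -> time first seen

    def _evict_expired(self, now):
        # Entries stay in insertion order, so the oldest is always first.
        while self._seen:
            oldest_id, seen_at = next(iter(self._seen.items()))
            if now - seen_at < self._ttl:
                break
            del self._seen[oldest_id]

    def check_and_mark(self, event_id):
        """Return True if event_id is new (and record it), False if duplicate."""
        now = self._clock()
        self._evict_expired(now)
        if event_id in self._seen:
            return False
        self._seen[event_id] = now
        if len(self._seen) > self._max_size:
            self._seen.popitem(last=False)  # drop the oldest entry
        return True
```

A handler would call `check_and_mark(msg_id)` first and return early when it gets `False`. The TTL only needs to outlast the redelivery window, which keeps memory bounded.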
Step 2: The Intent Classifier
This is where it gets interesting. Traditional email routing uses rules: if the subject contains "pricing," send it to sales. That breaks immediately. People write "quick question" in the subject and ask about enterprise licensing in the body.
Instead, we run every email through an LLM-based intent classifier that outputs one of four categories:
- sales_inquiry — pricing questions, demo requests, "how much does it cost"
- support — bug reports, access issues, "it's not working"
- partnership — integration proposals, co-marketing, "let's work together"
- internal — messages from team members, automated alerts, system notifications
# Literal braces in the JSON example are doubled so str.format() treats them
# as text instead of as replacement fields.
INTENT_PROMPT = """Classify this email into exactly one category:
sales_inquiry, support, partnership, internal.
Return JSON: {{"intent": "<category>", "confidence": <0-1>}}
Subject: {subject}
Body: {body}
Sender: {sender}
"""

async def classify_intent(email: ParsedEmail) -> IntentResult:
    response = await claude.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=128,
        messages=[{
            "role": "user",
            "content": INTENT_PROMPT.format(
                subject=email.subject,
                body=email.body[:2000],  # cap input size; see note below
                sender=email.sender
            )
        }]
    )
    return IntentResult.parse(response.content[0].text)
We truncate the body at 2,000 characters. Intent classification doesn't need the full thread — the signal is almost always in the first few paragraphs. This also keeps token costs predictable.
The classifier returns a confidence score. Anything below 0.7 gets routed to a general triage queue instead of directly to an agent.
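Parsing the model's reply and applying the threshold might look like the sketch below. The article does not show `IntentResult`, so this version is hypothetical: the field names follow the `intent.category` usage in the router, the `needs_triage` property is an invented helper, and treating malformed output as triage is an assumption about sensible behavior rather than a description of the production code.

```python
import json
from dataclasses import dataclass

VALID_INTENTS = {"sales_inquiry", "support", "partnership", "internal"}
TRIAGE_THRESHOLD = 0.7  # the cutoff stated in the text


@dataclass
class IntentResult:
    category: str
    confidence: float

    @classmethod
    def parse(cls, raw: str) -> "IntentResult":
        """Parse the model's JSON reply; malformed output falls back to triage."""
        try:
            data = json.loads(raw)
            category = str(data["intent"])
            confidence = float(data["confidence"])
        except (ValueError, KeyError, TypeError):
            return cls(category="triage", confidence=0.0)
        if category not in VALID_INTENTS or not 0.0 <= confidence <= 1.0:
            return cls(category="triage", confidence=0.0)
        return cls(category=category, confidence=confidence)

    @property
    def needs_triage(self) -> bool:
        # Low confidence or an unusable reply both go to the triage queue.
        return self.category == "triage" or self.confidence < TRIAGE_THRESHOLD
```

Validating the category against a fixed set matters because an LLM can return a plausible label that no subscriber listens to.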
Step 3: NATS Routing
Once we have an intent, we publish to an intent-specific NATS subject. This is the same NATS JetStream on port 4222 that handles all inter-agent communication — task assignments, discovery announcements, meeting scheduling, everything. Email is just another message type on the bus:
INTENT_ROUTES = {
    "sales_inquiry": "email.classified.sales",
    "support": "email.classified.support",
    "partnership": "email.classified.partnership",
    "internal": "email.classified.internal",
}

async def route_email(email: ParsedEmail, intent: IntentResult):
    # Unknown categories fall back to the triage subject.
    subject = INTENT_ROUTES.get(intent.category, "email.classified.triage")
    await nats.publish(subject, email.to_json())
Each of the 11 agent roles subscribes to the NATS subjects it handles. The Marketing agent listens to email.classified.sales. DevOps and Fullstack listen to email.classified.support. The CEO and CSO handle partnerships. Here's how those subscriptions get wired up in inbox_listener.py:
# From conductor/src/mcp_servers/inbox_listener.py
# Subscribe to role-based topics based on skills
for skill in SKILLS:
    if skill.strip():
        topic = f"agent-hub.topics.tasks.{skill.strip()}"
        await self._subscribe(topic, self._handle_topic_message)
        logger.info("Subscribed to skill topic: %s", topic)
Adding a new category means adding a new NATS subject and a subscriber — no changes to the router itself.
Step 4: Auto-Ack — Never Leave a Sender Waiting
Before the assigned agent even starts drafting a real response, we fire an auto-acknowledgment:
async def send_auto_ack(email: ParsedEmail, intent: IntentResult):
    template = ACK_TEMPLATES[intent.category]
    await gmail_send(
        to=email.sender,
        subject=f"Re: {email.subject}",
        body=template.format(sender_name=email.sender_name),
        thread_id=email.thread_id,
    )
The ack templates are intentionally short and honest:
Hi {sender_name}, we received your message and a team member will follow up shortly. If this is urgent, reply to this email with "URGENT" in the subject.
This one small thing changes the sender's experience completely. They know their email didn't go into a void.
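One detail worth guarding against is a missing sender name, which would render as "Hi , we received...". The sketch below is a hypothetical rendering helper, not the production template code: `render_ack`, `DEFAULT_ACK`, and the "there" fallback are assumptions, and every category reuses the single template quoted above.

```python
from typing import Optional

DEFAULT_ACK = (
    "Hi {sender_name}, we received your message and a team member will follow up "
    'shortly. If this is urgent, reply to this email with "URGENT" in the subject.'
)

# Assumption: all four categories share one template; override entries as needed.
ACK_TEMPLATES = {
    category: DEFAULT_ACK
    for category in ("sales_inquiry", "support", "partnership", "internal")
}


def render_ack(category: str, sender_name: Optional[str]) -> str:
    """Render the ack body, tolerating unknown categories and blank names."""
    template = ACK_TEMPLATES.get(category, DEFAULT_ACK)
    name = (sender_name or "").strip() or "there"
    return template.format(sender_name=name)
```

Using `.get()` with a default also covers messages that were routed to triage, which have no template of their own.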
Step 5: Agent Drafts a Response
The subscribed agent receives the classified email and drafts a response using context from its own knowledge base. For sales inquiries, that means current pricing, product capabilities, and recent case studies. For support, it pulls from known issues and documentation.
The agent writes a draft. It does not send it.
Step 6: The Approval Queue
Every draft goes into a Firestore-backed approval queue. We built a FastAPI service with five endpoints:
GET /api/v1/drafts — list all pending drafts
GET /api/v1/drafts/{draft_id} — view a specific draft with full context
POST /api/v1/drafts/{draft_id}/approve — approve (optionally edit before sending)
POST /api/v1/drafts/{draft_id}/reject — reject with a reason
PUT /api/v1/drafts/{draft_id}/edit — edit the draft body
Auth is handled by a user_id allowlist. Only approved admins can approve or reject drafts.
When a reviewer approves a draft, they can optionally edit the body first. Rejections require a reason. That reason gets fed back to the agent as training signal for future drafts.
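The rules behind those endpoints (allowlist check, optional edit on approve, mandatory reason on reject) can be sketched without the web layer. This is a framework-free illustration, not the FastAPI service itself: `Draft`, `ApprovalError`, the function names, and the allowlist entry are all hypothetical, and Firestore persistence is left out.

```python
from dataclasses import dataclass
from typing import Optional

ADMIN_ALLOWLIST = {"admin-1"}  # hypothetical user IDs


class ApprovalError(Exception):
    """Raised when an approval action is not permitted."""


@dataclass
class Draft:
    draft_id: str
    body: str
    status: str = "pending"  # pending -> approved | rejected
    rejection_reason: Optional[str] = None


def _authorize(user_id: str, draft: Draft) -> None:
    if user_id not in ADMIN_ALLOWLIST:
        raise ApprovalError("user is not on the admin allowlist")
    if draft.status != "pending":
        raise ApprovalError(f"draft is already {draft.status}")


def approve(draft: Draft, user_id: str, edited_body: Optional[str] = None) -> Draft:
    _authorize(user_id, draft)
    if edited_body is not None:
        draft.body = edited_body  # reviewer edits replace the agent's text
    draft.status = "approved"
    return draft


def reject(draft: Draft, user_id: str, reason: str) -> Draft:
    _authorize(user_id, draft)
    if not reason.strip():
        raise ApprovalError("a rejection reason is required")
    draft.status = "rejected"
    draft.rejection_reason = reason.strip()
    return draft
```

Checking that the draft is still pending prevents two reviewers from approving and rejecting the same draft in quick succession.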
Testing: 57 Tests and What They Cover
I wrote 57 tests across the pipeline — part of the 83,163 test functions in the broader codebase across 2,304 test files:
- Ingestion tests: deduplication logic, Gmail API error handling, NATS publish failures
- Classifier tests: each intent category with representative emails, edge cases (empty body, non-English subject, extremely long threads), confidence thresholds
- Router tests: correct NATS subject mapping, fallback to triage for unknown intents
- Approval API tests: all five endpoints, auth enforcement (valid admin, invalid admin, missing header), approval with and without edits, rejection with reason validation
- Integration tests: full flow from raw email to queued draft
The classifier tests are the most valuable. I built a fixture set of 20 real-ish emails and verified that the classifier gets at least 18 out of 20 correct. When I update the prompt, I run that fixture set first. This is the same testing discipline that produced 646 commits in May 2026 alone — nothing ships without passing tests.
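The 18-of-20 gate is easy to express as a helper that any test runner can call. The sketch below assumes a classifier exposed as a plain function from email text to category; `count_correct` and `passes_gate` are hypothetical names, and a real suite would wrap the async classifier and load the fixture emails from disk.

```python
from typing import Callable, Iterable, Tuple

Fixture = Tuple[str, str]  # (email text, expected category)


def count_correct(classify: Callable[[str], str], fixtures: Iterable[Fixture]) -> int:
    """Count fixtures where the classifier returns the expected category."""
    return sum(1 for text, expected in fixtures if classify(text) == expected)


def passes_gate(
    classify: Callable[[str], str],
    fixtures: Iterable[Fixture],
    min_correct: int = 18,
) -> bool:
    """True when the classifier meets the minimum number of correct answers."""
    return count_correct(classify, fixtures) >= min_correct
```

Gating on an absolute count rather than a percentage keeps the threshold meaningful if someone quietly shrinks the fixture set.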
What to Approve Automatically vs. What Needs Review
After running this for a week, here's what I learned — sometimes from painful experience:
Safe to auto-approve (after you've built confidence):
- Auto-ack messages (they're templated, no agent creativity involved)
- Internal routing confirmations
- Support responses that link to existing documentation without adding commentary
Always require human review:
- Anything mentioning pricing, contracts, or commitments — I once caught a draft that quoted a price we had changed two days earlier
- Partnership responses (the stakes are too high for a wrong tone)
- Any response where the agent's confidence is below 0.85
- First response to a new sender (you don't get a second chance at a first impression)
Start with everything requiring approval. Relax constraints as you build trust in the system. I still approve every outbound email, and I am not in a rush to change that. With 11 agents running across the organization, the volume is manageable — and the cost of a bad email is higher than the cost of a review queue.
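If you do eventually relax the rules, it helps to encode the "always review" list as a function that returns its reasons. The following is a sketch under assumptions: `DraftContext`, `review_reasons`, and the keyword list are hypothetical, and simple substring matching is a stand-in for whatever detection you trust.

```python
from dataclasses import dataclass
from typing import List

REVIEW_CONFIDENCE = 0.85  # threshold from the list above
# Hypothetical keyword list; substring matching is deliberately crude.
SENSITIVE_TERMS = ("pricing", "price", "contract", "commit")


@dataclass
class DraftContext:
    intent: str
    body: str
    agent_confidence: float
    first_contact: bool


def review_reasons(ctx: DraftContext) -> List[str]:
    """Return why a draft needs human review; an empty list means none apply."""
    reasons = []
    body = ctx.body.lower()
    if any(term in body for term in SENSITIVE_TERMS):
        reasons.append("mentions pricing, contracts, or commitments")
    if ctx.intent == "partnership":
        reasons.append("partnership response")
    if ctx.agent_confidence < REVIEW_CONFIDENCE:
        reasons.append("agent confidence below 0.85")
    if ctx.first_contact:
        reasons.append("first response to a new sender")
    return reasons
```

Returning reasons instead of a boolean gives the reviewer context in the queue and gives you data on which rule fires most often.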
What's Next
This pipeline handles inbound email and drafts responses. I am already working on Phase 3: multi-turn thread handling, where the agent maintains conversation context across a full email thread and knows when to escalate versus when to keep going. The email_founder tool in the MCP server already handles the founder-escalation path — next up is teaching agents when to use it automatically.
Build your own cyborgenic organization at agent.ceo. I run this pipeline in production today — every email to my team hits this exact flow. The same infrastructure that powers 9,799 commits, 83,163 tests, and 11 autonomous agents powers the email pipeline too.