Messaging SDK (Python)

Connect your Python agent to Slack, web chat, and other platforms over gRPC

The messaging SDK is how your agent container talks to the messaging sidecar that ships next to it. When you declare agent.interfaces.messaging: true in astropods.yml, the platform deploys a messaging container alongside your agent. That sidecar runs platform adapters (Slack, web chat, etc.), normalises every incoming event into a single protobuf shape, and routes it to your agent over a bidirectional gRPC stream.

This page covers the Python SDK, astropods-messaging. For TypeScript/Node, see Messaging SDK (Node).

This is a low-level package — raw generated gRPC stubs. If you’re building an agent on a supported framework, prefer a higher-level adapter (e.g. astropods-adapter-langchain). Reach for this SDK directly when implementing a custom adapter or when no framework adapter exists.

Field names below are snake_case — that’s how Python protobuf bindings expose proto fields. Requires Python 3.10+.

Install

$ pip install astropods-messaging

Connect to the sidecar

The sidecar listens on gRPC port 9090. Inside the same pod the address is always localhost:9090; locally with ast dev it’s the same.

import grpc
from astropods_messaging import AgentMessagingStub

channel = grpc.insecure_channel("localhost:9090")
stub = AgentMessagingStub(channel)

The sidecar talks plaintext gRPC on the loopback interface — no TLS, no auth. The trust boundary is the pod itself.

The conversation stream

The primary RPC is ProcessConversation — a bidirectional stream. The sidecar pushes incoming user messages, feedback, and audio. Your agent pushes back status updates, content chunks, errors, and other responses on the same stream.

Bidi streaming in grpcio is symmetric: pass an iterator (typically a generator) of ConversationRequests and iterate the returned stream of AgentResponses. A common pattern uses a queue.Queue so that any thread in the program can push outbound messages.

import queue, grpc
from astropods_messaging import (
    AgentMessagingStub, ConversationRequest, AgentResponse, ContentChunk,
)

channel = grpc.insecure_channel("localhost:9090")
stub = AgentMessagingStub(channel)

# Anything put on this queue is sent to the sidecar.
outbound: queue.Queue[ConversationRequest] = queue.Queue()

def requests():
    # Blocks until a message is queued, then hands it to gRPC.
    while True:
        yield outbound.get()

for resp in stub.ProcessConversation(requests()):
    if resp.HasField("incoming_message"):
        m = resp.incoming_message
        # Echo the text back as a single, final chunk.
        reply = AgentResponse(
            conversation_id=m.conversation_id,
            content=ContentChunk(
                type=ContentChunk.END,
                content=f"you said: {m.content}",
            ),
        )
        outbound.put(ConversationRequest(agent_response=reply))
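The requests() generator above blocks on outbound.get() indefinitely, so the agent never ends its side of the stream. One possible way to allow a clean shutdown is a sentinel value on the queue. This is a minimal sketch: the _STOP object and close_stream helper are illustrative names, not part of the SDK, and plain strings stand in for ConversationRequest objects.

```python
import queue

_STOP = object()  # hypothetical sentinel, not part of the SDK

outbound: queue.Queue = queue.Queue()

def requests():
    """Yield queued messages until the sentinel arrives, then end the stream."""
    while True:
        item = outbound.get()
        if item is _STOP:
            return  # exhausting the generator ends the request side of the stream
        yield item

def close_stream():
    """Ask the request generator to finish once earlier items are drained."""
    outbound.put(_STOP)

# Plain strings stand in for ConversationRequest objects here.
outbound.put("first")
outbound.put("second")
close_stream()
drained = list(requests())
```

Because the sentinel travels through the same queue, messages queued before close_stream() are still delivered in order.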

Inbound payloads — switch on resp.WhichOneof("payload") and read the matching field. Use resp.HasField("incoming_message") for individual checks.

AgentResponse payload variants (server → agent)

| WhichOneof("payload") | Type | Notes |
| --- | --- | --- |
| incoming_message | Message | The inbound message itself. |
| feedback | PlatformFeedback | User feedback event. |
| audio_config | AudioStreamConfig | Audio session start. |
| audio_chunk | AudioChunk | Raw audio bytes. |
| status | StatusUpdate | Relayed echoes (rare). |
| content | ContentChunk | Relayed echoes (rare). |
Any of the outbound variants below.
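Switching on WhichOneof("payload") can be expressed as a dispatch table rather than a chain of if/elif branches. The sketch below assumes nothing about the SDK beyond the WhichOneof method and the variant names listed above; FakeResponse is a stand-in written for this example so the code runs without the sidecar, and the handlers only record what they saw.

```python
from typing import Callable

class FakeResponse:
    """Stand-in for AgentResponse that exposes only WhichOneof (illustrative)."""
    def __init__(self, variant, value):
        self._variant = variant
        setattr(self, variant, value)

    def WhichOneof(self, group):
        return self._variant if group == "payload" else None

seen: list[tuple[str, object]] = []

# One handler per inbound variant; real handlers would do actual work.
HANDLERS: dict[str, Callable[[object], None]] = {
    "incoming_message": lambda m: seen.append(("message", m)),
    "feedback": lambda f: seen.append(("feedback", f)),
    "audio_config": lambda c: seen.append(("audio_config", c)),
    "audio_chunk": lambda c: seen.append(("audio_chunk", c)),
}

def dispatch(resp) -> bool:
    """Route a response to its handler; return False for unhandled variants."""
    variant = resp.WhichOneof("payload")
    handler = HANDLERS.get(variant)
    if handler is None:
        return False  # relayed echoes, or a response with no payload set
    handler(getattr(resp, variant))
    return True

handled = dispatch(FakeResponse("incoming_message", "hello"))
ignored = dispatch(FakeResponse("status", "thinking"))
```

Returning False instead of raising keeps the receive loop alive when the sidecar relays a variant the agent does not care about.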

Outbound (ConversationRequest) variants (agent → sidecar)

| Field | Type | Notes |
| --- | --- | --- |
| message | Message | Forward an agent-originated message. |
| feedback | PlatformFeedback | Relay or fabricate feedback. |
| agent_config | AgentConfig | Announce capabilities on startup. |
| agent_response | AgentResponse | Main path. Any agent response. |
| audio_config | AudioStreamConfig | Upstream audio session start. |
| audio | AudioChunk | Upstream audio bytes. |

Inbound message anatomy

An incoming message is a Message. The same shape applies whether it came from Slack, the web chat, or any other adapter.

Message

| Field | Type | Required | Notes |
| --- | --- | --- | --- |
| id | string | yes | UUID assigned by the sidecar. |
| timestamp | google.protobuf.Timestamp | yes | When the platform received the message. |
| platform | string | yes | "slack", "web", "discord", etc. |
| platform_context | PlatformContext | yes | Platform-native IDs and event metadata. |
| user | User | yes | Sender identity. |
| content | string | yes | Cleaned text. Adapters strip the bot’s @-mention before forwarding. |
| attachments | repeated Attachment | no | Files, images, video, audio, link previews. |
| conversation_id | string | yes | Stable correlation ID across the message lifecycle. Always echo back on responses. |

User

| Field | Type | Required | Notes |
| --- | --- | --- | --- |
| id | string | yes | Platform-specific user ID. |
| username | string | no | Display name or handle. |
| avatar_url | string | no | Avatar URL. |
| email | string | no | Email if available. |
| user_data | map<string, string> | no | Platform-specific extras (workspace, role, etc.). |

Attachment

| Field | Type | Required | Notes |
| --- | --- | --- | --- |
| type | enum | yes | See Attachment.Type below. |
| url | string | yes | Authenticated direct-download URL. |
| filename | string | no | Original filename. |
| size_bytes | int64 | no | File size in bytes. |
| mime_type | string | no | MIME type. |
| title | string | no | Display title (rich attachments). |
| description | string | no | Display description. |
| width | int32 | no | For images/videos. |
| height | int32 | no | For images/videos. |

Attachment.Type: TYPE_UNSPECIFIED (0), IMAGE (1), FILE (2), VIDEO (3), AUDIO (4), LINK (5). Access via Attachment.IMAGE etc.

PlatformContext

| Field | Type | Required | Notes |
| --- | --- | --- | --- |
| message_id | string | yes | Original platform message ID. |
| channel_id | string | yes | Channel/room/chat ID. |
| thread_id | string | no | Agent’s reply target. Also set on top-level messages whose response should open a new thread. |
| thread_root_id | string | no | Parent thread root timestamp. Set only when this message is a reply inside an existing thread. |
| channel_name | string | no | Display channel name. |
| workspace_id | string | no | Slack workspace, Discord guild, etc. |
| bot_user_id | string | no | The bot’s own user ID in the source platform. |
| user_id | string | no | Raw platform-native sender ID before any cross-platform identity resolution. |
| event_kind | enum | yes | See PlatformContext.EventKind below. |
| platform_data | map<string, string> | no | Platform-specific extras (Slack ts, Discord snowflake, Teams activity ID). |

PlatformContext.EventKind (access via PlatformContext.EVENT_KIND_DM etc.)

| Value | When the adapter emits it |
| --- | --- |
| EVENT_KIND_UNSPECIFIED (0) | Fallback. Should not appear in production traffic. |
| EVENT_KIND_DM (1) | 1:1 / private chat (Slack DM, web chat session). |
| EVENT_KIND_APP_MENTION (2) | Bot was @-mentioned in a channel or thread. |
| EVENT_KIND_THREAD_REPLY (3) | Reply inside an existing thread, no @-mention. |
| EVENT_KIND_OBSERVED (4) | Observe-channel forward (listen-only). |
| EVENT_KIND_REACTION (5) | Reaction added/removed. |
| EVENT_KIND_BUTTON_CLICK (6) | Interactive button click on a CardAttachment. |
| EVENT_KIND_SLASH_COMMAND (7) | Slash command (Slack/Discord). |
| EVENT_KIND_ASSISTANT_THREAD_STARTED (8) | Slack assistant thread opened. |
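Most agents need a rule for which event kinds deserve a text reply. The sketch below mirrors the numeric values from the table in a local IntEnum so that it runs without the SDK. The policy itself (reply to DMs, mentions, thread replies, and slash commands, stay silent otherwise) is an assumption chosen for illustration, not something the platform prescribes.

```python
from enum import IntEnum

class EventKind(IntEnum):
    """Local mirror of the PlatformContext.EventKind values listed above."""
    UNSPECIFIED = 0
    DM = 1
    APP_MENTION = 2
    THREAD_REPLY = 3
    OBSERVED = 4
    REACTION = 5
    BUTTON_CLICK = 6
    SLASH_COMMAND = 7
    ASSISTANT_THREAD_STARTED = 8

# Assumed policy: reply only when the user addressed the bot directly.
REPLY_KINDS = frozenset({
    EventKind.DM,
    EventKind.APP_MENTION,
    EventKind.THREAD_REPLY,
    EventKind.SLASH_COMMAND,
})

def should_reply(event_kind: int) -> bool:
    """Return True when the assumed policy calls for a text reply."""
    return event_kind in REPLY_KINDS
```

With the real bindings the same check would take m.platform_context.event_kind, which Python protobuf exposes as a plain int, so the raw value can be passed straight in.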

Sending a response

AgentResponse

| Field | Type | Required | Notes |
| --- | --- | --- | --- |
| conversation_id | string | yes | Must match the inbound Message.conversation_id. |
| response_id | string | no | Stable ID for this response. Used by feedback events. |
| one variant | | yes | Exactly one payload variant below. |

AgentResponse payload variants (oneof payload)

| Variant | Type | Notes |
| --- | --- | --- |
| incoming_message | Message | Server → agent only. The inbound message itself. |
| status | StatusUpdate | Pre-content typing indicator. |
| content | ContentChunk | Actual message text, streamed. |
| prompts | SuggestedPrompts | Quick-reply suggestions. |
| thread_metadata | ThreadMetadata | Open a thread or update its title. |
| transcript | Transcript | STT result back to the platform (audio flow). |
| error | ErrorResponse | Surface an error to the user. |
| context_request | ThreadHistoryRequest | Ask the sidecar to hydrate thread history. |
| audio_config | AudioStreamConfig | Server → agent only. |
| audio_chunk | AudioChunk | Server → agent only. |
| feedback | PlatformFeedback | Server → agent only. |

Streaming text content

ContentChunk

| Field | Type | Required | Notes |
| --- | --- | --- | --- |
| type | enum | yes | See ContentChunk.ChunkType below. |
| content | string | no | Semantics depend on type. |
| attachments | repeated ResponseAttachment | no | Ship with END chunks (or standalone). |
| platform_message_id | string | no | Returned by the adapter after START; pass on later chunks to update it. |
| options | MessageOptions | no | Creation flags. |

ContentChunk.ChunkType (access via ContentChunk.START etc.)

| Value | Use |
| --- | --- |
| START (1) | Create the platform message. May be empty (immediate presence) or include initial content. |
| DELTA (2) | Append the next token(s). Stream as many as you want. |
| END (3) | Finalize. Last content (optional) and any attachments ship here. |
| REPLACE (4) | Overwrite the full content, for post-stream edits. |

MessageOptions

| Field | Type | Required | Notes |
| --- | --- | --- | --- |
| ephemeral | bool | no | Only visible to the recipient user. |
| create_thread | bool | no | Start a new thread under the user’s message. |
| reply_to_message_id | string | no | Reply to a specific message. |
| silent | bool | no | Suppress notification. |

ResponseAttachment (oneof attachment_type)

| Variant | Type | Fields |
| --- | --- | --- |
| image | ImageAttachment | url, alt_text, title, width, height |
| file | FileAttachment | url, filename, mime_type, size_bytes |
| card | CardAttachment | platform_card_json (Slack Block Kit, Discord Embeds, Teams cards) |
| link | LinkPreview | url, title, description, image_url |

Example:

# `outbound` is the queue from the stream example above. `cid` is the inbound
# message's conversation_id, and `stream_llm` is your own token generator.
def send(payload):
    outbound.put(ConversationRequest(agent_response=AgentResponse(
        conversation_id=cid,
        content=payload,
    )))

send(ContentChunk(type=ContentChunk.START, content=""))
for token in stream_llm(prompt):
    send(ContentChunk(type=ContentChunk.DELTA, content=token))
send(ContentChunk(type=ContentChunk.END, content=""))
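If the token source raises partway through, the example above never sends END and the platform message is left unfinalized. A possible guard is to send END from a finally block. The sketch below uses (type name, content) tuples in place of ContentChunk objects so it runs standalone; stream_reply is an illustrative name, and always finalizing on failure is an assumption about what the adapter prefers.

```python
from typing import Callable, Iterable

Chunk = tuple[str, str]  # (chunk type name, content), standing in for ContentChunk

def stream_reply(tokens: Iterable[str], send: Callable[[Chunk], None]) -> None:
    """Send START, one DELTA per non-empty token, and always a closing END."""
    send(("START", ""))
    try:
        for token in tokens:
            if token:  # an empty DELTA would add nothing to the message
                send(("DELTA", token))
    finally:
        # Runs on success and on failure, so the message is always finalized.
        send(("END", ""))

sent: list[Chunk] = []
stream_reply(["Hel", "", "lo"], sent.append)
```

The exception still propagates to the caller, which can then decide whether to follow up with an ErrorResponse.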

Status updates

StatusUpdate

| Field | Type | Required | Notes |
| --- | --- | --- | --- |
| status | enum | yes | See StatusUpdate.Status below. |
| custom_message | string | no | Required with CUSTOM; otherwise overrides default phrasing. |
| emoji | string | no | Platform emoji, e.g. :mag:. |

StatusUpdate.Status (access via StatusUpdate.THINKING etc.)

| Value | Meaning |
| --- | --- |
| STATUS_UNSPECIFIED (0) | Do not use. |
| THINKING (1) | Generic “thinking”. |
| SEARCHING (2) | RAG/knowledge base search. |
| GENERATING (3) | LLM generation in progress. |
| PROCESSING (4) | Tool execution. |
| ANALYZING (5) | Data analysis. |
| CUSTOM (10) | Use with custom_message. |

from astropods_messaging import AgentResponse, ConversationRequest, StatusUpdate

def send_response(resp):
    # Wrap the response and queue it on the outbound stream
    # (`outbound` is the queue from the stream example above).
    outbound.put(ConversationRequest(agent_response=resp))

send_response(AgentResponse(
    conversation_id=cid,
    status=StatusUpdate(status=StatusUpdate.SEARCHING),
))
send_response(AgentResponse(
    conversation_id=cid,
    status=StatusUpdate(
        status=StatusUpdate.CUSTOM,
        custom_message="Querying the knowledge base…",
        emoji=":mag:",
    ),
))

Suggested prompts

SuggestedPrompts

| Field | Type | Required | Notes |
| --- | --- | --- | --- |
| prompts | repeated SuggestedPrompts.Prompt | yes | Max 4–6 depending on platform. |

SuggestedPrompts.Prompt

| Field | Type | Required | Notes |
| --- | --- | --- | --- |
| id | string | yes | Unique ID. Echoed back in PlatformFeedback.prompt_selection. |
| title | string | yes | Button/chip label. |
| message | string | yes | Full message sent on click. |
| description | string | no | Tooltip/help text. |
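Since the prompt limit varies by platform and each id must be unique, it can help to normalise the list before building SuggestedPrompts. The sketch below works on plain dicts with the same field names as SuggestedPrompts.Prompt; cap_prompts is an illustrative helper, and the default limit of 4 is simply the lower bound of the 4–6 range quoted above, not a documented per-platform value.

```python
def cap_prompts(prompts: list[dict], limit: int = 4) -> list[dict]:
    """Drop prompts with duplicate ids, then keep at most `limit` entries."""
    seen_ids: set[str] = set()
    kept: list[dict] = []
    for prompt in prompts:
        if prompt["id"] in seen_ids:
            continue  # ids are echoed back in feedback, so they must be unique
        seen_ids.add(prompt["id"])
        kept.append(prompt)
        if len(kept) == limit:
            break
    return kept

candidates = [
    {"id": "p1", "title": "Show example", "message": "Show me an example"},
    {"id": "p1", "title": "Duplicate", "message": "Same id as the first"},
    {"id": "p2", "title": "Go deeper", "message": "Can you explain more?"},
    {"id": "p3", "title": "Summarise", "message": "Summarise that"},
]
capped = cap_prompts(candidates, limit=2)
```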

Errors

ErrorResponse

| Field | Type | Required | Notes |
| --- | --- | --- | --- |
| code | enum | yes | See ErrorResponse.ErrorCode below. |
| message | string | yes | User-facing error message. |
| details | string | no | Technical details. Logged, not shown to the user. |
| retryable | bool | no | Whether the platform should offer a retry affordance. |

ErrorResponse.ErrorCode

| Value | Meaning |
| --- | --- |
| ERROR_CODE_UNSPECIFIED (0) | Fallback. Avoid. |
| RATE_LIMIT (1) | Agent hit rate limit. |
| CONTEXT_TOO_LONG (2) | Context exceeds LLM limit. |
| INVALID_REQUEST (3) | Malformed request. |
| AGENT_ERROR (4) | Internal agent error. |
| TOOL_ERROR (5) | Tool execution failed. |
| PLATFORM_ERROR (6) | Platform API error. |
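An agent usually has to translate its own exceptions into one of these codes plus a retryable flag. The sketch below shows one way to centralise that decision. The exception classes (RateLimited, ContextOverflow, ToolFailure) are hypothetical and defined only for this example, and the mapping and retryable choices are assumptions. The function returns the code by name so it runs without the SDK; the name would then be resolved against the generated enum.

```python
class RateLimited(Exception):
    """Hypothetical: the LLM provider rejected the call for rate reasons."""

class ContextOverflow(Exception):
    """Hypothetical: the prompt exceeded the model's context window."""

class ToolFailure(Exception):
    """Hypothetical: a tool invoked by the agent raised or timed out."""

# Ordered (exception type, ErrorCode name, retryable) rules; first match wins.
_RULES = [
    (RateLimited, "RATE_LIMIT", True),
    (ContextOverflow, "CONTEXT_TOO_LONG", False),
    (ToolFailure, "TOOL_ERROR", True),
    (ValueError, "INVALID_REQUEST", False),
]

def classify_error(exc: Exception) -> tuple[str, bool]:
    """Map an exception to an (ErrorCode name, retryable) pair."""
    for exc_type, code, retryable in _RULES:
        if isinstance(exc, exc_type):
            return code, retryable
    return "AGENT_ERROR", False  # anything unrecognised is an internal error

code, retryable = classify_error(RateLimited("429 from provider"))
```

The exception text belongs in ErrorResponse.details rather than message, since details is logged and not shown to the user.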

Thread metadata

ThreadMetadata

| Field | Type | Required | Notes |
| --- | --- | --- | --- |
| thread_id | string | no | Platform thread ID. Set to update an existing thread. |
| title | string | no | Thread title/subject. |
| create_new | bool | no | Create a new thread. |

Transcript

Sent after STT to replace the “[audio]” placeholder on the platform.

Transcript

| Field | Type | Required | Notes |
| --- | --- | --- | --- |
| text | string | yes | Transcribed text. |
| message_id | string | no | Placeholder message ID to update. |
| language | string | no | BCP-47 detected language (e.g. en-US). |

Receiving platform feedback

The sidecar sends PlatformFeedback on the same stream when the user interacts with a previous response.

PlatformFeedback

| Field | Type | Required | Notes |
| --- | --- | --- | --- |
| conversation_id | string | yes | Conversation this feedback belongs to. |
| response_id | string | no | Which agent response this feedback relates to. |
| timestamp | google.protobuf.Timestamp | yes | When the feedback occurred. |
| user | User | no | Platform user who submitted the feedback. Empty for anonymous/system events. |
| one variant | | yes | One of the fields below. |

PlatformFeedback variants (oneof feedback)

| Variant | Type | Notes |
| --- | --- | --- |
| reaction | MessageReaction | Thumbs up/down or custom emoji. |
| prompt_selection | PromptSelection | User clicked a SuggestedPrompts entry. |
| button_click | ButtonClick | User clicked a button on a CardAttachment. |
| stream_control | StreamControl | User asked to stop, pause, resume, or regenerate. |
| message_edit | MessageEdit | User edited their own previous message. |
| message_delete | MessageDelete | User deleted their own previous message. |
| text | TextFeedback | Free-form text from a platform-native modal. |

MessageReaction

| Field | Type | Required | Notes |
| --- | --- | --- | --- |
| type | enum | yes | REACTION_TYPE_UNSPECIFIED, THUMBS_UP, THUMBS_DOWN, CUSTOM_EMOJI. |
| emoji | string | no | Populated when type == CUSTOM_EMOJI. |
| added | bool | yes | True = added, False = removed. |

PromptSelection

| Field | Type | Notes |
| --- | --- | --- |
| prompt_id | string | Matches SuggestedPrompts.Prompt.id. |
| prompt_message | string | Full message being sent. |

ButtonClick

| Field | Type | Notes |
| --- | --- | --- |
| button_id | string | Button identifier from card. |
| value | string | Button value/payload. |
| action | string | Action identifier. |

StreamControl

| Field | Type | Notes |
| --- | --- | --- |
| action | enum | ACTION_UNSPECIFIED, STOP, PAUSE, RESUME, REGENERATE. |
| reason | string | Why (user click, error, etc.). |

MessageEdit

| Field | Type | Notes |
| --- | --- | --- |
| message_id | string | Platform message ID that was edited. |
| new_content | string | New content after edit. |
| original_content | string | Original content (if available). |
| edited_at | google.protobuf.Timestamp | When the edit happened. |

MessageDelete

| Field | Type | Notes |
| --- | --- | --- |
| message_id | string | Platform message ID that was deleted. |
| deleted_at | google.protobuf.Timestamp | When the delete happened. |

TextFeedback

| Field | Type | Notes |
| --- | --- | --- |
| text | string | Free-form text the user typed. |
| prompt | string | Label/title shown above the textbox. |

# log_feedback and cancel_generation are your own helpers, not part of the SDK.
for resp in stub.ProcessConversation(requests()):
    if resp.HasField("feedback"):
        fb = resp.feedback
        if fb.HasField("reaction"):
            log_feedback(fb.response_id, fb.reaction.type, fb.reaction.added)
        elif fb.HasField("stream_control"):
            cancel_generation(fb.conversation_id)
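The cancel_generation call above is left to the agent. One possible shape, assuming generation runs in a worker thread per conversation, is a threading.Event per conversation_id that the generation loop checks between tokens. All names here (cancel_flag, cancel_generation, generate) are illustrative and not part of the SDK.

```python
import threading

_flags: dict[str, threading.Event] = {}
_flags_lock = threading.Lock()

def cancel_flag(conversation_id: str) -> threading.Event:
    """Return the cancellation flag for a conversation, creating it on demand."""
    with _flags_lock:
        return _flags.setdefault(conversation_id, threading.Event())

def cancel_generation(conversation_id: str) -> None:
    """Called from the receive loop when a stream_control event arrives."""
    cancel_flag(conversation_id).set()

def generate(conversation_id: str, tokens, emit) -> bool:
    """Emit tokens until done or cancelled; return True if it ran to completion."""
    flag = cancel_flag(conversation_id)
    flag.clear()  # a fresh generation starts uncancelled
    for token in tokens:
        if flag.is_set():
            return False  # stop before emitting anything further
        emit(token)
    return True

def _tokens_with_stop():
    yield "a"
    yield "b"
    cancel_generation("c1")  # simulate the user pressing stop mid-stream
    yield "c"

emitted: list[str] = []
completed = generate("c1", _tokens_with_stop(), emitted.append)
```

A real agent would still send an END chunk after a cancelled run so the platform message is finalized.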

Audio

Audio is delivered to the agent as raw bytes; the messaging system does no STT, transcoding, or VAD.

| Step | Direction | Message | Notes |
| --- | --- | --- | --- |
| 1 | Sidecar → agent | AgentResponse.audio_config | Format (encoding, sample rate, channels). |
| 2 | Sidecar → agent | AgentResponse.audio_chunk | Raw bytes. done=True marks end-of-utterance. |
| 3 | Agent → sidecar | AgentResponse.transcript | STT result; platform replaces the placeholder. |

AudioStreamConfig

| Field | Type | Required | Notes |
| --- | --- | --- | --- |
| encoding | enum | yes | One of the AudioEncoding values below. |
| sample_rate | int32 | yes | Hz: 8000 (telephony), 16000 (speech), 48000 (browser). |
| channels | int32 | yes | 1 = mono (speech default), 2 = stereo. |
| language | string | no | BCP-47 hint for STT (e.g. en-US). |
| conversation_id | string | yes | Links audio to a conversation. |
| source | string | no | Origin: browser, twilio, vonage, mobile, upload. |
| user_id | string | no | Speaking user’s identity. |

AudioChunk

| Field | Type | Required | Notes |
| --- | --- | --- | --- |
| data | bytes | yes | Raw audio bytes. Empty when done=True. |
| sequence | int64 | no | Monotonic ordering counter. |
| done | bool | no | True = end of segment, run STT now. |

AudioEncoding (access via AudioEncoding.LINEAR16 etc.)

| Value | Use |
| --- | --- |
| AUDIO_ENCODING_UNSPECIFIED (0) | Do not use. |
| LINEAR16 (1) | PCM signed 16-bit LE. Universal baseline. |
| MULAW (2) | G.711 mu-law. Twilio / telephony (8 kHz). |
| OPUS (3) | Raw Opus frames. Low-latency codec. |
| MP3 (4) | MP3. Batch uploads, pre-recorded. |
| WEBM_OPUS (5) | WebM/Opus. Browser MediaRecorder default. |
| OGG_OPUS (6) | OGG/Opus. Firefox MediaRecorder. |
| FLAC (7) | FLAC lossless. High-quality uploads. |
| AAC (8) | AAC. iOS native recording. |
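For LINEAR16, the sample_rate and channels from AudioStreamConfig are enough to work out how much audio a buffer holds, which is handy for capping utterance length before running STT. The arithmetic below applies only to uncompressed 16-bit PCM; compressed encodings such as OPUS or MP3 cannot be sized this way. The function name is illustrative.

```python
def linear16_duration_seconds(num_bytes: int, sample_rate: int, channels: int = 1) -> float:
    """Duration of a LINEAR16 buffer: bytes / (2 bytes * channels * sample rate)."""
    if sample_rate <= 0 or channels <= 0:
        raise ValueError("sample_rate and channels must be positive")
    bytes_per_frame = 2 * channels  # one 16-bit sample per channel per frame
    return num_bytes / (bytes_per_frame * sample_rate)

mono_speech = linear16_duration_seconds(32_000, sample_rate=16_000)
stereo_browser = linear16_duration_seconds(192_000, sample_rate=48_000, channels=2)
```

Both calls evaluate to one second of audio: 32,000 bytes at 16 kHz mono and 192,000 bytes at 48 kHz stereo.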

Auxiliary RPCs

| RPC | When to use |
| --- | --- |
| ProcessMessage | Server-streaming for one-shot request/response. Same AgentResponse shape, no inbound feedback. |
| GetThreadHistory | Pull current thread state (handles edits/deletions). |
| GetConversationMetadata | Look up a conversation by ID or by (platform, channel, thread) without fetching history. |
| ProcessAudioStream | Dedicated audio-only stream. First message must be AudioStreamConfig, rest are AudioChunks. |
| HealthCheck | Returns HEALTHY / DEGRADED / UNHEALTHY plus the sidecar version. |

ThreadHistoryRequest

| Field | Type | Required | Default | Notes |
| --- | --- | --- | --- | --- |
| conversation_id | string | yes | | Conversation to query. |
| max_messages | int32 | no | 50 | How many recent messages to return. |
| include_edited | bool | no | true | Include edit history. |
| include_deleted | bool | no | false | Include deleted markers. |

ThreadHistoryResponse

| Field | Type | Notes |
| --- | --- | --- |
| conversation_id | string | Echoes the request. |
| messages | repeated ThreadMessage | Recent messages. |
| is_complete | bool | False if truncated by max_messages. |
| fetched_at | google.protobuf.Timestamp | When the snapshot was taken. |

ThreadMessage

| Field | Type | Notes |
| --- | --- | --- |
| message_id | string | Platform message ID. |
| user | User | Author. |
| content | string | Current content (after edits). |
| attachments | repeated Attachment | Attachments on the message. |
| timestamp | google.protobuf.Timestamp | When it was sent. |
| was_edited | bool | True if the message has been edited. |
| original_content | string | Content before edits. |
| edited_at | google.protobuf.Timestamp | Last edit time. |
| is_deleted | bool | True if the message has been deleted. |
| deleted_at | google.protobuf.Timestamp | Deletion time. |
| platform_data | map<string, string> | Platform-specific extras. |

from astropods_messaging.astro.messaging.v1.service_pb2 import (
    ThreadHistoryRequest,
)
history = stub.GetThreadHistory(ThreadHistoryRequest(
    conversation_id=cid, max_messages=50,
))
for m in history.messages:
    ...  # m.content, m.was_edited, m.is_deleted, m.original_content
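Thread history is typically fed back to the LLM as chat turns. The sketch below converts messages into role/content dicts, skipping deleted and empty entries. FakeUser and FakeThreadMessage are stand-ins carrying only the fields used here so the code runs without the SDK. Identifying the agent's own messages by comparing the author's id with PlatformContext.bot_user_id is an assumption about how history is populated.

```python
from dataclasses import dataclass

@dataclass
class FakeUser:
    """Stand-in for User (illustrative)."""
    id: str

@dataclass
class FakeThreadMessage:
    """Stand-in for ThreadMessage with only the fields this sketch reads."""
    user: FakeUser
    content: str
    is_deleted: bool = False

def to_chat_turns(messages, bot_user_id: str) -> list[dict]:
    """Convert thread messages into LLM-style chat turns, oldest first."""
    turns = []
    for m in messages:
        if m.is_deleted or not m.content:
            continue  # deleted markers and empty bodies carry no context
        role = "assistant" if m.user.id == bot_user_id else "user"
        turns.append({"role": role, "content": m.content})
    return turns

history = [
    FakeThreadMessage(FakeUser("U1"), "What is the deploy status?"),
    FakeThreadMessage(FakeUser("BOT"), "Deploy #4837 is green."),
    FakeThreadMessage(FakeUser("U1"), "ignore this", is_deleted=True),
]
turns = to_chat_turns(history, bot_user_id="BOT")
```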

Reconnection

The Python SDK is the raw generated stub — there’s no built-in retry. Implement reconnect with your usual gRPC retry policy. Two common shapes:

  1. Channel-level retry via the grpc.service_config JSON on the channel, applied to all RPCs.
  2. Wrapper loop that re-establishes the stream on grpc.RpcError with StatusCode.UNAVAILABLE / DEADLINE_EXCEEDED / INTERNAL / RESOURCE_EXHAUSTED, with exponential backoff.

import time, random, grpc

RETRYABLE = (
    grpc.StatusCode.UNAVAILABLE,
    grpc.StatusCode.DEADLINE_EXCEEDED,
    grpc.StatusCode.INTERNAL,
    grpc.StatusCode.RESOURCE_EXHAUSTED,
)

def stream_with_retry(stub, requests_factory, max_attempts=20):
    # max_attempts counts consecutive failures, not failures over the process lifetime.
    delay = 0.5
    attempt = 0
    while True:
        try:
            for resp in stub.ProcessConversation(requests_factory()):
                # The stream is healthy again: reset the failure budget.
                attempt, delay = 0, 0.5
                yield resp
            return  # clean end
        except grpc.RpcError as e:
            attempt += 1
            if attempt >= max_attempts or e.code() not in RETRYABLE:
                raise
            # Exponential backoff capped at 30 s, with 50-100% jitter.
            time.sleep(delay * (0.5 + random.random() * 0.5))
            delay = min(delay * 2, 30)
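The channel-level option (shape 1) is configured through gRPC's service config JSON. The sketch below builds the channel options without importing grpc, so it can be inspected on its own. The backoff numbers mirror the wrapper loop above and are illustrative rather than recommended values, and retry_channel_options is a hypothetical helper name. The empty name object applies the policy to every method, which avoids hard-coding the service's fully qualified name.

```python
import json

def retry_channel_options(max_attempts: int = 5) -> list[tuple[str, object]]:
    """Build gRPC channel options that enable a retry policy for all methods."""
    # gRPC requires maxAttempts of at least 2 and treats values above 5 as 5.
    max_attempts = max(2, min(max_attempts, 5))
    service_config = {
        "methodConfig": [{
            "name": [{}],  # empty name object: applies to every service and method
            "retryPolicy": {
                "maxAttempts": max_attempts,
                "initialBackoff": "0.5s",
                "maxBackoff": "30s",
                "backoffMultiplier": 2,
                "retryableStatusCodes": [
                    "UNAVAILABLE",
                    "DEADLINE_EXCEEDED",
                    "INTERNAL",
                    "RESOURCE_EXHAUSTED",
                ],
            },
        }],
    }
    return [
        ("grpc.enable_retries", 1),
        ("grpc.service_config", json.dumps(service_config)),
    ]

options = retry_channel_options()
```

The result would be passed as grpc.insecure_channel("localhost:9090", options=options). Channel-level retries generally only cover attempts that fail before the server has begun responding, so a long-lived ProcessConversation stream that drops mid-conversation still needs the wrapper loop.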

Local development

ast dev runs the messaging sidecar locally so the SDK flow is identical to production. Enable platform adapters per project under dev.interfaces.messaging.adapters in astropods.yml:

dev:
  interfaces:
    messaging:
      adapters: [web]  # or [slack, web]

Open the bundled playground at http://localhost:8080 to drive your agent end-to-end without touching Slack. See the package spec for the full dev.interfaces.messaging schema.

Worked examples

Slack

The Slack adapter forwards five flavours of event:

| event_kind | Source |
| --- | --- |
| EVENT_KIND_DM | Direct message to the bot. |
| EVENT_KIND_APP_MENTION | @bot in a channel or thread. |
| EVENT_KIND_THREAD_REPLY | Reply inside a thread the bot is already in. |
| EVENT_KIND_OBSERVED | Channel the bot is observing without being mentioned (listen-only). |
| EVENT_KIND_REACTION / _BUTTON_CLICK / _SLASH_COMMAND / _ASSISTANT_THREAD_STARTED | Interactive events. |

Status updates translate to Slack’s assistant.threads.setStatus. Suggested prompts translate to assistant.threads.setSuggestedPrompts. CardAttachment ships Block Kit JSON straight through.

import queue, threading, grpc
from astropods_messaging import (
    AgentMessagingStub, ConversationRequest, AgentResponse,
    ContentChunk, StatusUpdate, SuggestedPrompts,
)
from astropods_messaging.astro.messaging.v1.message_pb2 import PlatformContext
from astropods_messaging.astro.messaging.v1.response_pb2 import MessageOptions

channel = grpc.insecure_channel("localhost:9090")
stub = AgentMessagingStub(channel)
outbound: queue.Queue = queue.Queue()

# stream_llm and record_feedback are your own helpers, not part of the SDK.

def send(resp: AgentResponse):
    outbound.put(ConversationRequest(agent_response=resp))

def handle_slack_message(m):
    cid = m.conversation_id
    kind = m.platform_context.event_kind

    if kind == PlatformContext.EVENT_KIND_OBSERVED:
        return  # listen-only

    send(AgentResponse(
        conversation_id=cid,
        status=StatusUpdate(status=StatusUpdate.THINKING),
    ))

    # Open a thread under the user's message when the bot was @-mentioned.
    open_thread = kind == PlatformContext.EVENT_KIND_APP_MENTION
    send(AgentResponse(
        conversation_id=cid,
        content=ContentChunk(
            type=ContentChunk.START,
            content="",
            options=MessageOptions(create_thread=True) if open_thread else None,
        ),
    ))

    for token in stream_llm(m.content):
        send(AgentResponse(
            conversation_id=cid,
            content=ContentChunk(type=ContentChunk.DELTA, content=token),
        ))

    send(AgentResponse(
        conversation_id=cid,
        content=ContentChunk(type=ContentChunk.END, content=""),
    ))

    send(AgentResponse(conversation_id=cid, prompts=SuggestedPrompts(prompts=[
        SuggestedPrompts.Prompt(id="p1", title="Show example",
                                message="Show me an example"),
        SuggestedPrompts.Prompt(id="p2", title="Go deeper",
                                message="Can you explain more?"),
    ])))

def requests():
    while True:
        yield outbound.get()

for resp in stub.ProcessConversation(requests()):
    payload = resp.WhichOneof("payload")
    if payload == "incoming_message" and resp.incoming_message.platform == "slack":
        # Handle each message on its own thread so the receive loop keeps draining.
        threading.Thread(target=handle_slack_message,
                         args=(resp.incoming_message,)).start()
    elif payload == "feedback" and resp.feedback.HasField("reaction"):
        r = resp.feedback.reaction
        record_feedback(resp.feedback.response_id, r.type, r.added)

A Block Kit card with an action button:

import json
from astropods_messaging.astro.messaging.v1.response_pb2 import (
    ResponseAttachment, CardAttachment,
)

block_kit = {
    "blocks": [
        {"type": "header", "text": {"type": "plain_text", "text": "Deploy #4837"}},
        {"type": "section", "fields": [
            {"type": "mrkdwn", "text": "*Status:*\n:white_check_mark: Green"},
            {"type": "mrkdwn", "text": "*Region:*\nus-east-1"},
        ]},
        {"type": "actions", "elements": [
            {"type": "button",
             "text": {"type": "plain_text", "text": "View logs"},
             "action_id": "view_logs", "value": "deploy-4837"},
        ]},
    ],
}

send(AgentResponse(
    conversation_id=cid,
    content=ContentChunk(
        type=ContentChunk.END,
        content="Deploy status:",
        attachments=[ResponseAttachment(
            card=CardAttachment(platform_card_json=json.dumps(block_kit)),
        )],
    ),
))

When the user clicks View logs, you’ll get a PlatformFeedback.button_click event with button_id="view_logs" and value="deploy-4837".

Web (playground / browser chat)

Every message arrives with platform == "web" and event_kind == EVENT_KIND_DM. The two web-specific concerns are audio input and session-scoped conversations (one conversation_id per browser tab).

import queue, threading, grpc
from types import SimpleNamespace
from astropods_messaging import (
    AgentMessagingStub, ConversationRequest, AgentResponse,
    ContentChunk, StatusUpdate, Transcript,
)

channel = grpc.insecure_channel("localhost:9090")
stub = AgentMessagingStub(channel)
outbound: queue.Queue = queue.Queue()

# Keyed by conversation_id. stream_llm and run_stt are your own helpers.
audio_buffers: dict[str, bytearray] = {}
audio_configs: dict[str, object] = {}

def send(resp: AgentResponse):
    outbound.put(ConversationRequest(agent_response=resp))

def handle_web_message(m):
    cid = m.conversation_id
    send(AgentResponse(
        conversation_id=cid,
        status=StatusUpdate(status=StatusUpdate.GENERATING),
    ))
    send(AgentResponse(
        conversation_id=cid,
        content=ContentChunk(type=ContentChunk.START, content=""),
    ))
    for token in stream_llm(m.content):
        send(AgentResponse(
            conversation_id=cid,
            content=ContentChunk(type=ContentChunk.DELTA, content=token),
        ))
    send(AgentResponse(
        conversation_id=cid,
        content=ContentChunk(type=ContentChunk.END, content=""),
    ))

def requests():
    while True:
        yield outbound.get()

for resp in stub.ProcessConversation(requests()):
    payload = resp.WhichOneof("payload")

    if payload == "incoming_message" and resp.incoming_message.platform == "web":
        threading.Thread(
            target=handle_web_message,
            args=(resp.incoming_message,),
        ).start()

    elif payload == "audio_config":
        cfg = resp.audio_config
        audio_configs[cfg.conversation_id] = cfg
        audio_buffers[cfg.conversation_id] = bytearray()

    elif payload == "audio_chunk":
        chunk = resp.audio_chunk
        # Single-session example: maintain a per-session mapping in real apps.
        cid = next(iter(audio_configs))
        audio_buffers[cid].extend(chunk.data)
        if chunk.done:
            cfg = audio_configs.pop(cid)
            audio = audio_buffers.pop(cid)
            text = run_stt(bytes(audio),
                           encoding=cfg.encoding,
                           sample_rate=cfg.sample_rate)
            send(AgentResponse(
                conversation_id=cid,
                transcript=Transcript(text=text),
            ))
            # Treat the transcript as a normal text message, off the receive loop.
            threading.Thread(
                target=handle_web_message,
                args=(SimpleNamespace(conversation_id=cid, content=text),),
            ).start()

The bundled playground emits WEBM_OPUS at 48 kHz from MediaRecorder. Firefox emits OGG_OPUS. Branch on cfg.encoding to pick the right STT filetype.
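Branching on the encoding might look like the sketch below, which picks a file extension for the STT upload and wraps raw LINEAR16 in a WAV container using the standard-library wave module. The extension table is an assumption about what a typical STT service accepts, and prepare_for_stt is an illustrative name. The function takes the encoding by name so it runs without the SDK; with protobuf enum wrappers the name is usually available as AudioEncoding.Name(cfg.encoding).

```python
import io
import wave

# Assumed mapping from AudioEncoding names to upload file extensions.
EXTENSION_BY_ENCODING = {
    "LINEAR16": "wav",
    "MP3": "mp3",
    "WEBM_OPUS": "webm",
    "OGG_OPUS": "ogg",
    "FLAC": "flac",
}

def pcm_to_wav(pcm: bytes, sample_rate: int, channels: int = 1) -> bytes:
    """Wrap raw 16-bit PCM in a WAV header so file-based STT APIs accept it."""
    buf = io.BytesIO()
    with wave.open(buf, "wb") as w:
        w.setnchannels(channels)
        w.setsampwidth(2)  # LINEAR16 uses 2 bytes per sample
        w.setframerate(sample_rate)
        w.writeframes(pcm)
    return buf.getvalue()

def prepare_for_stt(encoding_name: str, audio: bytes,
                    sample_rate: int, channels: int = 1) -> tuple[str, bytes]:
    """Return a (filename, payload) pair suitable for a file-based STT upload."""
    ext = EXTENSION_BY_ENCODING.get(encoding_name)
    if ext is None:
        raise ValueError(f"no container mapping for {encoding_name}")
    if encoding_name == "LINEAR16":
        audio = pcm_to_wav(audio, sample_rate, channels)
    return f"audio.{ext}", audio

name, payload = prepare_for_stt("WEBM_OPUS", b"\x1a\x45\xdf\xa3", 48_000)
wav_name, wav_bytes = prepare_for_stt("LINEAR16", b"\x00\x00" * 4, 16_000)
```

Container formats such as WebM and OGG pass through untouched because they already carry their own headers; only headerless PCM needs wrapping.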

Cross-platform agent

In practice a single agent serves both. The only platform-specific code is whether you open a Slack thread:

from astropods_messaging.astro.messaging.v1.message_pb2 import PlatformContext
from astropods_messaging.astro.messaging.v1.response_pb2 import MessageOptions

# `resp` comes from the ProcessConversation loop; `send` is the helper defined above.
m = resp.incoming_message
open_thread = (
    m.platform == "slack"
    and m.platform_context.event_kind == PlatformContext.EVENT_KIND_APP_MENTION
)
send(AgentResponse(
    conversation_id=m.conversation_id,
    content=ContentChunk(
        type=ContentChunk.START,
        content="",
        options=MessageOptions(create_thread=True) if open_thread else None,
    ),
))
# …rest of the loop is identical for Slack and web

Exported symbols

# Service
AgentMessagingStub
ConversationRequest, HealthCheckRequest, HealthCheckResponse

# Messages
Message, User, Attachment

# Responses
AgentResponse, ContentChunk, StatusUpdate, SuggestedPrompts
ThreadMetadata, Transcript, ErrorResponse

# Config
AgentConfig, AgentToolConfig

# Audio
AudioStreamConfig, AudioChunk, AudioEncoding

# Feedback
PlatformFeedback, MessageReaction, TextFeedback
ButtonClick, PromptSelection, StreamControl, MessageEdit, MessageDelete

The full proto source lives in modules/messaging/proto/astro/messaging/v1/service.proto, message.proto, response.proto, feedback.proto, audio.proto, config.proto.