Custom adapter (Python)

Build a custom adapter with astropods-adapter-core

View as Markdown

astropods-adapter-core is the framework-agnostic Python core. Implement the AgentAdapter protocol, hand it to serve(), and the bundled MessagingBridge handles the gRPC streaming loop, audio, feedback, and shutdown.

Use this when there’s no first-party adapter for your framework (or no framework at all — you’re calling LLM APIs directly).

Install

$pip install astropods-adapter-core

Requires Python 3.10+.

Minimal adapter

1from astropods_adapter_core import StreamHooks, StreamOptions, serve
2
3class MyAdapter:
4 name = "My Agent"
5
6 async def stream(self, prompt: str, hooks: StreamHooks, options: StreamOptions) -> None:
7 try:
8 hooks.on_chunk("Hello, ")
9 hooks.on_chunk(f"{prompt}!")
10 hooks.on_finish()
11 except Exception as e:
12 hooks.on_error(e)
13
14 def get_config(self) -> dict:
15 return {"system_prompt": "You are a helpful assistant.", "tools": []}
16
17serve(MyAdapter())

serve(adapter) connects to localhost:9090 (or GRPC_SERVER_ADDR) and blocks until SIGINT / SIGTERM.

Lifecycle

AgentAdapter protocol

The interface is a typing.Protocol — duck-typed. Any class with these members satisfies it.

MemberRequiredNotes
name: stryesDisplay name. Used in logs and AgentConfig.
async stream(prompt, hooks, options)yesStream a reply. Call hooks.on_chunk / on_status_update / etc. as you go.
get_config() -> dictyesReturns {"system_prompt": str, "tools": [...]} for the playground.
async stream_audio(audio, hooks, options)noHandle audio messages. Omit to reject audio input with a friendly message.
on_feedback(feedback)noReceive thumbs-up/down, text feedback, button clicks, etc. May be sync or async.

StreamHooks

Call these from inside stream() and stream_audio(). They translate directly into outbound AgentResponse messages on the gRPC stream.

MethodSendsWhen to call
on_chunk(text)ContentChunk (START first, then DELTAs)Each text fragment from the LLM.
on_status_update(status)StatusUpdatePre-content typing indicator, tool execution status.
on_transcript(text)TranscriptAfter STT — replaces the “[audio]” placeholder.
on_audio_chunk(data)AudioChunkTTS bytes back to the platform.
on_audio_end()AudioChunk(done=True)End-of-segment marker after TTS.
on_error(exception)ErrorResponseGeneration failed. Do not also call on_finish().
on_finish()ContentChunk ENDResponse complete. Call exactly once per request.

on_status_update takes a dict with a "status" key. Valid values: "THINKING", "SEARCHING", "GENERATING", "PROCESSING", "ANALYZING", "CUSTOM". For CUSTOM, include "custom_message":

1hooks.on_status_update({"status": "CUSTOM", "custom_message": "Fetching data..."})

StreamOptions

The bridge passes this to every stream() / stream_audio() call.

FieldTypeNotes
conversation_idstrStable across the conversation. Use as the memory/session key.
user_idstrSender’s user ID. Use for memory scoping and trace attribution.
platform_contextOptional[PlatformContext]Channel/thread IDs, workspace, event_kind, raw platform user_id. None when the message did not originate from a platform adapter. See Messaging SDK — PlatformContext.

AudioInput

Passed to stream_audio().

FieldTypeNotes
databytesRaw audio bytes accumulated from the segment.
configprotoAudioStreamConfig — encoding, sample_rate, channels, language, source.

VoiceProvider

Optional protocol for STT. Pass an instance to your adapter, then call its listen() method in stream_audio().

MethodNotes
async listen(data: bytes, config) -> strTranscribe raw audio bytes to text.

FeedbackEvent

Passed to on_feedback(). kind is a stable string discriminator — no proto imports needed.

FieldTypeNotes
conversation_idstrConversation the feedback is attached to.
response_idstrPlatform message ID the feedback targets.
kindstrSee table below.
user_idstrSubmitter’s user ID. "" for anonymous/system events.
user_namestrSubmitter’s display name.
textOptional[str]Populated for "text" (the body), "reaction" (emoji name).
promptOptional[str]Populated for "text" (the modal label).

FeedbackEvent.kind values

ValueSource
"thumbs_up"MessageReaction with THUMBS_UP. Synthesized from the reaction enum.
"thumbs_down"MessageReaction with THUMBS_DOWN.
"reaction"MessageReaction with CUSTOM_EMOJI. text holds the emoji name.
"text"TextFeedback modal submission. text is the body; prompt is the modal label.
"button_click"ButtonClick from a CardAttachment.
"prompt_selection"User clicked a SuggestedPrompts entry.
"stream_control"StreamControl (stop/pause/resume/regenerate).
"message_edit"User edited their own previous message.
"message_delete"User deleted their own previous message.

on_feedback may be sync or async — the bridge probes with hasattr and dispatches. It does not await the result, so don’t block on slow I/O. Push to a queue or trigger an async write and return.

serve(adapter, options?)

Thin wrapper that handles process lifecycle. Sets up JSON-formatted logging, runs asyncio.run(bridge.start()), and exits on SIGINT / SIGTERM.

ParameterTypeRequiredNotes
adapterAgentAdapteryesYour implementation.
optionsServeOptionsnoServeOptions(server_address=...).

ServeOptions.server_address defaults to os.environ.get("GRPC_SERVER_ADDR", "localhost:9090").

Worked example: a plain Anthropic agent

No framework — just calling Anthropic’s streaming messages API.

1import os
2from anthropic import AsyncAnthropic
3from astropods_adapter_core import StreamHooks, StreamOptions, serve
4
5client = AsyncAnthropic(
6 api_key=os.environ["ASTRO_GATEWAY_API_KEY"],
7 base_url=os.environ["ASTRO_GATEWAY_URL"],
8)
9
10class AnthropicDirect:
11 name = "Anthropic Direct"
12
13 async def stream(self, prompt: str, hooks: StreamHooks, options: StreamOptions) -> None:
14 try:
15 hooks.on_status_update({"status": "GENERATING"})
16
17 async with client.messages.stream(
18 model="claude-sonnet-4-6",
19 max_tokens=1024,
20 system="You are a helpful assistant.",
21 messages=[{"role": "user", "content": prompt}],
22 ) as stream:
23 async for text in stream.text_stream:
24 hooks.on_chunk(text)
25
26 hooks.on_finish()
27 except Exception as e:
28 hooks.on_error(e)
29
30 def get_config(self) -> dict:
31 return {"system_prompt": "You are a helpful assistant.", "tools": []}
32
33 def on_feedback(self, event) -> None:
34 # Push to your evals pipeline, Airtable, etc. Don't block.
35 record_feedback_async(event)
36
37serve(AnthropicDirect())

Worked example: audio with STT

1from astropods_adapter_core import AudioInput, StreamHooks, StreamOptions, serve
2
3class MyAdapter:
4 name = "My Agent"
5
6 def __init__(self, voice):
7 self._voice = voice # any object with: async listen(bytes, config) -> str
8
9 async def stream(self, prompt, hooks, options):
10 # ... text handling as usual
11
12 async def stream_audio(self, audio: AudioInput, hooks: StreamHooks, options: StreamOptions) -> None:
13 try:
14 hooks.on_status_update({"status": "PROCESSING", "custom_message": "Transcribing audio"})
15 transcript = await self._voice.listen(audio.data, audio.config)
16 hooks.on_transcript(transcript)
17
18 # Now run the agent on the transcript as if it were a text message
19 await self.stream(transcript, hooks, options)
20 except Exception as e:
21 hooks.on_error(e)
22
23 def get_config(self):
24 return {"system_prompt": "...", "tools": []}

Rules of thumb

  • Call exactly one of on_finish() or on_error() per request. Skipping either leaves the user staring at a half-rendered reply.
  • Catch your own exceptions inside stream(). If you let the coroutine raise, the user sees nothing.
  • Don’t block in on_feedback() — push to a queue or trigger async work and return.
  • Use options.conversation_id (not platform_context.thread_id) as your memory key.
  • Configure OTEL manually before calling serve() if you want traces. Framework adapters auto-configure this; the raw core does not.

Exported symbols

1from astropods_adapter_core import (
2 # Protocols / interfaces
3 AgentAdapter,
4 StreamHooks,
5 VoiceProvider,
6 # Dataclasses
7 StreamOptions,
8 AudioInput,
9 FeedbackEvent,
10 ServeOptions,
11 # Re-exported from astropods_messaging
12 PlatformContext,
13 # Entry points
14 serve,
15 MessagingBridge,
16)

See Messaging SDK (Python) for the underlying proto shapes referenced by StreamHooks and PlatformContext.