
# Custom adapter (Python)

[`astropods-adapter-core`](https://pypi.org/project/astropods-adapter-core/) is the framework-agnostic Python core. Implement the `AgentAdapter` protocol, hand it to `serve()`, and the bundled `MessagingBridge` handles the gRPC streaming loop, audio, feedback, and shutdown.

Use this when there's no first-party adapter for your framework (or no framework at all — you're calling LLM APIs directly).

## Install

```bash
pip install astropods-adapter-core
```

Requires Python 3.10+.

## Minimal adapter

```python
from astropods_adapter_core import StreamHooks, StreamOptions, serve

class MyAdapter:
    name = "My Agent"

    async def stream(self, prompt: str, hooks: StreamHooks, options: StreamOptions) -> None:
        try:
            hooks.on_chunk("Hello, ")
            hooks.on_chunk(f"{prompt}!")
            hooks.on_finish()
        except Exception as e:
            hooks.on_error(e)

    def get_config(self) -> dict:
        return {"system_prompt": "You are a helpful assistant.", "tools": []}

serve(MyAdapter())
```

`serve(adapter)` connects to the address in `GRPC_SERVER_ADDR` (default `localhost:9090`) and blocks until the process receives `SIGINT` or `SIGTERM`.

## Lifecycle

```mermaid
sequenceDiagram
    participant Side as Messaging sidecar
    participant Bridge as MessagingBridge
    participant Adapter as Your AgentAdapter

    Bridge->>Side: connect (with retry/backoff)
    Bridge->>Side: send AgentConfig from get_config()
    loop For each inbound message
        Side-->>Bridge: AgentResponse.incoming_message
        Bridge->>Adapter: await stream(prompt, hooks, options)
        Adapter->>Bridge: hooks.on_chunk("...")
        Bridge->>Side: ContentChunk DELTA
        Adapter->>Bridge: hooks.on_status_update({"status": "PROCESSING"})
        Bridge->>Side: StatusUpdate
        Adapter->>Bridge: hooks.on_finish()
        Bridge->>Side: ContentChunk END
    end
    Side-->>Bridge: AgentResponse.feedback
    Bridge->>Adapter: on_feedback(event)
```

## `AgentAdapter` protocol

The interface is a `typing.Protocol` — duck-typed. Any class with these members satisfies it.

| Member                                      | Required | Notes                                                                            |
| ------------------------------------------- | -------- | -------------------------------------------------------------------------------- |
| `name: str`                                 | yes      | Display name. Used in logs and `AgentConfig`.                                    |
| `async stream(prompt, hooks, options)`      | yes      | Stream a reply. Call `hooks.on_chunk` / `on_status_update` / etc. as you go.     |
| `get_config() -> dict`                      | yes      | Returns `{"system_prompt": str, "tools": [...]}` for the playground.             |
| `async stream_audio(audio, hooks, options)` | no       | Handle audio messages. Omit to reject audio input with a friendly message.       |
| `on_feedback(feedback)`                     | no       | Receive thumbs-up/down, text feedback, button clicks, etc. May be sync or async. |
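
Because the interface is structural, an adapter never has to inherit from anything. The sketch below illustrates the idea with a local stand-in protocol. `AdapterLike` is hypothetical and is not the library's `AgentAdapter` definition; whether the real protocol supports `isinstance` checks is not stated here.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class AdapterLike(Protocol):
    """Local stand-in for illustration; not the library's AgentAdapter."""

    name: str

    async def stream(self, prompt: str, hooks: Any, options: Any) -> None: ...

    def get_config(self) -> dict: ...


class EchoAdapter:
    # No base class: having the right members is enough.
    name = "Echo"

    async def stream(self, prompt: str, hooks: Any, options: Any) -> None:
        hooks.on_chunk(prompt)
        hooks.on_finish()

    def get_config(self) -> dict:
        return {"system_prompt": "Echo the user.", "tools": []}


class NotAnAdapter:
    name = "Missing stream() and get_config()"


def satisfies_protocol(obj: object) -> bool:
    # A runtime_checkable isinstance check only tests that the members exist,
    # not their signatures or whether stream() is a coroutine function.
    return isinstance(obj, AdapterLike)
```

A check like this is mostly useful in tests. At runtime a missing required member would presumably surface as an `AttributeError` when the bridge first touches it.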

### `StreamHooks`

Call these from inside `stream()` and `stream_audio()`. They translate directly into outbound `AgentResponse` messages on the gRPC stream.

| Method                     | Sends                                         | When to call                                           |
| -------------------------- | --------------------------------------------- | ------------------------------------------------------ |
| `on_chunk(text)`           | `ContentChunk` (`START` first, then `DELTA`s) | Each text fragment from the LLM.                       |
| `on_status_update(status)` | `StatusUpdate`                                | Pre-content typing indicator, tool execution status.   |
| `on_transcript(text)`      | `Transcript`                                  | After speech-to-text (STT). Replaces the "\[audio]" placeholder. |
| `on_audio_chunk(data)`     | `AudioChunk`                                  | Text-to-speech (TTS) bytes back to the platform.       |
| `on_audio_end()`           | `AudioChunk(done=True)`                       | End-of-segment marker after TTS.                       |
| `on_error(exception)`      | `ErrorResponse`                               | Generation failed. **Do not also call `on_finish()`.** |
| `on_finish()`              | `ContentChunk END`                            | Response completed successfully. **Call exactly once, and never after `on_error()`.** |

`on_status_update` takes a `dict` with a `"status"` key. Valid values: `"THINKING"`, `"SEARCHING"`, `"GENERATING"`, `"PROCESSING"`, `"ANALYZING"`, `"CUSTOM"`. For `CUSTOM`, include `"custom_message"`:

```python
hooks.on_status_update({"status": "CUSTOM", "custom_message": "Fetching data..."})
```
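
A small helper can catch misspelled status names before they reach the bridge. This is a minimal sketch: `make_status` and `VALID_STATUSES` are hypothetical names, not part of the package. Whether the bridge accepts `"custom_message"` alongside other statuses is not specified here, so the helper conservatively rejects it.

```python
from typing import Optional

# Status names as listed in this section.
VALID_STATUSES = {"THINKING", "SEARCHING", "GENERATING", "PROCESSING", "ANALYZING", "CUSTOM"}


def make_status(status: str, custom_message: Optional[str] = None) -> dict:
    """Build the dict passed to hooks.on_status_update() (hypothetical helper)."""
    if status not in VALID_STATUSES:
        raise ValueError(f"unknown status: {status!r}")
    if status == "CUSTOM":
        if not custom_message:
            raise ValueError("CUSTOM status needs a custom_message")
        return {"status": status, "custom_message": custom_message}
    if custom_message is not None:
        # Conservative choice made by this helper, not a documented library rule.
        raise ValueError("custom_message is only used with CUSTOM here")
    return {"status": status}
```

Inside `stream()`, the result is passed straight through, for example `hooks.on_status_update(make_status("SEARCHING"))`.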

### `StreamOptions`

The bridge passes this to every `stream()` / `stream_audio()` call.

| Field              | Type                        | Notes                                                                                                                                                                                                                         |
| ------------------ | --------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `conversation_id`  | `str`                       | Stable across the conversation. Use as the memory/session key.                                                                                                                                                                |
| `user_id`          | `str`                       | Sender's user ID. Use for memory scoping and trace attribution.                                                                                                                                                               |
| `platform_context` | `Optional[PlatformContext]` | Channel/thread IDs, workspace, event\_kind, raw platform user\_id. `None` when the message did not originate from a platform adapter. See [Messaging SDK — PlatformContext](../messaging-sdk/python#inbound-message-anatomy). |
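
As an illustration of using `conversation_id` as the memory key, here is a minimal in-process history store. `ConversationMemory` and `FakeOptions` are hypothetical names; `FakeOptions` only stands in for the two `StreamOptions` fields used, and a real deployment would likely want a persistent store.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class FakeOptions:
    """Stand-in with the two StreamOptions fields used here."""
    conversation_id: str
    user_id: str


class ConversationMemory:
    """In-process history keyed by conversation_id (lost on restart)."""

    def __init__(self, max_turns: int = 20) -> None:
        if max_turns < 1:
            raise ValueError("max_turns must be at least 1")
        self._history: dict[str, list[dict]] = defaultdict(list)
        self._max_turns = max_turns

    def append(self, options, role: str, content: str) -> None:
        turns = self._history[options.conversation_id]
        turns.append({"role": role, "content": content})
        # Keep only the most recent turns so the prompt stays bounded.
        del turns[:-self._max_turns]

    def messages(self, options) -> list[dict]:
        # Return a copy so callers cannot mutate the stored history.
        return list(self._history.get(options.conversation_id, []))
```

In `stream()`, an adapter could append the user prompt, pass `memory.messages(options)` to the model, then append the assistant reply once streaming finishes.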

### `AudioInput`

Passed to `stream_audio()`.

| Field    | Type    | Notes                                                                     |
| -------- | ------- | ------------------------------------------------------------------------- |
| `data`   | `bytes` | Raw audio bytes accumulated from the segment.                             |
| `config` | proto   | `AudioStreamConfig` — encoding, sample\_rate, channels, language, source. |

### `VoiceProvider`

Optional protocol for STT. Pass an instance to your adapter, then call its `listen()` method in `stream_audio()`.

| Method                                     | Notes                               |
| ------------------------------------------ | ----------------------------------- |
| `async listen(data: bytes, config) -> str` | Transcribe raw audio bytes to text. |
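
For tests, a canned provider is often enough. The sketch below is a hypothetical test double (`CannedVoice` is not part of the package) that satisfies the `listen()` shape in the table above without calling any real STT service.

```python
import asyncio
from typing import Any


class CannedVoice:
    """Test double: any object with `async listen(data, config) -> str` fits."""

    def __init__(self, transcript: str) -> None:
        self._transcript = transcript
        self.calls: list[int] = []  # byte lengths seen, handy for assertions

    async def listen(self, data: bytes, config: Any) -> str:
        self.calls.append(len(data))
        return self._transcript


async def demo() -> str:
    voice = CannedVoice("turn on the lights")
    # config would normally be the AudioStreamConfig from AudioInput.config.
    return await voice.listen(b"\x00\x01\x02", config=None)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```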

### `FeedbackEvent`

Passed to `on_feedback()`. `kind` is a stable string discriminator — no proto imports needed.

| Field             | Type            | Notes                                                         |
| ----------------- | --------------- | ------------------------------------------------------------- |
| `conversation_id` | `str`           | Conversation the feedback is attached to.                     |
| `response_id`     | `str`           | Platform message ID the feedback targets.                     |
| `kind`            | `str`           | See table below.                                              |
| `user_id`         | `str`           | Submitter's user ID. `""` for anonymous/system events.        |
| `user_name`       | `str`           | Submitter's display name.                                     |
| `text`            | `Optional[str]` | Populated for `"text"` (the body) and `"reaction"` (the emoji name). |
| `prompt`          | `Optional[str]` | Populated for `"text"` (the modal label).                     |

**`FeedbackEvent.kind` values**

| Value                | Source                                                                            |
| -------------------- | --------------------------------------------------------------------------------- |
| `"thumbs_up"`        | `MessageReaction` with `THUMBS_UP`. Synthesized from the reaction enum.           |
| `"thumbs_down"`      | `MessageReaction` with `THUMBS_DOWN`.                                             |
| `"reaction"`         | `MessageReaction` with `CUSTOM_EMOJI`. `text` holds the emoji name.               |
| `"text"`             | `TextFeedback` modal submission. `text` is the body; `prompt` is the modal label. |
| `"button_click"`     | `ButtonClick` from a `CardAttachment`.                                            |
| `"prompt_selection"` | User clicked a `SuggestedPrompts` entry.                                          |
| `"stream_control"`   | `StreamControl` (stop/pause/resume/regenerate).                                   |
| `"message_edit"`     | User edited their own previous message.                                           |
| `"message_delete"`   | User deleted their own previous message.                                          |
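
Since `kind` is a plain string, handling feedback can be a simple lookup. The following sketch flattens an event into a record for an evals store. `FakeFeedback`, `summarize`, and the scoring values are hypothetical; `FakeFeedback` stands in for the `FeedbackEvent` fields used.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeFeedback:
    """Stand-in carrying the FeedbackEvent fields used below."""
    kind: str
    response_id: str
    text: Optional[str] = None


# Hypothetical scoring: only the thumbs kinds map to a numeric score.
_SCORES = {"thumbs_up": 1, "thumbs_down": -1}


def summarize(event) -> dict:
    """Turn a feedback event into a flat record."""
    record = {"response_id": event.response_id, "kind": event.kind}
    if event.kind in _SCORES:
        record["score"] = _SCORES[event.kind]
    elif event.kind in ("text", "reaction"):
        # `text` is the feedback body or the emoji name, respectively.
        record["text"] = event.text
    # Other kinds (button_click, stream_control, ...) keep only the basics.
    return record
```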

`on_feedback` may be sync or async — the bridge probes with `hasattr` and dispatches. It does **not** await the result, so don't block on slow I/O. Push to a queue or trigger an async write and return.
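
One way to keep `on_feedback` non-blocking is to enqueue the event and let a background task do the slow write. This is a sketch under two assumptions that are not confirmed here: that the bridge calls `on_feedback` on the running event loop's thread, and that an unbounded in-memory queue is acceptable. `FeedbackSink` is a hypothetical name.

```python
import asyncio


class FeedbackSink:
    """Buffers feedback in memory and writes it from a background task."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._worker = None
        self.written: list = []  # stands in for a database or HTTP call

    def on_feedback(self, event) -> None:
        # Assumes a running event loop; returns without waiting on I/O.
        if self._worker is None:
            self._worker = asyncio.get_running_loop().create_task(self._drain())
        self._queue.put_nowait(event)

    async def _drain(self) -> None:
        while True:
            event = await self._queue.get()
            try:
                await self._write(event)
            finally:
                self._queue.task_done()

    async def _write(self, event) -> None:
        await asyncio.sleep(0)  # placeholder for slow I/O
        self.written.append(event)

    async def flush(self) -> None:
        # Wait until every queued event has been written.
        await self._queue.join()


async def demo() -> list:
    sink = FeedbackSink()
    sink.on_feedback({"kind": "thumbs_up"})
    sink.on_feedback({"kind": "text", "text": "too verbose"})
    await sink.flush()
    return sink.written
```

An adapter could hold a `FeedbackSink` and forward to it from its own `on_feedback` method.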

## `serve(adapter, options?)`

Thin wrapper that handles process lifecycle. Sets up JSON-formatted logging, runs `asyncio.run(bridge.start())`, and exits on `SIGINT` / `SIGTERM`.

| Parameter | Type           | Required | Notes                               |
| --------- | -------------- | -------- | ----------------------------------- |
| `adapter` | `AgentAdapter` | yes      | Your implementation.                |
| `options` | `ServeOptions` | no       | `ServeOptions(server_address=...)`. |

`ServeOptions.server_address` defaults to `os.environ.get("GRPC_SERVER_ADDR", "localhost:9090")`.
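
The lookup order this implies can be sketched without the package installed. `resolve_server_address` is a hypothetical helper that mirrors the documented default; it is not the library's own code.

```python
import os
from typing import Mapping, Optional

DEFAULT_ADDR = "localhost:9090"


def resolve_server_address(explicit: Optional[str] = None,
                           env: Mapping[str, str] = os.environ) -> str:
    """Explicit value wins, then GRPC_SERVER_ADDR, then the default."""
    if explicit:
        return explicit
    return env.get("GRPC_SERVER_ADDR", DEFAULT_ADDR)


# With the real package this would feed ServeOptions, for example
# (argument order taken from the heading of this section):
#   serve(MyAdapter(), ServeOptions(server_address=resolve_server_address()))
```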

## Worked example: a plain Anthropic agent

No framework — just calling Anthropic's streaming messages API.

```python
import os
from anthropic import AsyncAnthropic
from astropods_adapter_core import StreamHooks, StreamOptions, serve

client = AsyncAnthropic(
    api_key=os.environ["ASTRO_GATEWAY_API_KEY"],
    base_url=os.environ["ASTRO_GATEWAY_URL"],
)

class AnthropicDirect:
    name = "Anthropic Direct"

    async def stream(self, prompt: str, hooks: StreamHooks, options: StreamOptions) -> None:
        try:
            hooks.on_status_update({"status": "GENERATING"})

            async with client.messages.stream(
                model="claude-sonnet-4-6",
                max_tokens=1024,
                system="You are a helpful assistant.",
                messages=[{"role": "user", "content": prompt}],
            ) as stream:
                async for text in stream.text_stream:
                    hooks.on_chunk(text)

            hooks.on_finish()
        except Exception as e:
            hooks.on_error(e)

    def get_config(self) -> dict:
        return {"system_prompt": "You are a helpful assistant.", "tools": []}

    def on_feedback(self, event) -> None:
        # Push to your evals pipeline, Airtable, etc. Don't block.
        # record_feedback_async is a placeholder for your own non-blocking sink.
        record_feedback_async(event)

serve(AnthropicDirect())
```

## Worked example: audio with STT

```python
from astropods_adapter_core import AudioInput, StreamHooks, StreamOptions, serve

class MyAdapter:
    name = "My Agent"

    def __init__(self, voice):
        self._voice = voice  # any object with: async listen(bytes, config) -> str

    async def stream(self, prompt, hooks, options):
        ...  # text handling as usual; ends with hooks.on_finish() or hooks.on_error()

    async def stream_audio(self, audio: AudioInput, hooks: StreamHooks, options: StreamOptions) -> None:
        try:
            hooks.on_status_update({"status": "CUSTOM", "custom_message": "Transcribing audio"})
            transcript = await self._voice.listen(audio.data, audio.config)
            hooks.on_transcript(transcript)

            # Now run the agent on the transcript as if it were a text message
            await self.stream(transcript, hooks, options)
        except Exception as e:
            hooks.on_error(e)

    def get_config(self):
        return {"system_prompt": "...", "tools": []}
```

## Rules of thumb

* Call **exactly one** of `on_finish()` or `on_error()` per request. Calling neither leaves the user staring at a half-rendered reply.
* Catch your own exceptions inside `stream()` and report them with `hooks.on_error()`. If you let the coroutine raise, the user sees nothing.
* Don't block in `on_feedback()` — push to a queue or trigger async work and return.
* Use `options.conversation_id` (not `platform_context.thread_id`) as your memory key.
* Configure OpenTelemetry (OTEL) yourself before calling `serve()` if you want traces. Framework adapters auto-configure this; the raw core does not.
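
The first two rules are easy to check in a unit test with a recording stand-in for the hooks. The sketch below assumes hooks are plain synchronous calls, as in the examples on this page. `RecordingHooks` and `GreetingAdapter` are hypothetical names, and only the hook methods the adapter uses are implemented.

```python
import asyncio


class RecordingHooks:
    """Test double that records hook calls instead of sending gRPC messages."""

    def __init__(self) -> None:
        self.calls: list[tuple] = []

    def on_chunk(self, text: str) -> None:
        self.calls.append(("chunk", text))

    def on_status_update(self, status: dict) -> None:
        self.calls.append(("status", status))

    def on_error(self, exc: Exception) -> None:
        self.calls.append(("error", exc))

    def on_finish(self) -> None:
        self.calls.append(("finish", None))

    def terminal_calls(self) -> list[str]:
        # A well-behaved stream() produces exactly one entry here.
        return [name for name, _ in self.calls if name in ("finish", "error")]


class GreetingAdapter:
    name = "Greeter"

    async def stream(self, prompt, hooks, options) -> None:
        try:
            if not prompt:
                raise ValueError("empty prompt")
            hooks.on_chunk(f"Hello, {prompt}!")
            hooks.on_finish()
        except Exception as exc:
            hooks.on_error(exc)

    def get_config(self) -> dict:
        return {"system_prompt": "Greet the user.", "tools": []}


def run_once(prompt: str) -> RecordingHooks:
    hooks = RecordingHooks()
    asyncio.run(GreetingAdapter().stream(prompt, hooks, options=None))
    return hooks
```

A test can then assert that `run_once("Ada").terminal_calls()` contains a single `"finish"` and that an empty prompt yields a single `"error"`.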

## Exported symbols

```python
from astropods_adapter_core import (
    # Protocols / interfaces
    AgentAdapter,
    StreamHooks,
    VoiceProvider,
    # Dataclasses
    StreamOptions,
    AudioInput,
    FeedbackEvent,
    ServeOptions,
    # Re-exported from astropods_messaging
    PlatformContext,
    # Entry points
    serve,
    MessagingBridge,
)
```

See [Messaging SDK (Python)](../messaging-sdk/python) for the underlying proto shapes referenced by `StreamHooks` and `PlatformContext`.