Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/data/languages/languageData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export default {
},
aiTransport: {
javascript: '2.21',
react: '2.21',
java: '1.6',
python: '3.1',
swift: '1.2',
Expand Down
190 changes: 190 additions & 0 deletions src/pages/docs/ai-transport/token-streaming/message-per-response.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,22 @@ For more details on rate limits and rollup behavior, see [Token streaming limits

## Subscribing to token streams <a id="subscribing"/>

<If lang="react">
<Aside data-type='note'>
These examples use the `useChannel` hook from `ably/react`. Your component must be wrapped in an [`AblyProvider`](/docs/getting-started/react#prerequisites-setup-ably-provider) and a [`ChannelProvider`](/docs/getting-started/react#step-2-channel-provider) that specifies the channel name and any channel options. For example, to subscribe with rewind enabled on an `ai:` namespaced channel:

<Code>
```react
<AblyProvider client={realtimeClient}>
<ChannelProvider channelName="ai:my-channel" options={{ params: { rewind: '2m' } }}>
<YourComponent />
</ChannelProvider>
</AblyProvider>
```
</Code>
</Aside>
</If>

Subscribers receive different message actions depending on when they join and how they're retrieving messages. Each message has an `action` field that indicates how to process it, and a `serial` field that identifies which message the action relates to:

- `message.create`: Indicates a new response has started (i.e. a new message was created). The message `data` contains the initial content (often empty or the first token). Store this as the beginning of a new response using `serial` as the identifier.
Expand Down Expand Up @@ -449,6 +465,31 @@ channel.subscribe(message -> {
}
});
```
```react
const [responses, setResponses] = useState(new Map());

// Subscribe to live messages
useChannel('ai:{{RANDOM_CHANNEL_NAME}}', (message) => {
setResponses((prev) => {
const next = new Map(prev);
switch (message.action) {
case 'message.create':
// New response started
next.set(message.serial, message.data);
break;
case 'message.append':
// Append token to existing response
next.set(message.serial, (next.get(message.serial) || '') + message.data);
break;
case 'message.update':
// Replace entire response content
next.set(message.serial, message.data);
break;
}
return next;
});
});
```
</Code>

## Client hydration <a id="hydration"/>
Expand Down Expand Up @@ -549,6 +590,31 @@ channel.subscribe(message -> {
}
});
```
```react
// Ensure the outer ChannelProvider has options={{ params: { rewind: '2m' } }}

const [responses, setResponses] = useState(new Map());

// Receive both recent historical (via rewind) and live messages
useChannel('ai:{{RANDOM_CHANNEL_NAME}}', (message) => {
setResponses((prev) => {
const next = new Map(prev);
switch (message.action) {
case 'message.create':
next.set(message.serial, message.data);
break;
case 'message.append':
const current = next.get(message.serial) || '';
next.set(message.serial, current + message.data);
break;
case 'message.update':
next.set(message.serial, message.data);
break;
}
return next;
});
});
```
</Code>

Rewind supports two formats:
Expand Down Expand Up @@ -678,6 +744,46 @@ while (page != null) {
page = page.hasNext() ? page.next() : null;
}
```
```react
const [responses, setResponses] = useState(new Map());
const hydrated = useRef(false);

// Subscribe to live messages and get the history function
const { history } = useChannel('ai:{{RANDOM_CHANNEL_NAME}}', (message) => {
setResponses((prev) => {
const next = new Map(prev);
switch (message.action) {
case 'message.create':
next.set(message.serial, message.data);
break;
case 'message.append':
next.set(message.serial, (next.get(message.serial) || '') + message.data);
break;
case 'message.update':
next.set(message.serial, message.data);
break;
}
return next;
});
});

// Fetch history on mount
useEffect(() => {
if (hydrated.current) return;
hydrated.current = true;

(async () => {
let page = await history({ untilAttach: true });
while (page) {
for (const message of page.items) {
// message.data contains the full concatenated text
setResponses((prev) => new Map(prev).set(message.serial, message.data));
}
page = page.hasNext() ? await page.next() : null;
}
})();
}, [history]);
```
</Code>

### Hydrating an in-progress response <a id="in-progress-response"/>
Expand Down Expand Up @@ -911,6 +1017,40 @@ channel.subscribe(message -> {
}
});
```
```react
// Ensure the outer ChannelProvider has options={{ params: { rewind: '2m' } }}

// Load completed responses from your database (Set of responseIds)
const completedResponses = useCompletedResponses();

const [inProgressResponses, setInProgressResponses] = useState(new Map());

// Receive both recent historical and live messages
useChannel('ai:responses', (message) => {
const responseId = message.extras?.headers?.responseId;

if (!responseId) return;

// Skip messages for responses already loaded from database
if (completedResponses.has(responseId)) return;

setInProgressResponses((prev) => {
const next = new Map(prev);
switch (message.action) {
case 'message.create':
next.set(responseId, message.data);
break;
case 'message.append':
next.set(responseId, (next.get(responseId) || '') + message.data);
break;
case 'message.update':
next.set(responseId, message.data);
break;
}
return next;
});
});
```
</Code>

<Aside data-type="note">
Expand Down Expand Up @@ -1118,6 +1258,56 @@ while (page != null) {
page = page.hasNext() ? page.next() : null;
}
```
```react
// Load completed responses and latest timestamp from your database
const { completedResponses, latestTimestamp } = useCompletedResponses();

const [inProgressResponses, setInProgressResponses] = useState(new Map());
const hydrated = useRef(false);

// Subscribe to live messages and get the history function
const { history } = useChannel('ai:{{RANDOM_CHANNEL_NAME}}', (message) => {
const responseId = message.extras?.headers?.responseId;

if (!responseId) return;
if (completedResponses.has(responseId)) return;

setInProgressResponses((prev) => {
const next = new Map(prev);
switch (message.action) {
case 'message.create':
next.set(responseId, message.data);
break;
case 'message.append':
next.set(responseId, (next.get(responseId) || '') + message.data);
break;
case 'message.update':
next.set(responseId, message.data);
break;
}
return next;
});
});

// Fetch history from the last completed response until attachment
useEffect(() => {
if (hydrated.current) return;
hydrated.current = true;

(async () => {
let page = await history({ untilAttach: true, start: latestTimestamp });
while (page) {
for (const message of page.items) {
const responseId = message.extras?.headers?.responseId;
if (responseId) {
setInProgressResponses((prev) => new Map(prev).set(responseId, message.data));
}
}
page = page.hasNext() ? await page.next() : null;
}
})();
}, [history]);
```
</Code>

<Aside data-type="note">
Expand Down
Loading