Streaming Guide

Deep Research streams real-time progress via Server-Sent Events (SSE). This guide covers all event types, reconnection handling, and code examples.

Overview

When you create a research task, the response includes a streamUrl. Connect to this URL to receive events as the multi-agent system searches the web, extracts content, and synthesizes findings.

SSE is the primary way to consume Deep Research results. It provides immediate visibility into progress without polling, and supports reconnection if your connection drops.

Connecting to a Stream

Endpoint:

GET /research/tasks/{taskId}/stream

Query Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| detail | string | Task's detail value | basic or detailed |
| fromSequence | integer | 0 | Resume from this sequence number |
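The two query parameters can be combined on one URL. The following is a minimal sketch of a URL builder; the helper name `build_stream_url` is hypothetical and not part of any SDK, and it only assumes the endpoint and parameter names listed above.

```python
from urllib.parse import urlencode

BASE_URL = "https://api.feeds.onhelix.ai"


def build_stream_url(task_id, detail=None, from_sequence=None):
    """Return the stream URL for a task, adding only the parameters that are set."""
    if detail is not None and detail not in ("basic", "detailed"):
        raise ValueError("detail must be 'basic' or 'detailed'")
    params = {}
    if detail is not None:
        params["detail"] = detail
    if from_sequence is not None:
        params["fromSequence"] = int(from_sequence)
    url = f"{BASE_URL}/research/tasks/{task_id}/stream"
    # Omit the query string entirely when no parameter was given
    return f"{url}?{urlencode(params)}" if params else url
```

Leaving both arguments unset yields the bare endpoint, so the server-side defaults apply.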

curl:

curl -N "https://api.feeds.onhelix.ai/research/tasks/{taskId}/stream" \
-H "Authorization: Bearer YOUR_API_KEY"

JavaScript (EventSource):

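// The browser's built-in EventSource does not accept a headers option.
// This snippet assumes an implementation that does, such as a Node.js
// polyfill (some versions of the eventsource npm package support it).
const es = new EventSource(
  `https://api.feeds.onhelix.ai/research/tasks/${taskId}/stream`,
  { headers: { Authorization: 'Bearer YOUR_API_KEY' } }
);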

Python (sseclient):

import requests
import sseclient

response = requests.get(
    f'https://api.feeds.onhelix.ai/research/tasks/{task_id}/stream',
    headers={'Authorization': 'Bearer YOUR_API_KEY'},
    stream=True,
)
client = sseclient.SSEClient(response)

Detail Levels

basic (default)

High-level progress events. Best for end-user UIs where you want to show progress bars, source lists, and the final report.

Events: topic, progress, source, supervisor_thinking, result, error, done

detailed

Everything in basic plus agent-level observability events. Best for debugging, developer tools, or building advanced visualizations of the agent system.

Additional events: agent, tool, text, thinking

Override the detail level per connection:

curl -N "https://api.feeds.onhelix.ai/research/tasks/{taskId}/stream?detail=detailed" \
-H "Authorization: Bearer YOUR_API_KEY"
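The two event sets listed above can be written down as constants, which makes it easy for a client to ignore event names it does not expect at its chosen detail level. This is only a sketch; the names `BASIC_EVENTS`, `DETAILED_ONLY_EVENTS`, and `expected_events` are hypothetical.

```python
# Event names copied from the two detail-level lists in this guide
BASIC_EVENTS = frozenset(
    {"topic", "progress", "source", "supervisor_thinking", "result", "error", "done"}
)
DETAILED_ONLY_EVENTS = frozenset({"agent", "tool", "text", "thinking"})


def expected_events(detail):
    """Return the set of event names a stream at this detail level may carry."""
    if detail == "basic":
        return BASIC_EVENTS
    if detail == "detailed":
        # detailed is everything in basic plus the agent-level events
        return BASIC_EVENTS | DETAILED_ONLY_EVENTS
    raise ValueError("detail must be 'basic' or 'detailed'")


def is_expected(event_name, detail):
    """True when event_name belongs to the given detail level."""
    return event_name in expected_events(detail)
```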

Event Reference

Basic Events

These events are sent at both basic and detailed detail levels.

topic

A sub-topic research task has started or completed.

{
  "topic": "solid-state-battery-materials",
  "index": 1,
  "status": "started"
}

| Field | Type | Description |
| --- | --- | --- |
| topic | string | Sub-topic name or agent identifier |
| index | number | Topic number (1-based) |
| status | string | started or completed |

progress

Overall research progress update. Sent when a topic completes or a sub-report is saved.

{
  "topics_total": 3,
  "topics_completed": 1,
  "sources_found": 5
}

| Field | Type | Description |
| --- | --- | --- |
| topics_total | number | Total number of sub-topics being researched |
| topics_completed | number | Number of sub-topics completed so far |
| sources_found | number | Total web sources discovered so far |

source

A web source was crawled and saved.

{
  "url": "https://example.com/solid-state-research",
  "title": "Solid-State Battery Breakthroughs in 2025",
  "score": 0.85,
  "topic": "solid-state-battery-materials",
  "source_type": { "type": "web" }
}

| Field | Type | Description |
| --- | --- | --- |
| url | string | URL of the web source |
| title | string | Page title |
| score | number | Relevance score (0-1) |
| topic | string | Sub-topic this source relates to |
| source_type | object | Source type object (e.g., { "type": "web" }) |

supervisor_thinking

The supervisor agent's reasoning as it plans and coordinates research. This event is sent as the supervisor decomposes the topic, adjusts its strategy, or reflects on sub-researcher progress.

{
  "content": "I'll decompose this topic into three areas: materials science, EV integration, and manufacturing challenges."
}

| Field | Type | Description |
| --- | --- | --- |
| content | string | Supervisor reasoning or thought |

result

Research is complete. Contains metadata about the completed report. The report text itself is not included — use GET /research/tasks/:id with the report_id to fetch the full report.

{
  "report_id": "rpt-abc123",
  "topics_researched_count": 3,
  "confidence_level": "high",
  "total_citations": 12
}

| Field | Type | Description |
| --- | --- | --- |
| report_id | string | ID of the generated report. Fetch the full report via GET /research/tasks/:id |
| topics_researched_count | number | Number of sub-topics that were researched |
| confidence_level | string | high, medium, or low |
| total_citations | number | Total number of citations in the report |

error

Research encountered an error.

{
  "message": "Workflow execution timed out"
}

| Field | Type | Description |
| --- | --- | --- |
| message | string | Error description |

done

Stream has ended. This is always the last event sent. Close your connection after receiving it. Includes a status field so consumers can distinguish successful completion from other endings.

{
  "status": "completed"
}

| Field | Type | Description |
| --- | --- | --- |
| status | string | Final task status: completed, failed, or ended (unknown/default) |
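One way to consume the basic events is to fold each one into a single state object that a UI can render from. The sketch below assumes only the event names and payload fields documented above; the state layout and the function names `new_state` and `apply_event` are hypothetical and are not the snapshot schema.

```python
import json


def new_state():
    """Empty client-side state; this layout is an illustration only."""
    return {
        "topics": {},               # topic name -> last reported status
        "progress": None,           # latest progress payload
        "sources": [],              # source payloads in arrival order
        "supervisor_thinking": [],  # reasoning strings in arrival order
        "result": None,
        "error": None,
        "status": None,             # set by the final done event
    }


def apply_event(state, event_name, data):
    """Fold one basic event (name plus raw JSON data string) into state."""
    payload = json.loads(data)
    if event_name == "topic":
        state["topics"][payload["topic"]] = payload["status"]
    elif event_name == "progress":
        state["progress"] = payload
    elif event_name == "source":
        state["sources"].append(payload)
    elif event_name == "supervisor_thinking":
        state["supervisor_thinking"].append(payload["content"])
    elif event_name == "result":
        state["result"] = payload
    elif event_name == "error":
        state["error"] = payload["message"]
    elif event_name == "done":
        state["status"] = payload["status"]
    # Unknown event names are ignored so detailed-level events do not break the fold
    return state
```

Because `done` carries the final status, a caller can stop reading as soon as `state["status"]` is set.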

Detailed Events

These events are only sent when detail=detailed.

agent

Agent lifecycle event. Tracks when sub-researcher agents start and end.

Agent started:

{
  "type": "start",
  "id": "sub-researcher-abc123",
  "topic": "solid-state-battery-materials"
}

Agent ended:

{
  "type": "end",
  "id": "sub-researcher-abc123",
  "status": "completed"
}

| Field | Type | Description |
| --- | --- | --- |
| type | string | start or end |
| id | string | Unique agent identifier |
| topic | string | Sub-topic assigned (on start) |
| status | string | Agent exit status (on end) |

tool

Tool execution event. Tracks when agents use tools (web search, content extraction, etc.).

Tool started:

{
  "type": "start",
  "name": "web_search",
  "agent_id": "sub-researcher-abc123"
}

Tool ended:

{
  "type": "end",
  "name": "web_search",
  "agent_id": "sub-researcher-abc123",
  "results_count": 8
}

| Field | Type | Description |
| --- | --- | --- |
| type | string | start or end |
| name | string | Tool name (e.g., web_search) |
| agent_id | string | Agent that invoked the tool |
| results_count | number | Number of results returned (on end) |

text

LLM text generation delta from an agent.

{
  "agent_id": "sub-researcher-abc123",
  "delta": "Based on the sources analyzed, solid-state batteries "
}

| Field | Type | Description |
| --- | --- | --- |
| agent_id | string | Agent generating text |
| delta | string | Text chunk (incremental) |
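Since each text event carries only an incremental chunk, a client has to concatenate the deltas per agent to recover the full output. A minimal sketch, assuming the payloads have already been decoded from JSON and arrive in order; the function name `collect_text` is hypothetical.

```python
from collections import defaultdict


def collect_text(payloads):
    """Join text deltas per agent_id, preserving arrival order."""
    chunks = defaultdict(list)
    for payload in payloads:
        chunks[payload["agent_id"]].append(payload["delta"])
    # Joining once at the end avoids repeated string concatenation
    return {agent_id: "".join(parts) for agent_id, parts in chunks.items()}
```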

thinking

Agent reasoning/thinking content.

{
  "agent_id": "sub-researcher-abc123",
  "content": "I should search for recent papers on sulfide-based solid electrolytes..."
}

| Field | Type | Description |
| --- | --- | --- |
| agent_id | string | Agent that is reasoning |
| content | string | Thinking/reasoning content |

Stream Resumption

Every SSE event the server sends includes an id: line containing a sequence number. This is your checkpoint — track it on the client as events arrive:

event: topic
data: {"topic":"battery-materials","index":1,"status":"started"}
id: 3

event: source
data: {"url":"https://...","title":"...","score":0.85,"topic":"battery-materials","source_type":{"type":"web"}}
id: 7

If your connection drops, reconnect with the last id you received as the fromSequence query parameter:

GET /research/tasks/{taskId}/stream?fromSequence=7

The server replays all events after that sequence, so you won't miss anything. For a fresh connection, fromSequence defaults to 0 (start from the beginning).
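To make the checkpoint idea concrete, here is a small sketch of a parser that reads raw SSE lines, skips comment lines, and exposes the `id:` value of each event. It follows the general SSE framing rules rather than anything specific to this API, and the function name `parse_sse` is hypothetical; in practice a library such as sseclient does this work.

```python
def parse_sse(lines):
    """Yield (event, data, id) tuples from an iterable of decoded SSE lines."""
    event, data, last_id = "message", [], None
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line ends the current event; events without data are dropped
            if data:
                yield event, "\n".join(data), last_id
            event, data = "message", []
            continue
        if line.startswith(":"):
            continue  # comment line, for example a keepalive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space after the colon is not part of the value
        if field == "event":
            event = value
        elif field == "data":
            data.append(value)
        elif field == "id":
            last_id = value
```

The `id` of the last tuple received is the value to pass as `fromSequence` when reconnecting.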

In JavaScript, the sequence number is available via event.lastEventId:

let lastSequence = 0;

es.addEventListener('progress', (e) => {
  lastSequence = parseInt(e.lastEventId, 10) || lastSequence;
  // handle event...
});

Keepalive: The server sends SSE comment lines (: prefix) every 15 seconds to keep the connection alive and prevent proxies or load balancers from timing out.

Stream availability: If you connect immediately after creating a task, the stream may not exist yet. The server polls for up to 30 seconds, sending keepalive comments while waiting. If the stream still isn't available, an error event is sent.

Snapshot + Stream Pattern

For long-running research tasks (5-10 minutes with hundreds of events), replaying the entire stream from fromSequence=0 on reconnection can be slow. The snapshot endpoint lets you load the accumulated state instantly, then connect to the SSE stream for only new events.

Step 1: Fetch the snapshot to hydrate your UI:

GET /research/tasks/{taskId}/snapshot

Step 2: Connect to the SSE stream starting from the snapshot's sequence:

GET /research/tasks/{taskId}/stream?fromSequence={streamSequence}

Any events that occurred between loading the snapshot and connecting to the stream will be replayed automatically — the sequence-last pattern guarantees no events are missed.

JavaScript:

async function reconnect(taskId) {
  // 1. Load snapshot for instant UI hydration
  const snapshotRes = await fetch(
    `https://api.feeds.onhelix.ai/research/tasks/${taskId}/snapshot`,
    { headers: { Authorization: 'Bearer YOUR_API_KEY' } }
  );
  const { data: snapshot } = await snapshotRes.json();

  // Hydrate UI from snapshot
  updateProgress(snapshot.progress);
  renderTopics(snapshot.topics);
  renderSources(snapshot.sources);
  renderSupervisorThinking(snapshot.supervisor_thinking);
  if (snapshot.result) renderResult(snapshot.result);

  // 2. Connect to stream for new events only
  // Assumes an EventSource implementation that accepts a headers option
  // (the browser's built-in EventSource does not)
  let lastSequence = snapshot.streamSequence;
  const url = `https://api.feeds.onhelix.ai/research/tasks/${taskId}/stream?fromSequence=${lastSequence}`;
  const es = new EventSource(url, {
    headers: { Authorization: 'Bearer YOUR_API_KEY' },
  });

  es.addEventListener('progress', (e) => {
    lastSequence = parseInt(e.lastEventId, 10) || lastSequence;
    updateProgress(JSON.parse(e.data));
  });

  // ... handle other events
}

Python:

import requests
import sseclient
import json

def reconnect(task_id: str):
    headers = {'Authorization': 'Bearer YOUR_API_KEY'}

    # 1. Load snapshot
    snapshot_res = requests.get(
        f'https://api.feeds.onhelix.ai/research/tasks/{task_id}/snapshot',
        headers=headers,
    )
    snapshot = snapshot_res.json()['data']

    # Hydrate state from snapshot
    print(f"Progress: {snapshot['progress']['topics_completed']}/{snapshot['progress']['topics_total']}")
    for source in snapshot['sources']:
        print(f"  Source: {source['title']}")
    for thinking in snapshot['supervisor_thinking']:
        print(f"  Supervisor: {thinking['content']}")

    # 2. Stream only new events
    stream_res = requests.get(
        f'https://api.feeds.onhelix.ai/research/tasks/{task_id}/stream',
        params={'fromSequence': snapshot['streamSequence']},
        headers=headers,
        stream=True,
    )
    client = sseclient.SSEClient(stream_res)

    for event in client.events():
        data = json.loads(event.data)
        if event.event == 'done':
            break
        # ... handle events

When to use this pattern:

  • Page refreshes or tab switches during a running research task
  • Reconnecting after a long disconnection
  • Late-joining clients that need the current state immediately
  • Any scenario where replaying the full event stream would be too slow

Code Examples

JavaScript / Node.js

Complete example with event handling and automatic reconnection:

const API_KEY = 'YOUR_API_KEY';
const BASE_URL = 'https://api.feeds.onhelix.ai';

async function research(topic) {
  // 1. Create the task
  const createRes = await fetch(`${BASE_URL}/research/tasks`, {
    method: 'POST',
    headers: {
      Authorization: `Bearer ${API_KEY}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({ input: topic }),
  });

  const { data: task } = await createRes.json();
  console.log(`Task created: ${task.id}`);

  // 2. Stream results
  let lastSequence = 0;
  const sources = [];

  function connect() {
    const url = `${BASE_URL}/research/tasks/${task.id}/stream?fromSequence=${lastSequence}`;
    // Assumes an EventSource implementation that accepts a headers option
    // (the browser's built-in EventSource does not)
    const es = new EventSource(url, {
      headers: { Authorization: `Bearer ${API_KEY}` },
    });

    es.addEventListener('supervisor_thinking', (e) => {
      lastSequence = parseInt(e.lastEventId, 10) || lastSequence;
      const { content } = JSON.parse(e.data);
      console.log(`Supervisor: ${content}`);
    });

    es.addEventListener('topic', (e) => {
      lastSequence = parseInt(e.lastEventId, 10) || lastSequence;
      const { topic, status } = JSON.parse(e.data);
      console.log(`Topic "${topic}": ${status}`);
    });

    es.addEventListener('progress', (e) => {
      lastSequence = parseInt(e.lastEventId, 10) || lastSequence;
      const { topics_completed, topics_total, sources_found } = JSON.parse(
        e.data
      );
      console.log(
        `Progress: ${topics_completed}/${topics_total} topics, ${sources_found} sources`
      );
    });

    es.addEventListener('source', (e) => {
      lastSequence = parseInt(e.lastEventId, 10) || lastSequence;
      const source = JSON.parse(e.data);
      sources.push(source);
      console.log(`Source found: ${source.title} (${source.url})`);
    });

    es.addEventListener('result', (e) => {
      lastSequence = parseInt(e.lastEventId, 10) || lastSequence;
      const result = JSON.parse(e.data);
      console.log('\n--- Research Complete ---');
      console.log(`Report ID: ${result.report_id}`);
      console.log(`Topics researched: ${result.topics_researched_count}`);
      console.log(`Confidence: ${result.confidence_level}`);
      console.log(`Citations: ${result.total_citations}`);
      // Fetch the full report via GET /research/tasks/:id
    });

    // The 'error' event type is shared by server-sent error events and
    // connection failures, so a single handler tells them apart by e.data
    es.addEventListener('error', (e) => {
      es.close();
      if (e.data) {
        // Server-sent error event: the research failed, do not reconnect
        const { message } = JSON.parse(e.data);
        console.error(`Research error: ${message}`);
        return;
      }
      // Connection loss: resume from the last sequence received
      console.log('Connection lost, reconnecting...');
      setTimeout(connect, 3000);
    });

    es.addEventListener('done', () => {
      es.close();
    });
  }

  connect();
}

research('Impact of quantum computing on cryptography');

Python

Complete example with reconnection logic:

import requests
import sseclient
import json
import time

API_KEY = 'YOUR_API_KEY'
BASE_URL = 'https://api.feeds.onhelix.ai'
HEADERS = {'Authorization': f'Bearer {API_KEY}'}


def research(topic: str) -> dict:
    # 1. Create the task
    response = requests.post(
        f'{BASE_URL}/research/tasks',
        json={'input': topic},
        headers={**HEADERS, 'Content-Type': 'application/json'},
    )
    response.raise_for_status()
    task = response.json()['data']
    print(f"Task created: {task['id']}")

    # 2. Stream results with reconnection
    last_sequence = 0
    result = None
    max_retries = 5

    for attempt in range(max_retries):
        try:
            stream_response = requests.get(
                f"{BASE_URL}/research/tasks/{task['id']}/stream",
                params={'fromSequence': last_sequence},
                headers=HEADERS,
                stream=True,
            )
            stream_response.raise_for_status()
            client = sseclient.SSEClient(stream_response)

            for event in client.events():
                if event.id:
                    last_sequence = int(event.id)

                data = json.loads(event.data)

                if event.event == 'supervisor_thinking':
                    print(f"Supervisor: {data['content']}")

                elif event.event == 'topic':
                    print(f"Topic \"{data['topic']}\": {data['status']}")

                elif event.event == 'progress':
                    print(
                        f"Progress: {data['topics_completed']}/{data['topics_total']} topics, "
                        f"{data['sources_found']} sources"
                    )

                elif event.event == 'source':
                    print(f"Source: {data['title']} ({data['url']})")

                elif event.event == 'result':
                    result = data
                    print(f"\nResearch complete! Report ID: {data['report_id']}")
                    print(f"Topics: {data['topics_researched_count']}, Citations: {data['total_citations']}")
                    # Fetch full report via GET /research/tasks/:id

                elif event.event == 'error':
                    print(f"Error: {data['message']}")
                    return data

                elif event.event == 'done':
                    return result

            break  # Stream ended normally

        except (requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError):
            print(f"Connection lost, reconnecting (attempt {attempt + 1})...")
            time.sleep(3)

    return result


result = research('Impact of quantum computing on cryptography')

curl

# Basic streaming
curl -N "https://api.feeds.onhelix.ai/research/tasks/{taskId}/stream" \
-H "Authorization: Bearer YOUR_API_KEY"

# Detailed streaming
curl -N "https://api.feeds.onhelix.ai/research/tasks/{taskId}/stream?detail=detailed" \
-H "Authorization: Bearer YOUR_API_KEY"

# Resume from sequence 42
curl -N "https://api.feeds.onhelix.ai/research/tasks/{taskId}/stream?fromSequence=42" \
-H "Authorization: Bearer YOUR_API_KEY"

Note: With curl, SSE events are printed as raw text. Each event consists of an event: line, a data: line, and an id: line carrying the sequence number.

Common Patterns

Progress Bar

Use progress events to build a progress bar:

es.addEventListener('progress', (e) => {
  const { topics_completed, topics_total } = JSON.parse(e.data);
  // Guard against division by zero before any topics are known
  const percent = topics_total > 0
    ? Math.round((topics_completed / topics_total) * 100)
    : 0;
  updateProgressBar(percent);
});

Source List Accumulation

Collect sources as they're discovered:

const sources = [];

es.addEventListener('source', (e) => {
  const source = JSON.parse(e.data);
  sources.push(source);
  renderSourceList(sources);
});

Supervisor Thinking Display

Show the supervisor agent's reasoning as it plans research:

const thinkingLog = [];

es.addEventListener('supervisor_thinking', (e) => {
  const { content } = JSON.parse(e.data);
  thinkingLog.push(content);
  renderThinkingPanel(thinkingLog);
});

Agent Visualization (Detailed Mode)

Track active agents and their tool usage:

const agents = new Map();

es.addEventListener('agent', (e) => {
  const { type, id, topic, status } = JSON.parse(e.data);
  if (type === 'start') {
    agents.set(id, { topic, tools: [], status: 'running' });
  } else {
    const agent = agents.get(id);
    if (agent) agent.status = status;
  }
  renderAgentDashboard(agents);
});

es.addEventListener('tool', (e) => {
  const { type, name, agent_id, results_count } = JSON.parse(e.data);
  const agent = agents.get(agent_id);
  if (agent) {
    if (type === 'start') {
      agent.tools.push({ name, status: 'running' });
    } else {
      const tool = agent.tools.find(
        (t) => t.name === name && t.status === 'running'
      );
      if (tool) {
        tool.status = 'done';
        tool.results_count = results_count;
      }
    }
  }
  renderAgentDashboard(agents);
});

Troubleshooting

Connection Drops

SSE connections can be interrupted by network issues, proxies, or load balancers. Always implement reconnection with fromSequence:

  • Track the id field from each event
  • On disconnect, reconnect with ?fromSequence={lastId}
  • The server replays missed events automatically
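The three steps above can be sketched as a retry loop that is independent of the HTTP library. The examples in this guide wait a fixed 3 seconds between attempts; this sketch swaps in exponential backoff instead, which is a common choice and not something the API requires. The names `backoff_delays` and `run_with_resume` are hypothetical, and `open_stream` stands in for whatever function opens the stream and yields `(sequence, event_name, payload)` tuples.

```python
import time


def backoff_delays(base=1.0, factor=2.0, cap=30.0, attempts=5):
    """Yield one delay per allowed attempt, growing geometrically up to cap."""
    delay = base
    for _ in range(attempts):
        yield min(delay, cap)
        delay *= factor


def run_with_resume(open_stream, delays, sleep=time.sleep):
    """Read a stream to completion, resuming from the last sequence after a drop.

    open_stream(from_sequence) is assumed to raise ConnectionError when the
    connection is lost; adapt the exception type to your HTTP client.
    """
    last_sequence = 0
    received = []
    for delay in delays:
        try:
            for sequence, name, payload in open_stream(last_sequence):
                last_sequence = sequence  # step 1: track the id of each event
                received.append((sequence, name, payload))
                if name == "done":
                    return received
            return received  # stream closed without a done event
        except ConnectionError:
            sleep(delay)  # step 2: wait, then reconnect with fromSequence
    raise RuntimeError("gave up after repeated connection failures")
```

Passing `sleep` as a parameter keeps the loop easy to test without real waiting.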

No Events Received

If you connect but receive no events:

  • The stream may not be ready yet. The server waits up to 30 seconds for stream availability
  • Check that the task ID is correct and the task status is running
  • Verify your API key has access to the task

Duplicate Events After Reconnection

When reconnecting with fromSequence, you may receive the event at that sequence again. Use the sequence ID to deduplicate on the client side.
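A minimal sketch of client-side deduplication follows. It assumes sequence numbers only ever increase within a task, which matches the examples in this guide but is not stated as a guarantee; the class name `SequenceFilter` is hypothetical.

```python
class SequenceFilter:
    """Accept each sequence number at most once, assuming increasing order."""

    def __init__(self, last_sequence=None):
        # None means no event has been seen yet
        self.last_sequence = last_sequence

    def accept(self, sequence):
        """Return True for a new event, False for a replayed one."""
        seq = int(sequence)  # SSE ids arrive as strings
        if self.last_sequence is not None and seq <= self.last_sequence:
            return False
        self.last_sequence = seq
        return True
```

A client would create the filter with the same value it sends as `fromSequence`, then call `accept(event.id)` before handling each event.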