SSE Subscribe Transport (@drakkar.software/starfish-client/events)
Real-time change notifications from a starfish server use Server-Sent Events (SSE).
The /events subpath export provides subscribeChanges — a typed auto-reconnecting
subscriber — plus low-level helpers for use in custom transports.
Quick start
import { subscribeChanges } from "@drakkar.software/starfish-client/events"
const unsub = subscribeChanges<{ type: string; payload: unknown }>({
url: "https://api.example.com/v1/myapp/events/spaces/sp-123",
onMessage(event) {
console.log("change:", event)
},
onStatus(status) {
// 0 = connecting, 200 = connected, negative = reconnect backoff
console.log("status:", status)
},
})
// Later:
unsub()
Signed events URL
When the server requires a signed request for the events endpoint, use
buildSignedEventsUrl to construct the correct URL from a client base URL and a
collection path:
import {
subscribeChanges,
buildSignedEventsUrl,
} from "@drakkar.software/starfish-client/events"
const url = buildSignedEventsUrl(
"https://api.example.com/v1/myapp/events",
{ since: Date.now() - 60_000 },
"myapp", // optional namespace — strip if already in eventsUrl
)
Auto-reconnect and backoff
subscribeChanges automatically reconnects after:
- A
fetchnetwork error (offline, DNS, timeout) - A server close / unexpected stream end
- Any non-200 response (the status is passed to
onStatusas the HTTP code)
Reconnect delay uses capped exponential backoff:
- Initial delay:
initialBackoffMs(default 500 ms) - Each failure doubles the delay
- Cap:
maxBackoffMs(default 30 000 ms = 30 s) - Delay is jittered ±15 % to prevent thundering-herd
Options reference
interface SubscribeChangesOptions<T> {
/** SSE endpoint URL (string) or a factory called on each reconnect. */
url: string | (() => string)
/** Called for each parsed event data object. */
onMessage: (event: T) => void
/** Called with the HTTP status on connection attempt (0 = connecting). */
onStatus?: (status: number) => void
/** Extra request headers (e.g. Authorization). */
headers?: Record<string, string>
/** Override the global `fetch`. */
fetch?: typeof globalThis.fetch
/** Initial reconnect delay in ms. Default 500. */
initialBackoffMs?: number
/** Max reconnect delay in ms. Default 30_000. */
maxBackoffMs?: number
}
subscribeChanges returns an unsubscribe function — call it to stop listening and
prevent any further reconnects.
Low-level frame parser
parseSseFrames(chunk, carry) is exposed for testing and custom transport wiring:
import { parseSseFrames } from "@drakkar.software/starfish-client/events"
let carry = ""
for await (const chunk of stream) {
const [frames, nextCarry] = parseSseFrames(chunk, carry)
carry = nextCarry
for (const frame of frames) {
process(frame)
}
}
Each returned frame is a parsed JSON object from the data: field.
Multi-line data: blocks are joined with \n. Comment lines (:) and
id:/event: lines are ignored.
Subpath export
The /events subpath is intentionally separate from the main starfish-client
entry point so bundlers can tree-shake it:
// Main entry — no SSE code included
import { StarfishClient } from "@drakkar.software/starfish-client"
// SSE code — separate chunk
import { subscribeChanges } from "@drakkar.software/starfish-client/events"