@drakkar.software/starfish-queuing
Change-event queuing extension for Starfish. After a successful push, the
server hands each registered plugin a WriteEvent; this plugin publishes a
QueueMessage to a configured transport (in-process MemoryQueue, callback
CustomQueue, or your own Queue implementation).
Install
pnpm add @drakkar.software/starfish-server @drakkar.software/starfish-queuing
Usage
import { createSyncRouter } from "@drakkar.software/starfish-server"
import { createQueuingServerPlugin, MemoryQueue } from "@drakkar.software/starfish-queuing"
const queue = new MemoryQueue()
const router = createSyncRouter({
config,
store,
// …
plugins: [
createQueuingServerPlugin({
queue,
collections: {
events: { topic: "events", includeParams: true, includeBody: true },
},
}),
],
})
The plugin publishes only for collections present in its collections map.
topic defaults to the collection name — an unset or empty-string topic falls
back to it (an empty broker subject is a footgun). Per-collection flags select what
the message carries: includeParams (route params), includeBody (the pushed
data, JSON only), and includeIdentity (the authenticated writer's userId). All
default off; includeIdentity in particular exposes who wrote off-box, so it is
strictly opt-in. shutdown() closes the queue when the server's graceful-shutdown
handler is given the plugin list.
subjectParam appends a route path-param to the subject — a per-resource subject
<topic>.<value> (e.g. posts.changed.<postId>) so a broker/consumer can filter by
resource (NATS <topic>.>) without parsing the body. The value is read from the
write event's params (independent of includeParams) and re-validated against
subjectIdPattern (default DEFAULT_SAFE_ID = ^[a-zA-Z0-9_-]+$); a missing,
non-string, or metacharacter-bearing id falls back to the base subject, so the broker
never sees a ./*/> token:
{ topic: "posts.changed", subjectParam: "postId" } // → posts.changed.<postId>
See docs/ts/queuing/ for the full guide.