Skip to main content

starfish-queuing

Change-event queuing extension for Starfish (Python). After a successful push, the server hands each registered plugin a WriteEvent; this plugin publishes a QueueMessage to a configured transport (MemoryQueue, CustomQueue, NatsQueue, or your own AbstractQueue).

Install

pip install starfish-server starfish-queuing
# with NATS support:
pip install "starfish-queuing[nats]"

Usage

from starfish_server import create_sync_router, SyncRouterOptions
from starfish_queuing import create_queuing_server_plugin, MemoryQueue, QueueConfig

queue = MemoryQueue()

plugin = create_queuing_server_plugin(
queue=queue,
collections={
"events": QueueConfig(topic="events", include_params=True, include_body=True),
},
)

router = create_sync_router(
SyncRouterOptions(
config=config,
store=store,
# …
plugins=[plugin],
)
)

The plugin publishes only for collections present in its collections map. topic defaults to the collection name — an unset or empty-string topic falls back to it (an empty broker subject is a footgun). Per-collection flags select what the message carries: include_params (route params), include_body (the pushed data, JSON only), and include_identity (the authenticated writer's userId). All default off; include_identity in particular exposes who wrote off-box, so it is strictly opt-in. shutdown() closes the queue when the server's graceful-shutdown handler is given the plugin list.

subject_param appends a route path-param to the subject — a per-resource subject <topic>.<value> (e.g. posts.changed.<postId>) so a broker/consumer can filter by resource (NATS <topic>.>) without parsing the body. The value is read from the write event's params (independent of include_params) and re-validated against subject_id_pattern (default DEFAULT_SAFE_ID = ^[a-zA-Z0-9_-]+$); a missing, non-string, or metacharacter-bearing id falls back to the base subject, so the broker never sees a ./*/> token:

QueueConfig(topic="posts.changed", subject_param="postId") # → posts.changed.<postId>

See docs/ts/queuing/ for the full guide (the TypeScript and Python APIs mirror each other).