
Storage Backends (Python)

Starfish's AbstractObjectStore interface abstracts over any key/value store. The library ships three built-in implementations and makes it straightforward to bring your own.

TypeScript equivalent: docs/ts/server/storage.md


FilesystemObjectStore

Stores every document as a file on disk. Suitable for single-node deployments and local development.

Import path: starfish_server (top-level)

Dependency: aiofiles, installed by default with pip install starfish-server

from starfish_server import FilesystemObjectStore, FilesystemStorageOptions

store = FilesystemObjectStore(
    FilesystemStorageOptions(base_dir="/var/lib/starfish/data")
)

| Option | Type | Description |
| --- | --- | --- |
| base_dir | str | Root directory where objects are stored. Created automatically if absent. |

Writes are atomic: data is written to a temporary file and renamed into place so a crash mid-write never corrupts stored data.
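
The write-then-rename pattern described above can be sketched roughly as follows. This is only an illustration of the general technique, not Starfish's actual implementation: the helper name atomic_write_text is hypothetical, and it uses blocking standard-library calls instead of aiofiles to stay self-contained.

```python
import os
import tempfile


def atomic_write_text(path: str, data: str) -> None:
    """Write data to path so readers see either the old or the new content."""
    directory = os.path.dirname(os.path.abspath(path))
    os.makedirs(directory, exist_ok=True)
    # Create the temp file next to the destination so the final rename
    # never has to cross a filesystem boundary.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as tmp:
            tmp.write(data)
            tmp.flush()
            os.fsync(tmp.fileno())  # push the bytes to disk before renaming
        os.replace(tmp_path, path)  # swap the finished file into place
    except BaseException:
        # Remove the partial temp file; the destination is left untouched.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```

Because the destination is only ever replaced by a fully written file, a crash before os.replace leaves the previous version intact.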


S3ObjectStore

Stores documents in any S3-compatible object store (AWS S3, MinIO, Cloudflare R2, Tigris, etc.). Suitable for horizontally-scaled or serverless deployments.

Import path: starfish_server.storage.s3

Extra dependency: aiobotocore — install with pip install starfish-server[s3]

import os

from starfish_server.storage.s3 import S3ObjectStore, S3StorageOptions

store = S3ObjectStore(S3StorageOptions(
    access_key_id=os.environ["S3_ACCESS_KEY_ID"],
    secret_access_key=os.environ["S3_SECRET_ACCESS_KEY"],
    endpoint="https://s3.amazonaws.com",
    bucket="my-starfish-bucket",
    region="us-east-1",
))

Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| access_key_id | str | | AWS / S3-compatible access key ID |
| secret_access_key | str | | AWS / S3-compatible secret access key |
| endpoint | str | | Base URL of the S3 service |
| bucket | str | | Bucket name |
| region | str | "us-east-1" | AWS region |

Cleanup

Call await store.close() on shutdown to release underlying HTTP connections:

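from contextlib import asynccontextmanager

from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app):
    yield
    await store.close()  # runs once the app is shutting down

app = FastAPI(lifespan=lifespan)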

Or wire it through GracefulShutdown:

from starfish_server.lifecycle import GracefulShutdown, GracefulShutdownOptions

shutdown = GracefulShutdown(GracefulShutdownOptions(
    on_shutdown=store.close,
))

MinIO example

store = S3ObjectStore(S3StorageOptions(
    access_key_id="minioadmin",
    secret_access_key="minioadmin",
    endpoint="http://localhost:9000",
    bucket="starfish",
    region="us-east-1",  # ignored by MinIO; same as the default value
))

Cloudflare R2 example

store = S3ObjectStore(S3StorageOptions(
    access_key_id=os.environ["R2_ACCESS_KEY_ID"],
    secret_access_key=os.environ["R2_SECRET_ACCESS_KEY"],
    endpoint=f"https://{os.environ['CF_ACCOUNT_ID']}.r2.cloudflarestorage.com",
    bucket="starfish",
    region="auto",
))

MemoryObjectStore

In-process store backed by a Python dict. For unit tests only — data is lost on restart.

from starfish_server import MemoryObjectStore

# Shared global dict (convenient for quick scripts)
store = MemoryObjectStore()

# Isolated instance (pass {} to prevent cross-test pollution)
store = MemoryObjectStore(data={})

Bring your own store

Subclass AbstractObjectStore and implement five required methods (plus two optional binary methods):

from starfish_server.storage.base import AbstractObjectStore


class MyStore(AbstractObjectStore):
    async def get_string(self, key: str) -> str | None:
        ...

    async def put(self, key: str, body: str, *, content_type: str | None = None, cache_control: str | None = None) -> None:
        ...

    async def list_keys(self, prefix: str, *, start_after: str | None = None, limit: int | None = None) -> list[str]:
        ...

    async def delete(self, key: str) -> None:
        ...

    async def delete_many(self, keys: list[str]) -> None:
        ...

    # Optional: only needed for binary (non-JSON) collections
    async def get_bytes(self, key: str) -> tuple[bytes, str] | None:
        ...

    async def put_bytes(self, key: str, body: bytes, *, content_type: str, cache_control: str | None = None) -> None:
        ...

Alternatively, use CustomObjectStore for a callback-based approach without subclassing:

from starfish_server.storage.memory import CustomObjectStore

store = CustomObjectStore(
    on_get=lambda key: my_backend.get(key),
    on_put=lambda key, body, **kwargs: my_backend.set(key, body),
    on_list=lambda prefix, start_after=None, limit=100: my_backend.scan(prefix),
    on_delete=lambda key: my_backend.delete(key),
)

Request metadata via StoreContext

Every store method accepts a keyword-only context: StoreContext | None = None argument. When a request comes in through a route handler the library fills this with structured metadata about the request:

@dataclass(frozen=True)
class StoreContext:
    collection: str            # collection name from config (e.g. "profile")
    params: Mapping[str, str]  # resolved path params (e.g. {"identity": "alice"})
    identity: str | None       # authenticated caller, or None for public routes
    roles: tuple[str, ...]     # resolved roles for this caller
    action: str                # "pull" | "push" | "list" | "delete"
    namespace: str | None      # set when route lives under a namespace mount

CustomObjectStore — receiving context in callbacks

Callbacks that accept an extra positional argument automatically receive the context. Callbacks written with the old single-argument signature continue to work unchanged — arity is sniffed once at construction time using inspect.signature.

from starfish_server import CustomObjectStore

# Old-style — still works, ctx is never passed
store = CustomObjectStore(
    on_get=lambda key: my_backend.get(key),
)

# New-style — receives full request context
async def on_put(key: str, body: str, ctx) -> None:
    if ctx is not None:  # ctx is None for system-internal calls
        print(f"{ctx.identity} pushed to {ctx.collection}")
    await my_backend.set(key, body)

store = CustomObjectStore(on_put=on_put)

System-internal calls (replica sync, config loading, enrichers) pass None — treat a missing context as "no request context available".
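
The arity sniffing mentioned above could work along the following lines. This is a hedged sketch of one plausible approach using inspect.signature, not the library's actual code; accepts_context and wrap_get are hypothetical names, and only the on_get case (one base argument) is shown.

```python
import inspect
from typing import Any, Callable


def accepts_context(callback: Callable[..., Any], base_arity: int) -> bool:
    """Return True if callback can take one positional argument beyond base_arity."""
    params = list(inspect.signature(callback).parameters.values())
    positional = [
        p for p in params
        if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)
    ]
    has_var_positional = any(p.kind is p.VAR_POSITIONAL for p in params)
    return has_var_positional or len(positional) > base_arity


def wrap_get(callback: Callable[..., Any]) -> Callable[[str, Any], Any]:
    """Inspect the callback once, then always call it with the right arity."""
    if accepts_context(callback, base_arity=1):
        return lambda key, ctx: callback(key, ctx)
    return lambda key, ctx: callback(key)  # old-style: ctx is dropped


old_style = wrap_get(lambda key: f"old:{key}")
new_style = wrap_get(lambda key, ctx: f"new:{key}:{ctx}")
print(old_style("k", None))   # old:k
print(new_style("k", "CTX"))  # new:k:CTX
```

Inspecting the signature once at construction keeps the per-call path to a plain function call, which is presumably why the check is not repeated on every request.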