beava._app
bv.App() — the client
Synchronous Python client for the beava feature server. Seven wire-mapped methods plus a context-manager lifecycle. One transport per App; pick HTTP, TCP, or embed at construction time.
Overview
Every interaction with a running beava server flows through a single bv.App instance. The constructor picks a transport from the URL scheme; from then on, the seven methods below are 1:1 wire calls.
bv.App(
url: str | None = None,
*,
timeout: float = 30.0,
test_mode: bool = False,
)
Three transport modes, picked from the URL scheme:
- http:// / https:// — JSON over HTTP/1.1 on the data-plane port (default 8080).
- tcp:// — custom-framed TCP fast-path (default 8081). Strict-FIFO, one in-flight request per connection.
- None — embed mode. The SDK locates the beava binary on $PATH and spawns it on ephemeral ports for the lifetime of a with block.
Embed mode requires a with block. The spawned binary is torn down on context exit. Calling any method on an embed-mode App outside with raises RuntimeError.
Constructor parameters
Server URL. http://... / https://... for HTTP transport, tcp://... for the framed-TCP fast-path. None selects embed mode.
default None
Per-request transport-level I/O timeout in seconds. Applies to both HTTP and TCP transports.
default 30.0
Embed-mode only. Sets BEAVA_TEST_MODE=1 in the spawned binary's env so test-only opcodes (OP_RESET) are accepted. Setting test_mode=True against a network URL emits a UserWarning and is otherwise ignored — server-side configuration controls test mode for shared servers.
default False
An unrecognised URL scheme (anything other than http / https / tcp / None) raises ValueError at construction time.
Methods
Seven public methods. Each maps 1:1 to a wire opcode; the SDK does not batch, retry, or buffer on your behalf.
register(*descriptors, force=False, dry_run=False)
Register one or more event/table descriptors with the server. Hot-reloadable — the server picks up new pipelines without restarting; registry_version increments on each successful call.
Args
- *descriptors — descriptor objects produced by @bv.event or @bv.table. May also be plain dicts matching the wire shape (passthrough — used in tests).
- force — replace any existing pipeline of the same name.
- dry_run — server validates and returns a categorized diff payload ({diff, would_apply: false}) without committing.
Returns
The parsed server response — a flat dict whose shape varies by request:
{
"status": "ok",
"registry_version": 1,
"added": ["PageView", "UserStats"],
"already_present": [],
"registered_descriptors": [ /* full registry snapshot */ ]
}
Raises
- RegistrationError — a passed descriptor is a raw EventDerivation (a chain expression that wasn't wrapped in @bv.event def F(...)). The error message includes the canonical rewrite. Also raised when the server returns a 4xx/5xx with a structured error body.
Example
import beava as bv
@bv.event
class PageView:
user_id: str
path: str
@bv.table(key="user_id")
def UserStats(pv: PageView):
return pv.group_by("user_id").agg(
visits=bv.count(window="1h"),
)
app = bv.App("http://localhost:8080")
result = app.register(PageView, UserStats)
print(result["registry_version"]) # → 1
push(event_name, fields)
Push a single event to the server. Ack-on-write — the server returns a write-ahead-log offset and a registry-version stamp on every push.
Args
- event_name — the registered event type's name (e.g. "PageView").
- fields — event fields as a plain dict. Keys must match the event's schema.fields; types are coerced where possible.
Returns
{
"ack_lsn": 62,
"idempotent_replay": false,
"registry_version": 1
}
idempotent_replay is true when the server identified this push as a re-delivery of a prior event (via the event's dedupe_key); the underlying state was not mutated.
Raises
- RegistrationError with code="event_not_found" — event_name isn't registered.
- RegistrationError with code="schema_mismatch" — fields doesn't match the event's declared schema.
- RegistrationError with code="invalid_event" — the request body is malformed JSON or missing required keys.
Example
app.push("PageView", {
"user_id": "alice",
"path": "/home",
})
# → {"ack_lsn": 62, "idempotent_replay": false, "registry_version": 1}
get(table, key=None, features=None)
Get a single feature row by entity key. Cold-start (no events for that key) returns {}, not an error.
Args
- table — the registered table's name.
- key — entity key. Pass a string for single-column keys, a list for composite keys (matching the table's key=[...] declaration). None routes to the empty-string sentinel that addresses the single row of a global table (registered with key=[]).
- features — narrow the response to the named subset. None (default) returns the full row.
Returns
A flat dict mapping feature name → value. Examples:
app.get("UserStats", "alice")
{ "visits": 2 }
app.get("UserStats", "newuser")
{}
Raises
- RegistrationError with code="unknown_table" — table isn't registered.
batch_get(requests)
Multi-key fetch in a single round-trip. Returns a list of rows in request order; cold-start per-entry is {}.
Args
- requests — list of per-entry tuples. Each entry is either (table, key) or (table, key, features). The 2-tuple form returns the full row; the 3-tuple form narrows to the named features. Both shapes can be mixed in the same call.
Returns
list[dict[str, Any]] — one row per request, in the order requests were submitted.
Raises
- TypeError — a request entry is neither a 2-tuple nor a 3-tuple. Raised client-side.
- RegistrationError — server returned a structured error (e.g. one of the requested tables is unknown).
Example
rows = app.batch_get([
("UserStats", "alice"),
("UserStats", "bob", ["visits"]),
])
# → [{"visits": 2}, {"visits": 0}]
reset()
Wipe all server state — every registered descriptor, every per-entity aggregate. Test-mode-gated. Use it in tests, never in production.
Behavior
- In embed mode, test_mode=True is required at construction time so the spawned binary accepts OP_RESET.
- In network mode, the server itself must be running with --test-mode; the SDK has no way to flip the flag remotely.
- Returns None on success.
Raises
- RegistrationError with code="reset_disabled" — server is not in test mode.
- RuntimeError — embed-mode App used outside its with block.
Example
with bv.App(test_mode=True) as app:
app.register(PageView, UserStats)
# run a test ...
app.reset() # clean slate for the next test
ping()
Cheap liveness probe on the data plane. Returns the current registry version — useful as a cache-key invalidation / schema-evolution signal on long-lived connections.
Returns
{
"pong": true,
"registry_version": 1
}
Raises
- RegistrationError — non-200 from the server (defensively wrapped so callers don't deal with raw httpx exceptions).
close()
Release the underlying transport (closes the HTTP connection pool, the TCP socket, or tears down the embed-mode subprocess). Idempotent — safe to call twice. Calling any wire method after close() raises RuntimeError.
You normally don't call this directly:
- __exit__ calls it (so with bv.App(...) is enough).
- __del__ calls it (best-effort cleanup; suppressed exceptions).
Lifecycle
Network mode (http, tcp) is flexible — you can use the App as a long-lived module-level singleton or as a context manager. Embed mode requires a context manager so the spawned binary is torn down on exit.
with bv.App() as app:
app.register(PageView, UserStats)
app.push("PageView", {...})
# subprocess torn down here
app = bv.App("http://localhost:8080")
app.register(PageView, UserStats)
# ... use it for the lifetime of the process
app.close() # optional — __del__ also closes
Each embed-mode spawn allocates a unique tmpdir under $TMPDIR/beava-embed---/ and registers an atexit hook to clean it up if the context manager isn't used.
Common questions
Is App thread-safe?
The HTTP transport (httpx-backed) is thread-safe. The TCP transport is not — strict-FIFO at the protocol layer means one in-flight request per connection; sharing a TCP-mode App across threads will interleave frames and corrupt state. If you need TCP from multiple threads, give each thread its own App.
Does the SDK retry?
No. Each method is one wire call; transient transport errors propagate as httpx/socket exceptions. Retries belong in your application — beava's contract is that successful pushes are durable (WAL-acked), so you can safely re-push on connection loss.
What's the async story?
bv.App is sync. v0 doesn't ship an async client — wrap calls in asyncio.to_thread(...) from your event loop, or open multiple App instances in worker threads (HTTP transport is thread-safe).
Can I use one App against two servers?
No — one transport per App. Construct two App instances if you need to talk to two servers. Their state is independent.
Where to go next
You've got the client. The next thing to learn is what to register:
→ @bv.event (/python/event) — Declare event sources — the typed inputs you push into beava. Covers the class form, the def form, and the optional kwargs (dedupe, retention, cold-after).
→ @bv.table (/python/table) — Declare per-entity feature tables with group_by(...).agg(...). Composite keys, global tables, and the chain primitives.