beava._agg
Operator catalogue
Fifty-four aggregation primitives across seven families. Every operator is a function you call inside `agg(name=bv.<op>(...))`; the family changes what gets remembered, the shape stays the same.
Overview
Every operator is a function in the bv namespace that returns an AggDescriptor. You never construct one directly — you call it inside the agg() step of a chain, and the keyword you bind it to becomes the column name in the resulting feature table.
```python
import beava as bv

@bv.event
class PageView:
    user_id: str
    spend: float

@bv.table(key="user_id")
def UserStats(e: PageView):
    return e.group_by("user_id").agg(
        visits=bv.count(window="1h"),
        spend_avg=bv.mean("spend", window="24h"),
    )
```
Three argument shapes show up across the catalogue:
- Field-bearing — the first positional arg is a string column name (`bv.sum("amount", ...)`). Passing an `Expr` raises `RegistrationError(code="schema_mismatch")` at register time; build the derived column upstream with `with_columns`.
- Fieldless — counts, recency probes, and ratios that don't reference a column (`bv.count(...)`, `bv.first_seen()`, `bv.has_seen()`).
- Bound — a memory-bound argument is required: `window="<duration>"` (rolling), `half_life="<duration>"` (exponential decay), or a per-op cap (`k=`, `n=`, `samples=`, `buckets=[…]`).
Durations are strings matching `\d+(ms|s|m|h|d)` or the literal `"forever"`. All optional `where=` kwargs accept a `bv.col(...)` expression and filter the operator's input on a per-event basis.
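That grammar is easy to mirror client-side. A minimal sketch of a validator and a milliseconds converter, assuming the pattern above is exact (the helper names are mine, not SDK API):

```python
import re

# Duration grammar from the docs: an integer plus one of ms/s/m/h/d,
# or the literal "forever".
_DURATION = re.compile(r"(\d+)(ms|s|m|h|d)|forever")

def is_valid_duration(s: str) -> bool:
    """True when `s` matches the documented duration grammar."""
    return _DURATION.fullmatch(s) is not None

_UNIT_MS = {"ms": 1, "s": 1_000, "m": 60_000, "h": 3_600_000, "d": 86_400_000}

def duration_ms(s: str) -> int:
    """Milliseconds for a finite duration string (raises on "forever")."""
    m = re.fullmatch(r"(\d+)(ms|s|m|h|d)", s)
    if m is None:
        raise ValueError(f"not a finite duration: {s!r}")
    return int(m.group(1)) * _UNIT_MS[m.group(2)]
```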
At a glance
Seven families, grouped by what each one remembers about the entity:
| Family | Count | What it remembers |
|---|---|---|
| Core | 11 | Counts, sums, extremes, the basics. count, sum, mean, min, max, var, std, ratio, n_unique, first, last. |
| Sketch | 10 | Bounded-memory shape estimators — quantiles, histograms, top-k, sampled buffers. |
| Recency | 10 | Time-since-last-X probes, streaks, ever-seen flags. Cheap, scalar, no per-event memory. |
| Decay | 6 | Exponentially-weighted aggregates and time-weighted average. half_life= instead of window=. |
| Velocity | 9 | Rate-of-change, trends, residuals, z-scores, burst and outlier counters. |
| Buffer | 4 | Bounded ordered buffers — first-N, last-N, lag, delta-from-previous. |
| Geo | 4 | Lat/lon-pair operators built on the haversine formula. lat= + lon= name the coordinate columns. |
Core
The basics. Counters, sums, extremes, ratios. Most accept an optional rolling window= kwarg; omit it for lifetime-cumulative behavior.
count(window=None, where=None)
Number of events in the rolling window (or the entity's lifetime when window=None). Returns i64.
- `window` — rolling window like `"1h"`, `"24h"`, `"7d"`. Optional.
- `where` — optional filter; only matching events are counted.

```python
e.group_by("user_id").agg(
    visits_1h=bv.count(window="1h"),
    failed_1h=bv.count(window="1h", where=bv.col("status") == "fail"),
)
```
sum(field, window=None, where=None)
Sum of field over the window. Returns f64.
- `field` — string column name. Passing an expression raises `RegistrationError(code="schema_mismatch")`.
- `window` — rolling duration; optional.
- `where` — optional row filter.

```python
e.group_by("user_id").agg(
    spend_24h=bv.sum("amount", window="24h"),
)
```
mean(field, window=None, where=None)
Arithmetic mean of field. Returns f64.
```python
e.group_by("user_id").agg(
    avg_amount_1h=bv.mean("amount", window="1h"),
)
```
min(field, window=None, where=None)
Smallest value of field seen. Output type inherits from the upstream column (i64 / f64 / str).
```python
e.group_by("user_id").agg(
    min_amount=bv.min("amount", window="7d"),
)
```
max(field, window=None, where=None)
Largest value of field. Output type inherits from the upstream column.
```python
e.group_by("user_id").agg(
    max_amount=bv.max("amount", window="7d"),
)
```
var(field, window=None, where=None)
Population variance of field. Returns f64.
```python
e.group_by("user_id").agg(
    spend_var_24h=bv.var("amount", window="24h"),
)
```
std(field, window=None, where=None)
Standard deviation of field. Returns f64.
```python
e.group_by("user_id").agg(
    spend_std_24h=bv.std("amount", window="24h"),
)
```
ratio(window=None, where=None)
Server-computed ratio matched / total over the window. The where= expression filters the numerator only; the denominator is the unfiltered event count. Returns f64.
```python
e.group_by("user_id").agg(
    failure_rate_1h=bv.ratio(
        window="1h",
        where=bv.col("status") == "fail",
    ),
)
```
n_unique(field, window=None, where=None)
Approximate distinct-value count for field, computed with HyperLogLog. Returns i64.
```python
e.group_by("user_id").agg(
    distinct_ips_24h=bv.n_unique("ip", window="24h"),
)
```
first(field)
Value of field at the first event seen for the entity. Lifetime only — no window=. Output type inherits from the upstream column.
```python
e.group_by("user_id").agg(
    signup_country=bv.first("country"),
)
```
last(field)
Value of field at the most recent event. Lifetime only. Output type inherits from the upstream column.
```python
e.group_by("user_id").agg(
    last_country=bv.last("country"),
)
```
Sketch
Bounded-memory shape estimators. Each carries its own bound — k=, n=, samples=, or a fixed buckets= list — so per-entity memory stays predictable.
quantile(field, q, window=None, where=None)
Approximate quantile of field at probability q (open interval (0, 1)). Returns f64.
- `q` — required float in `(0, 1)`; e.g. `0.5` for the median, `0.99` for P99.

```python
e.group_by("user_id").agg(
    p99_latency_1h=bv.quantile("latency_ms", q=0.99, window="1h"),
)
```
top_k(field, k, window=None, where=None)
Top-K most frequent values of field (count-min sketch + min-heap). Returns a JSON-string payload of [{value, count}, ...].
- `k` — required positive integer; the heap size.

```python
e.group_by("user_id").agg(
    top_paths_1h=bv.top_k("path", k=5, window="1h"),
)
```
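For intuition about the payload shape, here is an exact-count reference in plain Python; the server approximates the same answer with a count-min sketch and a min-heap (the helper name is mine, not SDK API):

```python
import json
from collections import Counter

def top_k_exact(values, k):
    """Exact reference for top_k's output shape: the k most frequent
    values as a JSON [{value, count}, ...] payload, most frequent first."""
    return json.dumps(
        [{"value": v, "count": c} for v, c in Counter(values).most_common(k)]
    )
```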
bloom_member(field, window=None, where=None)
Bloom-filter membership probe. Server compares the current event's field against the filter accumulated for the entity. Returns bool (false-positive-bounded).
```python
e.group_by("user_id").agg(
    seen_ip_before=bv.bloom_member("ip", window="30d"),
)
```
entropy(field, window=None, where=None)
Shannon entropy of field's value distribution (bits). Returns f64. Useful for "is this user spread across many values, or dialed in on a few."
```python
e.group_by("user_id").agg(
    path_entropy_1h=bv.entropy("path", window="1h"),
)
```
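The quantity being estimated is ordinary Shannon entropy over the empirical value distribution. A plain-Python reference, assuming exact counts rather than the server's bounded sketch:

```python
import math
from collections import Counter

def shannon_entropy_bits(values):
    """Shannon entropy (bits) of the empirical distribution of `values`:
    H = -sum(p * log2(p)) over observed value frequencies."""
    counts = Counter(values)
    total = sum(counts.values())
    return -sum((c / total) * math.log2(c / total) for c in counts.values())
```

Four equally likely values give 2 bits; a single repeated value gives 0, which is why the docs read it as "spread across many values vs dialed in on a few."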
histogram(field, buckets, where=None)
Lifetime fixed-bucket count histogram of field. Returns a JSON-string payload of bucket counts. Lifetime-only — no window=; the bound is the bucket list itself.
- `buckets` — required strictly-increasing `list[float]` of split points.

```python
e.group_by("user_id").agg(
    spend_dist=bv.histogram("amount", buckets=[10, 100, 1000, 10000]),
)
```
hour_of_day_histogram(where=None)
Lifetime 24-bucket per-hour event-count histogram (UTC hour). Fieldless. Returns a JSON-string payload.
```python
e.group_by("user_id").agg(
    activity_by_hour=bv.hour_of_day_histogram(),
)
```
dow_hour_histogram(where=None)
Lifetime 168-bucket histogram keyed by (day_of_week, hour). Fieldless. Returns a JSON-string payload.
```python
e.group_by("user_id").agg(
    weekly_pattern=bv.dow_hour_histogram(),
)
```
event_type_mix(field, categories=None, max_categories=256, where=None)
Lifetime proportion-per-category sketch over field. Returns a JSON-string payload. Bounded by max_categories; pass an explicit categories allowlist to lock the bucket set.
- `categories` — optional allowlist; takes precedence over the cap-and-drop path.
- `max_categories` — fallback bound when `categories` is unset (default `256`).

```python
e.group_by("user_id").agg(
    action_mix=bv.event_type_mix(
        "action",
        categories=["view", "click", "purchase"],
    ),
)
```
most_recent_n(field, n, where=None)
The n most recent values of field, newest-first. Lifetime, capped at n. Returns a JSON-string array.
- `n` — required positive integer; the buffer size.

```python
e.group_by("user_id").agg(
    last_5_paths=bv.most_recent_n("path", n=5),
)
```
reservoir_sample(field, samples, where=None)
Lifetime Vitter Algorithm-R reservoir sample of field. Returns a JSON-string array of length up to samples.
- `samples` — required positive integer; the reservoir size.

```python
e.group_by("user_id").agg(
    amount_sample=bv.reservoir_sample("amount", samples=100),
)
```
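Algorithm R itself is short. A reference sketch of the sampling rule, under my own helper name:

```python
import random

def reservoir_sample_ref(stream, k, rng=None):
    """Vitter's Algorithm R: keep the first k items; for the i-th item
    (1-indexed, i > k), overwrite a uniformly random slot with
    probability k/i. Every item ends up in the reservoir with equal
    probability k/n."""
    rng = rng or random.Random()
    reservoir = []
    for i, item in enumerate(stream, start=1):
        if i <= k:
            reservoir.append(item)
        else:
            j = rng.randrange(i)  # uniform in [0, i)
            if j < k:
                reservoir[j] = item
    return reservoir
```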
Recency
"How long ago" and "how many in a row." These are scalar probes — they don't keep per-event history, so they're cheap.
first_seen()
Wall-clock millisecond timestamp of the first event for the entity. Returns i64. Fieldless, no kwargs.
```python
e.group_by("user_id").agg(
    signup_ts=bv.first_seen(),
)
```
last_seen()
Wall-clock ms timestamp of the most recent event. Returns i64.
```python
e.group_by("user_id").agg(
    last_active_ts=bv.last_seen(),
)
```
age()
Query-time elapsed milliseconds since first_seen. Returns i64. Computed at get time, not at push time.
```python
e.group_by("user_id").agg(
    account_age_ms=bv.age(),
)
```
has_seen(where=None)
Boolean flag — has the entity ever produced a matching event? Returns bool. Fieldless; the predicate goes in where=.
```python
e.group_by("user_id").agg(
    ever_failed=bv.has_seen(where=bv.col("status") == "fail"),
)
```
time_since(where=None)
Query-time elapsed milliseconds since the most recent matching event. Returns i64.
```python
e.group_by("user_id").agg(
    ms_since_failure=bv.time_since(where=bv.col("status") == "fail"),
)
```
time_since_last_n(n, where=None)
Query-time elapsed milliseconds since the n-th most recent matching event — silence relative to a buried event, not just the latest. Returns i64.
- `n` — required positive integer; rank from the latest.

```python
e.group_by("user_id").agg(
    ms_since_3rd_login=bv.time_since_last_n(n=3),
)
```
streak(where=None)
Current run length of consecutive matching events. Resets on the first non-match. Returns i64.
```python
e.group_by("user_id").agg(
    consec_failures=bv.streak(where=bv.col("status") == "fail"),
)
```
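The streak rule is a one-line fold: increment on a match, reset to zero otherwise. A reference sketch over a boolean match stream (the helper name is mine):

```python
def current_streak(matches):
    """Run length of consecutive True values at the end of the stream,
    resetting to 0 on the first non-match, like bv.streak."""
    streak = 0
    for m in matches:
        streak = streak + 1 if m else 0
    return streak
```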
max_streak(where=None)
All-time peak streak length. Returns i64.
```python
e.group_by("user_id").agg(
    worst_failure_run=bv.max_streak(where=bv.col("status") == "fail"),
)
```
negative_streak(where=None)
Current run length of consecutive non-matching events — the inverse of streak. Returns i64.
```python
e.group_by("user_id").agg(
    consec_clean=bv.negative_streak(where=bv.col("status") == "fail"),
)
```
first_seen_in_window(window, where=None)
Boolean — is there a matching event within the past window? Returns bool. window= is required.
```python
e.group_by("user_id").agg(
    active_last_24h=bv.first_seen_in_window(window="24h"),
)
```
Decay
Exponentially-weighted aggregates. Use half_life= instead of window=: each event's contribution decays by 50% every half_life of elapsed time, so old events fade rather than getting evicted.
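The decay rule is plain exponential weighting: an event of age t contributes with multiplier 0.5 ** (t / half_life). A reference sketch, assuming ages and half-life share one unit (helper names are mine, not SDK API):

```python
def decay_weight(age, half_life):
    """Contribution multiplier after `age` elapsed time: halves every half_life."""
    return 0.5 ** (age / half_life)

def decayed_sum_ref(events, half_life):
    """Reference model of a decayed sum over (value, age) pairs,
    ages in the same unit as half_life."""
    return sum(v * decay_weight(a, half_life) for v, a in events)
```

An event exactly one half-life old carries weight 0.5, two half-lives old 0.25, and so on; nothing is ever evicted, it just fades toward zero.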
ewma(field, half_life, where=None)
Exponentially weighted moving average of field. Returns f64.
- `half_life` — required duration string; e.g. `"15m"`, `"1h"`.

```python
e.group_by("user_id").agg(
    spend_ewma=bv.ewma("amount", half_life="30m"),
)
```
ewvar(field, half_life, where=None)
Exponentially weighted variance of field. Returns f64.
```python
e.group_by("user_id").agg(
    spend_ewvar=bv.ewvar("amount", half_life="30m"),
)
```
ew_zscore(field, half_life, where=None)
Z-score of the most recent field value against the EWMA / EWVAR baseline at the same half-life. Returns f64.
```python
e.group_by("user_id").agg(
    spend_zscore=bv.ew_zscore("amount", half_life="1h"),
)
```
decayed_sum(field, half_life, where=None)
Time-decayed sum of field. Each event contributes its value times 0.5^(age / half_life). Returns f64.
```python
e.group_by("user_id").agg(
    fading_spend=bv.decayed_sum("amount", half_life="6h"),
)
```
decayed_count(half_life, where=None)
Time-decayed event count — fieldless. Returns f64.
```python
e.group_by("user_id").agg(
    fading_visits=bv.decayed_count(half_life="1h"),
)
```
twa(field, window, where=None)
Time-weighted average of field over window — each value contributes proportionally to the duration it was the most recent observation. Returns f64. window= is required.
```python
e.group_by("user_id").agg(
    avg_balance_24h=bv.twa("balance", window="24h"),
)
```
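The weighting behind twa can be written directly: each value counts in proportion to how long it held as the most recent observation. A reference sketch over (value, hold_duration) pairs (helper name is mine):

```python
def time_weighted_average(samples):
    """Time-weighted average over (value, hold_duration) pairs:
    each value is weighted by how long it was the latest observation."""
    total = sum(d for _, d in samples)
    return sum(v * d for v, d in samples) / total
```

A balance of 10 held for 1 hour then 20 held for 3 hours averages (10*1 + 20*3) / 4 = 17.5, not the naive event mean of 15.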
Velocity
Change-of-rate, trend, residual, and outlier counters. Each requires a window= bound (baseline_window= for z_score), so the comparison is always against a rolling reference frame; the one exception is seasonal_deviation, which is lifetime-only and baselines on hour-of-day instead.
rate_of_change(field, window, where=None)
Slope of field versus time across the window — (latest - earliest) / window_seconds. Returns f64. window= is required.
```python
e.group_by("user_id").agg(
    spend_velocity_1h=bv.rate_of_change("amount", window="1h"),
)
```
inter_arrival_stats(window, where=None)
Mean inter-arrival time (ms) between consecutive matching events in the window. Fieldless. Returns f64.
```python
e.group_by("user_id").agg(
    mean_gap_ms_1h=bv.inter_arrival_stats(window="1h"),
)
```
burst_count(window, sub_window, where=None)
Number of sub_window-sized buckets inside window that had at least one event. Detects clustered activity rather than steady throughput. Returns i64.
- `window` — outer bound; required.
- `sub_window` — inner bucket size; required (must also be a valid duration).

```python
e.group_by("user_id").agg(
    bursts_1h=bv.burst_count(window="1h", sub_window="1m"),
)
```
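The bucketing rule is integer division of timestamps by the sub-window size, then counting distinct non-empty buckets inside the outer window. A reference sketch in milliseconds (helper name is mine; the server's exact bucket alignment is an assumption here):

```python
def burst_count_ref(timestamps_ms, window_ms, sub_window_ms, now_ms):
    """Number of distinct sub_window-sized buckets inside the trailing
    window that contain at least one event timestamp."""
    in_window = [t for t in timestamps_ms if now_ms - window_ms < t <= now_ms]
    return len({t // sub_window_ms for t in in_window})
```

Four events packed into three one-minute buckets score 3, the same as three evenly spaced events; the operator measures how many buckets were touched, not how many events arrived.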
trend(field, window, where=None)
OLS slope of field versus event timestamp across window. Returns f64. Distinct from rate_of_change: trend is a regression slope; rate_of_change is endpoint-only.
```python
e.group_by("user_id").agg(
    spend_trend_24h=bv.trend("amount", window="24h"),
)
```
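The distinction from rate_of_change is easiest to see side by side: trend fits all points, rate_of_change only looks at the endpoints. A reference sketch of both slopes (helper names are mine):

```python
def ols_slope(ts, xs):
    """Least-squares slope of xs versus ts: cov(t, x) / var(t)."""
    n = len(ts)
    t_mean = sum(ts) / n
    x_mean = sum(xs) / n
    cov = sum((t - t_mean) * (x - x_mean) for t, x in zip(ts, xs))
    var = sum((t - t_mean) ** 2 for t in ts)
    return cov / var

def endpoint_rate(ts, xs):
    """rate_of_change's endpoint-only slope: (latest - earliest) / elapsed."""
    return (xs[-1] - xs[0]) / (ts[-1] - ts[0])
```

On perfectly linear data the two agree; once interior points wobble, the OLS slope reflects them and the endpoint rate does not.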
trend_residual(field, window, where=None)
RMS of the residuals from the OLS fit underlying trend. Returns f64 — high values mean the trend explains little of the variance.
```python
e.group_by("user_id").agg(
    spend_residual_24h=bv.trend_residual("amount", window="24h"),
)
```
outlier_count(field, window, sigma=3.0, where=None)
Number of events where field falls outside ±sigma standard deviations of the window's mean. Returns i64.
- `sigma` — multiplier on the standard deviation. Default `3.0`.

```python
e.group_by("user_id").agg(
    big_spends_24h=bv.outlier_count("amount", window="24h", sigma=2.5),
)
```
value_change_count(field, window, where=None)
Number of times field's value differed from its previous value inside the window — a coarse churn indicator. Returns i64.
```python
e.group_by("user_id").agg(
    device_swaps_24h=bv.value_change_count("device_id", window="24h"),
)
```
z_score(field, baseline_window, where=None)
Z-score of the latest field value against the rolling mean / stddev over baseline_window. Returns f64. Note the kwarg name is baseline_window=, not window=.
```python
e.group_by("user_id").agg(
    spend_z=bv.z_score("amount", baseline_window="7d"),
)
```
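The computation is the ordinary standard score of the latest value against the baseline window. A reference sketch using a population standard deviation (the server's variance convention isn't stated here, so treat that choice as an assumption):

```python
import math

def z_score_ref(latest, baseline):
    """Standard score of `latest` against the baseline window's
    population mean and standard deviation."""
    n = len(baseline)
    mean = sum(baseline) / n
    std = math.sqrt(sum((x - mean) ** 2 for x in baseline) / n)
    return (latest - mean) / std
```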
seasonal_deviation(field, where=None)
Lifetime z-score of the latest field against the hour-of-day baseline accumulated for the entity. Catches "this user is normally quiet at 3am, why are they spending now." Returns f64. Lifetime-only — no window=.
```python
e.group_by("user_id").agg(
    odd_hour_spend=bv.seasonal_deviation("amount"),
)
```
Buffer
Bounded ordered buffers — keep a small list of recent values per entity, in the order they arrived. Memory is fixed by the n= bound at register time.
first_n(field, n, where=None)
The first n matching values of field in insertion order. Once the buffer is full, later events are ignored. Returns a JSON-string array.
```python
e.group_by("user_id").agg(
    first_3_paths=bv.first_n("path", n=3),
)
```
last_n(field, n, where=None)
The last n matching values of field, oldest-to-newest. Sliding ring buffer — the oldest entry is dropped when a new one arrives. Returns a JSON-string array.
```python
e.group_by("user_id").agg(
    recent_paths=bv.last_n("path", n=10),
)
```
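The sliding ring-buffer semantics are exactly what `collections.deque` with `maxlen` gives you. A reference sketch (helper name is mine):

```python
from collections import deque

def last_n_buffer(values, n):
    """Keep at most n values, oldest-to-newest, dropping the oldest
    when a new one arrives -- deque(maxlen=n) does this natively."""
    buf = deque(maxlen=n)
    for v in values:
        buf.append(v)
    return list(buf)
```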
lag(field, n=1)
The value of field from n events ago. Output type inherits from the upstream column. n defaults to 1 (the immediately previous value).
```python
e.group_by("user_id").agg(
    prev_amount=bv.lag("amount"),
    amount_2_ago=bv.lag("amount", n=2),
)
```
delta_from_prev(field, where=None)
Difference between the latest field value and the previous one. Output type inherits from the upstream column.
```python
e.group_by("user_id").agg(
    spend_delta=bv.delta_from_prev("amount"),
)
```
Geo
Lat/lon-pair operators built on the haversine formula. Each takes lat= and lon= kwargs naming the latitude / longitude columns on the upstream event; both columns are read on every matching event.
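All four operators reduce to haversine great-circle distances between points. A reference implementation of the distance itself, using a 6371 km mean Earth radius (the server's exact radius constant is an assumption):

```python
import math

def haversine_km(lat1, lon1, lat2, lon2, radius_km=6371.0):
    """Great-circle distance between two lat/lon points (degrees), in km."""
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dlmb / 2) ** 2
    return 2 * radius_km * math.asin(math.sqrt(a))
```

geo_velocity divides such a distance by the elapsed hours between two consecutive events; geo_distance sums the distances along the whole path.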
geo_velocity(lat, lon, where=None)
Lifetime maximum km/h between consecutive matching events. Catches "this card was used in two cities five minutes apart." Returns f64.
```python
e.group_by("user_id").agg(
    impossible_speed=bv.geo_velocity(lat="lat", lon="lon"),
)
```
geo_distance(lat, lon, where=None)
Lifetime cumulative haversine path length (km) traced by the entity's matching events. Returns f64.
```python
e.group_by("user_id").agg(
    total_km=bv.geo_distance(lat="lat", lon="lon"),
)
```
geo_spread(lat, lon, where=None)
Lifetime RMS dispersion (km) of matching events around the running centroid — "how spread out is this entity's footprint." Returns f64.
```python
e.group_by("user_id").agg(
    geo_footprint_km=bv.geo_spread(lat="lat", lon="lon"),
)
```
distance_from_home(lat, lon, samples=100, where=None)
Distance (km) from the current event's coordinates to the centroid of the last samples matching events. Returns f64.
- `samples` — sliding window of points used to compute the home centroid. Default `100`.

```python
e.group_by("user_id").agg(
    km_from_home=bv.distance_from_home(lat="lat", lon="lon", samples=50),
)
```
Where to go next
You've got the catalogue. The two adjacent pages cover the expression DSL the operators consume and the error codes the server returns:
- `bv.col` / `bv.lit` — the column-reference expression DSL. Used inside `where=`, `filter(...)`, and `with_columns(...)` chain stages — the way you build the expressions you pass to operators.
- Error codes — every structured error code the server returns: `unknown_table`, `event_not_found`, `invalid_event`, `reset_disabled_in_production`, and the rest. Mapped to the SDK's `RegistrationError` exception.