Python SDK · beava._agg

Operator catalogue

Fifty-four aggregation primitives across seven families. Every operator is a function you call inside agg(name=bv.<op>(...)); the family changes what gets remembered, while the shape stays the same.

Overview

Every operator is a function in the bv namespace that returns an AggDescriptor. You never construct one directly — you call it inside the agg() step of a chain, and the keyword you bind it to becomes the column name in the resulting feature table.

shape
import beava as bv

@bv.event
class PageView:
    user_id: str
    spend:   float

@bv.table(key="user_id")
def UserStats(e: PageView):
    return e.group_by("user_id").agg(
        visits=bv.count(window="1h"),
        spend_avg=bv.mean("spend", window="24h"),
    )

Three argument shapes show up across the catalogue:

- field: the positional string naming the event column the operator reads; fieldless operators (count, ratio, the recency probes) skip it.
- Durations: window=, half_life=, and baseline_window= take strings matching \d+(ms|s|m|h|d) or the literal "forever".
- where=: every optional where= kwarg accepts a bv.col(...) expression and filters the operator's input on a per-event basis.
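
Combined, the shapes look like this; the column names are illustrative, but the operators and kwargs are the documented ones:

shapes.py
e.group_by("user_id").agg(
    # fieldless operator + rolling-window duration string
    visits_90s=bv.count(window="90s"),
    # field + half_life= uses the same duration grammar
    spend_ewma=bv.ewma("amount", half_life="30m"),
    # where= takes a bv.col(...) expression, evaluated per event
    fails_24h=bv.count(window="24h", where=bv.col("status") == "fail"),
)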

At a glance

Seven families, grouped by what each one remembers about the entity:

| Family | Count | What it remembers |
| --- | --- | --- |
| Core | 11 | Counts, sums, extremes, the basics. count, sum, mean, min, max, var, std, ratio, n_unique, first, last. |
| Sketch | 10 | Bounded-memory shape estimators — quantiles, histograms, top-k, sampled buffers. |
| Recency | 10 | Time-since-last-X probes, streaks, ever-seen flags. Cheap, scalar, no per-event memory. |
| Decay | 6 | Exponentially-weighted aggregates and time-weighted average. half_life= instead of window=. |
| Velocity | 9 | Rate-of-change, trends, residuals, z-scores, burst and outlier counters. |
| Buffer | 4 | Bounded ordered buffers — first-N, last-N, lag, delta-from-previous. |
| Geo | 4 | Lat/lon-pair operators built on the haversine formula. lat= + lon= name the coordinate columns. |

Core

The basics. Counters, sums, extremes, ratios. Most accept an optional rolling window= kwarg; omit it for lifetime-cumulative behavior.

count(window=None, where=None)

count(*, window: str | None = None, where: _Expr | None = None) -> AggDescriptor

Number of events in the rolling window (or the entity's lifetime when window=None). Returns i64.

count.py
e.group_by("user_id").agg(
    visits_1h=bv.count(window="1h"),
    failed_1h=bv.count(window="1h", where=bv.col("status") == "fail"),
)

sum(field, window=None, where=None)

sum(field: str, *, window: str | None = None, where: _Expr | None = None) -> AggDescriptor

Sum of field over the window. Returns f64.

sum.py
e.group_by("user_id").agg(
    spend_24h=bv.sum("amount", window="24h"),
)

mean(field, window=None, where=None)

mean(field: str, *, window: str | None = None, where: _Expr | None = None) -> AggDescriptor

Arithmetic mean of field. Returns f64.

mean.py
e.group_by("user_id").agg(
    avg_amount_1h=bv.mean("amount", window="1h"),
)

min(field, window=None, where=None)

min(field: str, *, window: str | None = None, where: _Expr | None = None) -> AggDescriptor

Smallest value of field seen. Output type inherits from the upstream column (i64 / f64 / str).

min.py
e.group_by("user_id").agg(
    min_amount=bv.min("amount", window="7d"),
)

max(field, window=None, where=None)

max(field: str, *, window: str | None = None, where: _Expr | None = None) -> AggDescriptor

Largest value of field. Output type inherits from the upstream column.

max.py
e.group_by("user_id").agg(
    max_amount=bv.max("amount", window="7d"),
)

var(field, window=None, where=None)

var(field: str, *, window: str | None = None, where: _Expr | None = None) -> AggDescriptor

Population variance of field. Returns f64.

var.py
e.group_by("user_id").agg(
    spend_var_24h=bv.var("amount", window="24h"),
)

std(field, window=None, where=None)

std(field: str, *, window: str | None = None, where: _Expr | None = None) -> AggDescriptor

Standard deviation of field. Returns f64.

std.py
e.group_by("user_id").agg(
    spend_std_24h=bv.std("amount", window="24h"),
)

ratio(window=None, where=None)

ratio(*, window: str | None = None, where: _Expr | None = None) -> AggDescriptor

Server-computed ratio matched / total over the window. The where= expression filters the numerator only; the denominator is the unfiltered event count. Returns f64.

ratio.py
e.group_by("user_id").agg(
    failure_rate_1h=bv.ratio(
        window="1h",
        where=bv.col("status") == "fail",
    ),
)
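
If the numerator / denominator split is easier to see in code, here is a rough pure-Python sketch of the arithmetic; events is a hypothetical list of dicts already restricted to one entity and one window, and the zero-denominator fallback is a placeholder, not documented behavior:

ratio_sketch.py
def ratio_sketch(events: list[dict]) -> float:
    total = len(events)  # denominator: every event in the window, unfiltered
    matched = sum(1 for ev in events if ev["status"] == "fail")  # numerator: where= matches only
    return matched / total if total else 0.0  # 0.0 fallback is illustrative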

n_unique(field, window=None, where=None)

n_unique(field: str, *, window: str | None = None, where: _Expr | None = None) -> AggDescriptor

Approximate distinct-value count for field, computed with HyperLogLog. Returns i64.

n_unique.py
e.group_by("user_id").agg(
    distinct_ips_24h=bv.n_unique("ip", window="24h"),
)

first(field)

first(field: str) -> AggDescriptor

Value of field at the first event seen for the entity. Lifetime only — no window=. Output type inherits from the upstream column.

first.py
e.group_by("user_id").agg(
    signup_country=bv.first("country"),
)

last(field)

last(field: str) -> AggDescriptor

Value of field at the most recent event. Lifetime only. Output type inherits from the upstream column.

last.py
e.group_by("user_id").agg(
    last_country=bv.last("country"),
)

Sketch

Bounded-memory shape estimators. Each carries its own bound — k=, n=, samples=, or a fixed buckets= list — so per-entity memory stays predictable.

quantile(field, q, window=None, where=None)

quantile(field: str, *, q: float, window: str | None = None, where: _Expr | None = None) -> AggDescriptor

Approximate quantile of field at probability q (open interval (0, 1)). Returns f64.

quantile.py
e.group_by("user_id").agg(
    p99_latency_1h=bv.quantile("latency_ms", q=0.99, window="1h"),
)

top_k(field, k, window=None, where=None)

top_k(field: str, *, k: int, window: str | None = None, where: _Expr | None = None) -> AggDescriptor

Top-K most frequent values of field (count-min sketch + min-heap). Returns a JSON-string payload of [{value, count}, ...].

top_k.py
e.group_by("user_id").agg(
    top_paths_1h=bv.top_k("path", k=5, window="1h"),
)
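
The payload arrives as a JSON string, so consumers typically decode it before use. A small sketch, assuming feature_row is a hypothetical mapping of feature name to fetched value:

top_k_parse.py
import json

top_paths = json.loads(feature_row["top_paths_1h"])  # [{"value": ..., "count": ...}, ...]
most_common = top_paths[0]["value"] if top_paths else None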

bloom_member(field, window=None, where=None)

bloom_member(field: str, *, window: str | None = None, where: _Expr | None = None) -> AggDescriptor

Bloom-filter membership probe. Server compares the current event's field against the filter accumulated for the entity. Returns bool (false-positive-bounded).

bloom_member.py
e.group_by("user_id").agg(
    seen_ip_before=bv.bloom_member("ip", window="30d"),
)

entropy(field, window=None, where=None)

entropy(field: str, *, window: str | None = None, where: _Expr | None = None) -> AggDescriptor

Shannon entropy of field's value distribution (bits). Returns f64. Useful for "is this user spread across many values, or dialed in on a few."

entropy.py
e.group_by("user_id").agg(
    path_entropy_1h=bv.entropy("path", window="1h"),
)

histogram(field, buckets, where=None)

histogram(field: str, *, buckets: list[float], where: _Expr | None = None) -> AggDescriptor

Lifetime fixed-bucket count histogram of field. Returns a JSON-string payload of bucket counts. Lifetime-only — no window=; the bound is the bucket list itself.

histogram.py
e.group_by("user_id").agg(
    spend_dist=bv.histogram("amount", buckets=[10, 100, 1000, 10000]),
)

hour_of_day_histogram(where=None)

hour_of_day_histogram(*, where: _Expr | None = None) -> AggDescriptor

Lifetime 24-bucket per-hour event-count histogram (UTC hour). Fieldless. Returns a JSON-string payload.

hour_of_day_histogram.py
e.group_by("user_id").agg(
    activity_by_hour=bv.hour_of_day_histogram(),
)

dow_hour_histogram(where=None)

dow_hour_histogram(*, where: _Expr | None = None) -> AggDescriptor

Lifetime 168-bucket histogram keyed by (day_of_week, hour). Fieldless. Returns a JSON-string payload.

dow_hour_histogram.py
e.group_by("user_id").agg(
    weekly_pattern=bv.dow_hour_histogram(),
)

event_type_mix(field, categories=None, max_categories=256, where=None)

event_type_mix(field: str, *, categories: list[str] | None = None, max_categories: int = 256, where: _Expr | None = None) -> AggDescriptor

Lifetime proportion-per-category sketch over field. Returns a JSON-string payload. Bounded by max_categories; pass an explicit categories allowlist to lock the bucket set.

event_type_mix.py
e.group_by("user_id").agg(
    action_mix=bv.event_type_mix(
        "action",
        categories=["view", "click", "purchase"],
    ),
)

most_recent_n(field, n, where=None)

most_recent_n(field: str, *, n: int, where: _Expr | None = None) -> AggDescriptor

The n most recent values of field, newest-first. Lifetime, capped at n. Returns a JSON-string array.

most_recent_n.py
e.group_by("user_id").agg(
    last_5_paths=bv.most_recent_n("path", n=5),
)

reservoir_sample(field, samples, where=None)

reservoir_sample(field: str, *, samples: int, where: _Expr | None = None) -> AggDescriptor

Lifetime reservoir sample of field (Vitter's Algorithm R). Returns a JSON-string array of length up to samples.

reservoir_sample.py
e.group_by("user_id").agg(
    amount_sample=bv.reservoir_sample("amount", samples=100),
)

Recency

"How long ago" and "how many in a row." These are scalar probes — they don't keep per-event history, so they're cheap.

first_seen()

first_seen() -> AggDescriptor

Wall-clock millisecond timestamp of the first event for the entity. Returns i64. Fieldless, no kwargs.

first_seen.py
e.group_by("user_id").agg(
    signup_ts=bv.first_seen(),
)

last_seen()

last_seen() -> AggDescriptor

Wall-clock ms timestamp of the most recent event. Returns i64.

last_seen.py
e.group_by("user_id").agg(
    last_active_ts=bv.last_seen(),
)

age()

age() -> AggDescriptor

Query-time elapsed milliseconds since first_seen. Returns i64. Computed at get time, not at push time.

age.py
e.group_by("user_id").agg(
    account_age_ms=bv.age(),
)

has_seen(where=None)

has_seen(*, where: _Expr | None = None) -> AggDescriptor

Boolean flag — has the entity ever produced a matching event? Returns bool. Fieldless; the predicate goes in where=.

has_seen.py
e.group_by("user_id").agg(
    ever_failed=bv.has_seen(where=bv.col("status") == "fail"),
)

time_since(where=None)

time_since(*, where: _Expr | None = None) -> AggDescriptor

Query-time elapsed milliseconds since the most recent matching event. Returns i64.

time_since.py
e.group_by("user_id").agg(
    ms_since_failure=bv.time_since(where=bv.col("status") == "fail"),
)

time_since_last_n(n, where=None)

time_since_last_n(*, n: int, where: _Expr | None = None) -> AggDescriptor

Query-time elapsed milliseconds since the n-th most recent matching event — silence relative to a buried event, not just the latest. Returns i64.

time_since_last_n.py
e.group_by("user_id").agg(
    ms_since_3rd_login=bv.time_since_last_n(n=3, where=bv.col("action") == "login"),
)

streak(where=None)

streak(*, where: _Expr | None = None) -> AggDescriptor

Current run length of consecutive matching events. Resets on the first non-match. Returns i64.

streak.py
e.group_by("user_id").agg(
    consec_failures=bv.streak(where=bv.col("status") == "fail"),
)

max_streak(where=None)

max_streak(*, where: _Expr | None = None) -> AggDescriptor

All-time peak streak length. Returns i64.

max_streak.py
e.group_by("user_id").agg(
    worst_failure_run=bv.max_streak(where=bv.col("status") == "fail"),
)

negative_streak(where=None)

negative_streak(*, where: _Expr | None = None) -> AggDescriptor

Current run length of consecutive non-matching events — the inverse of streak. Returns i64.

negative_streak.py
e.group_by("user_id").agg(
    consec_clean=bv.negative_streak(where=bv.col("status") == "fail"),
)

first_seen_in_window(window, where=None)

first_seen_in_window(*, window: str, where: _Expr | None = None) -> AggDescriptor

Boolean — is there a matching event within the past window? Returns bool. window= is required.

first_seen_in_window.py
e.group_by("user_id").agg(
    active_last_24h=bv.first_seen_in_window(window="24h"),
)

Decay

Exponentially-weighted aggregates. Use half_life= instead of window=: each event's contribution decays by 50% every half_life of elapsed time, so old events fade rather than getting evicted.
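
The weighting is the same across the family. A minimal sketch of the per-event weight, assuming the event's age and the half-life are both expressed in seconds:

decay_weight.py
def decay_weight(age_s: float, half_life_s: float) -> float:
    # one half_life old contributes 50%, two half-lives old 25%, and so on
    return 0.5 ** (age_s / half_life_s)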

ewma(field, half_life, where=None)

ewma(field: str, *, half_life: str, where: _Expr | None = None) -> AggDescriptor

Exponentially weighted moving average of field. Returns f64.

ewma.py
e.group_by("user_id").agg(
    spend_ewma=bv.ewma("amount", half_life="30m"),
)

ewvar(field, half_life, where=None)

ewvar(field: str, *, half_life: str, where: _Expr | None = None) -> AggDescriptor

Exponentially weighted variance of field. Returns f64.

ewvar.py
e.group_by("user_id").agg(
    spend_ewvar=bv.ewvar("amount", half_life="30m"),
)

ew_zscore(field, half_life, where=None)

ew_zscore(field: str, *, half_life: str, where: _Expr | None = None) -> AggDescriptor

Z-score of the most recent field value against the EWMA / EWVAR baseline at the same half-life. Returns f64.

ew_zscore.py
e.group_by("user_id").agg(
    spend_zscore=bv.ew_zscore("amount", half_life="1h"),
)

decayed_sum(field, half_life, where=None)

decayed_sum(field: str, *, half_life: str, where: _Expr | None = None) -> AggDescriptor

Time-decayed sum of field. Each event contributes its value times 0.5^(age / half_life). Returns f64.

decayed_sum.py
e.group_by("user_id").agg(
    fading_spend=bv.decayed_sum("amount", half_life="6h"),
)

decayed_count(half_life, where=None)

decayed_count(*, half_life: str, where: _Expr | None = None) -> AggDescriptor

Time-decayed event count — fieldless. Returns f64.

decayed_count.py
e.group_by("user_id").agg(
    fading_visits=bv.decayed_count(half_life="1h"),
)

twa(field, window, where=None)

twa(field: str, *, window: str, where: _Expr | None = None) -> AggDescriptor

Time-weighted average of field over window — each value contributes proportionally to the duration it was the most recent observation. Returns f64. window= is required.

twa.py
e.group_by("user_id").agg(
    avg_balance_24h=bv.twa("balance", window="24h"),
)
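
For reference, a rough pure-Python sketch of the weighting twa describes; observations is a hypothetical list of (timestamp_ms, value) pairs in arrival order, already restricted to one entity and the window, and now_ms is the query time:

twa_sketch.py
def twa_sketch(observations: list[tuple[int, float]], now_ms: int) -> float:
    weighted, total_ms = 0.0, 0
    for (ts, value), (next_ts, _) in zip(observations, observations[1:] + [(now_ms, 0.0)]):
        held_ms = next_ts - ts          # how long this value was the latest observation
        weighted += value * held_ms
        total_ms += held_ms
    return weighted / total_ms if total_ms else 0.0  # empty-window fallback is illustrative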

Velocity

Change-of-rate, trend, residual, and outlier counters. All but seasonal_deviation require a window= bound (baseline_window= in z_score's case), so the comparison is against a rolling reference frame; seasonal_deviation instead compares against a lifetime hour-of-day baseline.

rate_of_change(field, window, where=None)

rate_of_change(field: str, *, window: str, where: _Expr | None = None) -> AggDescriptor

Slope of field versus time across the window — (latest - earliest) / window_seconds. Returns f64. window= is required.

rate_of_change.py
e.group_by("user_id").agg(
    spend_velocity_1h=bv.rate_of_change("amount", window="1h"),
)

inter_arrival_stats(window, where=None)

inter_arrival_stats(*, window: str, where: _Expr | None = None) -> AggDescriptor

Mean inter-arrival time (ms) between consecutive matching events in the window. Fieldless. Returns f64.

inter_arrival_stats.py
e.group_by("user_id").agg(
    mean_gap_ms_1h=bv.inter_arrival_stats(window="1h"),
)

burst_count(window, sub_window, where=None)

burst_count(*, window: str, sub_window: str, where: _Expr | None = None) -> AggDescriptor

Number of sub_window-sized buckets inside window that had at least one event. Detects clustered activity rather than steady throughput. Returns i64.

burst_count.py
e.group_by("user_id").agg(
    bursts_1h=bv.burst_count(window="1h", sub_window="1m"),
)
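
A rough sketch of the bucket logic, assuming timestamps_ms holds millisecond timestamps already restricted to one entity and the outer window:

burst_count_sketch.py
def burst_count_sketch(timestamps_ms: list[int], sub_window_ms: int) -> int:
    # count sub_window-sized buckets that saw at least one event
    return len({ts // sub_window_ms for ts in timestamps_ms})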

trend(field, window, where=None)

trend(field: str, *, window: str, where: _Expr | None = None) -> AggDescriptor

OLS slope of field versus event timestamp across window. Returns f64. Distinct from rate_of_change: trend is a regression slope; rate_of_change is endpoint-only.

trend.py
e.group_by("user_id").agg(
    spend_trend_24h=bv.trend("amount", window="24h"),
)

trend_residual(field, window, where=None)

trend_residual(field: str, *, window: str, where: _Expr | None = None) -> AggDescriptor

RMS of the residuals from the OLS fit underlying trend. Returns f64 — high values mean the trend explains little of the variance.

trend_residual.py
e.group_by("user_id").agg(
    spend_residual_24h=bv.trend_residual("amount", window="24h"),
)
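
If the split between the two is easier to see in code, a rough sketch of the OLS fit that trend and trend_residual describe; points is a hypothetical list of (timestamp_s, value) pairs from the window:

trend_sketch.py
import math

def trend_and_residual(points: list[tuple[float, float]]) -> tuple[float, float]:
    n = len(points)
    mean_t = sum(t for t, _ in points) / n
    mean_v = sum(v for _, v in points) / n
    var_t = sum((t - mean_t) ** 2 for t, _ in points)
    cov = sum((t - mean_t) * (v - mean_v) for t, v in points)
    slope = cov / var_t if var_t else 0.0          # what trend reports
    intercept = mean_v - slope * mean_t
    sq_err = sum((v - (slope * t + intercept)) ** 2 for t, v in points)
    return slope, math.sqrt(sq_err / n)            # second element: what trend_residual reports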

outlier_count(field, window, sigma=3.0, where=None)

outlier_count(field: str, *, window: str, sigma: float = 3.0, where: _Expr | None = None) -> AggDescriptor

Number of events where field falls outside ±sigma standard deviations of the window's mean. Returns i64.

outlier_count.py
e.group_by("user_id").agg(
    big_spends_24h=bv.outlier_count("amount", window="24h", sigma=2.5),
)

value_change_count(field, window, where=None)

value_change_count(field: str, *, window: str, where: _Expr | None = None) -> AggDescriptor

Number of times field's value differed from its previous value inside the window — a coarse churn indicator. Returns i64.

value_change_count.py
e.group_by("user_id").agg(
    device_swaps_24h=bv.value_change_count("device_id", window="24h"),
)

z_score(field, baseline_window, where=None)

z_score(field: str, *, baseline_window: str, where: _Expr | None = None) -> AggDescriptor

Z-score of the latest field value against the rolling mean / stddev over baseline_window. Returns f64. Note the kwarg name is baseline_window=, not window=.

z_score.py
e.group_by("user_id").agg(
    spend_z=bv.z_score("amount", baseline_window="7d"),
)

seasonal_deviation(field, where=None)

seasonal_deviation(field: str, *, where: _Expr | None = None) -> AggDescriptor

Lifetime z-score of the latest field against the hour-of-day baseline accumulated for the entity. Catches "this user is normally quiet at 3am, why are they spending now." Returns f64. Lifetime-only — no window=.

seasonal_deviation.py
e.group_by("user_id").agg(
    odd_hour_spend=bv.seasonal_deviation("amount"),
)

Buffer

Bounded ordered buffers — keep a small list of recent values per entity, in the order they arrived. Memory is fixed by the n= bound at register time.

first_n(field, n, where=None)

first_n(field: str, *, n: int, where: _Expr | None = None) -> AggDescriptor

The first n matching values of field in insertion order. Once the buffer is full, later events are ignored. Returns a JSON-string array.

first_n.py
e.group_by("user_id").agg(
    first_3_paths=bv.first_n("path", n=3),
)

last_n(field, n, where=None)

last_n(field: str, *, n: int, where: _Expr | None = None) -> AggDescriptor

The last n matching values of field, oldest-to-newest. Sliding ring buffer — the oldest entry is dropped when a new one arrives. Returns a JSON-string array.

last_n.py
e.group_by("user_id").agg(
    recent_paths=bv.last_n("path", n=10),
)
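
The sliding ring buffer maps directly onto a fixed-size deque; a minimal sketch of the per-entity state it describes, not beava's internal code:

last_n_sketch.py
from collections import deque

recent_paths = deque(maxlen=10)      # the 11th append silently drops the oldest entry
for path in ("/a", "/b", "/c"):
    recent_paths.append(path)
list(recent_paths)                   # oldest-to-newest, matching last_n's ordering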

lag(field, n=1)

lag(field: str, *, n: int = 1) -> AggDescriptor

The value of field from n events ago. Output type inherits from the upstream column. n defaults to 1 (the immediately previous value).

lag.py
e.group_by("user_id").agg(
    prev_amount=bv.lag("amount"),
    amount_2_ago=bv.lag("amount", n=2),
)

delta_from_prev(field, where=None)

delta_from_prev(field: str, *, where: _Expr | None = None) -> AggDescriptor

Difference between the latest field value and the previous one. Output type inherits from the upstream column.

delta_from_prev.py
e.group_by("user_id").agg(
    spend_delta=bv.delta_from_prev("amount"),
)

Geo

Lat/lon-pair operators built on the haversine formula. Each takes lat= and lon= kwargs naming the latitude / longitude columns on the upstream event; both columns are read on every matching event.
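
For reference, the textbook haversine great-circle distance in kilometres that this family is described as building on; a generic implementation, not beava's internal code:

haversine_sketch.py
import math

def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    r = 6371.0  # mean Earth radius, km
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * r * math.asin(math.sqrt(a))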

geo_velocity(lat, lon, where=None)

geo_velocity(*, lat: str, lon: str, where: _Expr | None = None) -> AggDescriptor

Lifetime maximum km/h between consecutive matching events. Catches "this card was used in two cities five minutes apart." Returns f64.

geo_velocity.py
e.group_by("user_id").agg(
    impossible_speed=bv.geo_velocity(lat="lat", lon="lon"),
)

geo_distance(lat, lon, where=None)

geo_distance(*, lat: str, lon: str, where: _Expr | None = None) -> AggDescriptor

Lifetime cumulative haversine path length (km) traced by the entity's matching events. Returns f64.

geo_distance.py
e.group_by("user_id").agg(
    total_km=bv.geo_distance(lat="lat", lon="lon"),
)

geo_spread(lat, lon, where=None)

geo_spread(*, lat: str, lon: str, where: _Expr | None = None) -> AggDescriptor

Lifetime RMS dispersion (km) of matching events around the running centroid — "how spread out is this entity's footprint." Returns f64.

geo_spread.py
e.group_by("user_id").agg(
    geo_footprint_km=bv.geo_spread(lat="lat", lon="lon"),
)

distance_from_home(lat, lon, samples=100, where=None)

distance_from_home(*, lat: str, lon: str, samples: int = 100, where: _Expr | None = None) -> AggDescriptor

Distance (km) from the current event's coordinates to the centroid of the last samples matching events. Returns f64.

distance_from_home.py
e.group_by("user_id").agg(
    km_from_home=bv.distance_from_home(lat="lat", lon="lon", samples=50),
)

Where to go next

You've got the catalogue. The two adjacent pages cover the expression DSL the operators consume and the error codes the server returns.