This is the low-altitude path. In the browser and Node the
ArrayView folds the change stream into a materialized
array for you. The Rust rindle-replica crate
deliberately does not: it hands you the raw incremental
ChangeEvent stream and gets out of the way. You decide
what to do with the diffs — fold them into your own view, forward them over the
wire, persist them.
This example does the first two, then proves the payoff: a view maintained
from the diff stream alone equals a fresh SELECT, with no re-querying. That
equivalence — view-after-write == fresh-query — is the whole point of IVM.
The complete program ships in the repo:
cargo run -p rindle-replica --example downstream_view
Your own view, kept current from diffs
The consumer owns a BTreeMap keyed by primary key. Every delivered Update
folds into it — and builds the JSON “wire frame” you’d forward to a browser or
another service. Both come from the same diffs.
use std::collections::BTreeMap;

use serde_json::json;
use rindle::OwnedValue;
use rindle_replica::{ChangeEvent, Update};

const COLS: [&str; 4] = ["id", "title", "priority", "open"];

struct Consumer {
    view: BTreeMap<i64, serde_json::Value>, // id -> the row, as you'd render it
}

impl Consumer {
    fn on_update(&mut self, u: &Update) {
        let changes = match u {
            Update::Hydrated { changes, .. } => changes, // fires once: the cold-start snapshot
            Update::Changed { changes, .. } => changes,  // fires after each committed write
        };
        for ch in changes {
            match ch {
                // A row entered the result set.
                ChangeEvent::Add(n) => { self.view.insert(pk(&n.row), row_to_json(&n.row)); }
                // A row left it (deleted, or no longer matches the filter).
                ChangeEvent::Remove(n) => { self.view.remove(&pk(&n.row)); }
                // A row that stayed in the result set changed. Its key can move,
                // so drop the old entry, then insert the new one.
                ChangeEvent::Edit { old, row } => {
                    self.view.remove(&pk(old));
                    self.view.insert(pk(row), row_to_json(row));
                }
                // Nested-relationship diffs: only a `sub_as(..)` query emits these.
                ChangeEvent::Child { .. } => {}
            }
        }
    }
}
Two things worth noting. The engine carries every number as f64 (its single
numeric type), so an integer primary key arrives as Float(1.0) — extract it
accordingly:
fn pk(row: &[OwnedValue]) -> i64 {
    match &row[0] {
        OwnedValue::Int(i) => *i,
        OwnedValue::Float(f) => *f as i64,
        other => panic!("non-numeric primary key: {other:?}"),
    }
}
And the row is positional — cells in schema order. You own the schema you
render into, so you map the positions onto your own names (row_to_json zips
COLS with the cells; the full helper is in the example file).
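Both notes can be tried out without the crate. What follows is a minimal sketch under stated assumptions, not the helper from the example file: `Cell` is a hypothetical stand-in for `rindle::OwnedValue` (only the shapes this page uses), and `strict_pk` and `named_row` are illustrative names that are not part of rindle. `strict_pk` is a stricter alternative to the `as i64` cast, which would quietly truncate `1.5` to `1`; `named_row` pairs each cell with a column name by position.

```rust
// Hypothetical stand-in for rindle::OwnedValue; only the shapes used on this page.
#[derive(Debug, Clone, PartialEq)]
pub enum Cell {
    Int(i64),
    Float(f64),
    Text(String),
    Bool(bool),
}

pub const COLS: [&str; 4] = ["id", "title", "priority", "open"];

/// Extract an integer key from the first cell. Returns None for an empty row,
/// a non-numeric cell, NaN, a fractional float, or a float too large to be exact.
pub fn strict_pk(row: &[Cell]) -> Option<i64> {
    match row.first()? {
        Cell::Int(i) => Some(*i),
        // Below 2^53 every whole-number f64 converts to i64 without loss.
        Cell::Float(f) if f.fract() == 0.0 && f.abs() < 9_007_199_254_740_992.0 => {
            Some(*f as i64)
        }
        _ => None,
    }
}

/// Pair each cell with its column name by position.
/// Returns None when the row width does not match the column list.
pub fn named_row<'a>(cols: &[&'a str], row: &'a [Cell]) -> Option<Vec<(&'a str, &'a Cell)>> {
    if cols.len() != row.len() {
        return None;
    }
    Some(cols.iter().copied().zip(row.iter()).collect())
}
```

Returning `Option` instead of panicking lets the caller decide whether a malformed row should be skipped, logged, or treated as fatal. Whether that is preferable to the panic in `pk` depends on how much you trust the schema you registered.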
Drive it and prove it
Open a file-backed replica (the write-then-abort model needs wal2), register
the table, materialize the live query, and wire the consumer to its stream.
use std::cell::RefCell;
use std::rc::Rc;
use rindle_replica::{Db, QueryId};

let dir = tempfile::tempdir()?;
let db = Db::open(dir.path().join("app.db"))?;
db.read(|c| c.execute_batch(
    "CREATE TABLE issues (id INTEGER PRIMARY KEY, title TEXT NOT NULL,
                          priority INTEGER NOT NULL, open BOOLEAN NOT NULL);",
))?;
db.register_table("issues")?;

// The live query: the OPEN issues, maintained incrementally.
let open_issues = db.query(QueryId(1), rindle::table("issues").r#where("open", true).build())?;

let consumer = Rc::new(RefCell::new(Consumer { view: BTreeMap::new() }));
{
    let c = consumer.clone();
    open_issues.subscribe(move |u| c.borrow_mut().on_update(u));
}
Drive ordinary SQL through the one controlled writer; the consumer’s view
updates after each commit, and a rollback delivers nothing:
let mut w = db.write()?;
w.exec_batch(
    "INSERT INTO issues VALUES (1, 'login button misaligned', 2, 1);
     INSERT INTO issues VALUES (2, 'slow dashboard query', 1, 1);
     INSERT INTO issues VALUES (3, 'typo in footer', 3, 0);", // closed → never enters
)?;
w.commit()?;

let mut w = db.write()?;
w.exec("UPDATE issues SET open = ? WHERE id = ?", &[OwnedValue::Bool(false), OwnedValue::Int(1)])?;
w.commit()?; // consumer sees a Remove for #1, with no rescan
Finally, the payoff — the delta-folded view must equal a fresh query:
let fresh: Vec<serde_json::Value> = db.read(|c| {
    let mut stmt = c.prepare("SELECT id, title, priority, open FROM issues WHERE open = 1 ORDER BY id")?;
    let rows = stmt.query_map([], |r| Ok(json!({
        "id": r.get::<_, i64>(0)?, "title": r.get::<_, String>(1)?,
        "priority": r.get::<_, i64>(2)?, "open": r.get::<_, bool>(3)?,
    })))?.collect::<rusqlite::Result<Vec<_>>>()?;
    Ok(rows)
})?;

let folded: Vec<serde_json::Value> = consumer.borrow().view.values().cloned().collect();
assert_eq!(folded, fresh, "the delta-folded view must equal a fresh query: the IVM guarantee");
The view we maintained from diffs alone is equal, row for row, to a fresh
SELECT — and we never rescanned SQLite to keep it that way.
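If you want to poke at that equivalence without SQLite or the crate, here is a toy model using only the standard library. It is a sketch of the idea, not of rindle's engine: `Issue`, `Change`, `diff`, `fold`, `upsert`, `fresh`, and `demo` are all hypothetical names, the "table" is a plain `BTreeMap`, and the only query modeled is the filter `open = true`. Each write is turned into at most one diff, the diff is folded into the view, and the result is compared against a full rescan.

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, PartialEq)]
pub struct Issue {
    pub id: i64,
    pub title: String,
    pub open: bool,
}

/// Toy counterpart of the Add/Remove/Edit events (not rindle's types).
#[derive(Debug, Clone, PartialEq)]
pub enum Change {
    Add(Issue),
    Remove(Issue),
    Edit { old: Issue, row: Issue },
}

/// Diff one row's before/after state against the filter `open = true`.
pub fn diff(before: Option<&Issue>, after: Option<&Issue>) -> Option<Change> {
    let was = before.filter(|i| i.open);
    let is = after.filter(|i| i.open);
    match (was, is) {
        (None, Some(n)) => Some(Change::Add(n.clone())),
        (Some(o), None) => Some(Change::Remove(o.clone())),
        (Some(o), Some(n)) if o != n => Some(Change::Edit { old: o.clone(), row: n.clone() }),
        _ => None, // never matched, or matched and unchanged
    }
}

/// Fold one diff into the view, mirroring the consumer's match arms.
pub fn fold(view: &mut BTreeMap<i64, Issue>, change: &Change) {
    match change {
        Change::Add(n) => { view.insert(n.id, n.clone()); }
        Change::Remove(o) => { view.remove(&o.id); }
        Change::Edit { old, row } => {
            view.remove(&old.id);
            view.insert(row.id, row.clone());
        }
    }
}

/// Apply an upsert to the base table and return the resulting diff, if any.
pub fn upsert(table: &mut BTreeMap<i64, Issue>, row: Issue) -> Option<Change> {
    let before = table.insert(row.id, row.clone());
    diff(before.as_ref(), Some(&row))
}

/// The "fresh query": rescan the base table and keep the open rows, ordered by id.
pub fn fresh(table: &BTreeMap<i64, Issue>) -> Vec<Issue> {
    table.values().filter(|i| i.open).cloned().collect()
}

/// Replay the writes from this page and return (folded view, fresh query).
pub fn demo() -> (Vec<Issue>, Vec<Issue>) {
    let mut table = BTreeMap::new();
    let mut view = BTreeMap::new();
    let issue = |id, title: &str, open| Issue { id, title: title.to_string(), open };
    let writes = vec![
        issue(1, "login button misaligned", true),
        issue(2, "slow dashboard query", true),
        issue(3, "typo in footer", false),          // closed: never enters the view
        issue(1, "login button misaligned", false), // closing #1 yields a Remove
    ];
    for w in writes {
        if let Some(change) = upsert(&mut table, w) {
            fold(&mut view, &change);
        }
    }
    (view.values().cloned().collect(), fresh(&table))
}
```

Calling `demo()` returns two vectors that compare equal, each holding only issue #2. The view side never looks at the table; it only ever sees the diffs.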
Why this layer
You drop to Rust when you want the engine without the opinions: a file-backed,
durable replica, one controlled writer, and the raw delta stream to route or
persist however you like. The callback runs on the writer thread during commit,
so keep it cheap (forward to a channel) and don’t re-enter the Db from inside
it.
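One way to keep the callback cheap is to have it do nothing but enqueue, and let a separate thread do the folding. The sketch below uses only `std::sync::mpsc` and makes several assumptions: `Msg` is a hypothetical owned message you would build by copying what you need out of each update, and `spawn_folder` and `demo` are illustrative names. It does not show how to plug the closure into `subscribe`, since that depends on the callback signature the crate expects.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc;
use std::thread;

/// Hypothetical owned message, detached from the engine's borrowed update.
#[derive(Debug, Clone, PartialEq)]
pub enum Msg {
    Upsert(i64, String),
    Delete(i64),
}

/// Build a cheap enqueue-only callback plus a worker that folds off-thread.
/// The worker returns its final view when every sender has been dropped.
pub fn spawn_folder() -> (impl Fn(Msg), thread::JoinHandle<BTreeMap<i64, String>>) {
    let (tx, rx) = mpsc::channel::<Msg>();
    let worker = thread::spawn(move || {
        let mut view = BTreeMap::new();
        // Iterating the receiver blocks for each message and ends on disconnect.
        for msg in rx {
            match msg {
                Msg::Upsert(id, title) => { view.insert(id, title); }
                Msg::Delete(id) => { view.remove(&id); }
            }
        }
        view
    });
    // The callback only enqueues. A failed send means the worker has exited,
    // which this sketch chooses to ignore.
    let callback = move |msg: Msg| {
        let _ = tx.send(msg);
    };
    (callback, worker)
}

/// Push three messages through the callback and collect the worker's view.
pub fn demo() -> BTreeMap<i64, String> {
    let (callback, worker) = spawn_folder();
    callback(Msg::Upsert(1, "login button misaligned".to_string()));
    callback(Msg::Upsert(2, "slow dashboard query".to_string()));
    callback(Msg::Delete(1));
    drop(callback); // dropping the last sender lets the worker finish
    worker.join().expect("worker panicked")
}
```

An unbounded channel never blocks the sender, which suits a callback that must return quickly, at the cost of unbounded memory if the worker falls behind. `mpsc::sync_channel` trades that for backpressure.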
The sibling live_issues example prints the raw events — including the nested
Child shape a sub_as(..) query emits — without folding:
cargo run -p rindle-replica --example live_issues
Next
- Rust quickstart: the same engine, end to end, from scratch.
- Replicate a source & materialize views: the replica handle, the writer, and every ChangeEvent variant in depth.
- The change model: Add/Remove/Edit/Child and the replay-equivalence invariant.