Replicate a source & materialize views

Open a file-backed replica, register tables, define live queries over a cut of your data, and run many replicas.

A replica is a SQLite database that Rindle watches. You write ordinary SQL through one controlled writer, and every live query registered against the replica receives the raw incremental change events its pipeline emits — the exact rows that entered, left, or changed, never a full re-fetch.

Rindle is deliberately unopinionated about views. It does not materialize a result set for you. It hands you the change stream and you decide what to do with it: forward the diffs to a client, fold them into your own in-memory view, or persist them. This page covers the replica handle, registering queries, the write path, and the shape of the events you get back.

Opening a replica

A replica is opened over a file-backed SQLite database. The write-then-abort derivation model needs WAL plus multiple connections, so an in-memory database cannot host it — Db::open(":memory:") fails loudly rather than silently degrading.

use rindle_replica::Db;

fn main() -> Result<(), rindle_replica::ReplicaError> {
    let db = Db::open("app.db")?;
    Ok(())
}

Db is a cheap-to-clone handle (it is an Rc internally). It is single-threaded in this release: a server runs it on a dedicated owner thread. Opening sets journal_mode = wal2, asserts that the linked SQLite is threadsafe, and probes that it understands BEGIN CONCURRENT. Each failure maps to its own error variant: ReplicaError::Open, ReplicaError::NotThreadsafe, and ReplicaError::NoBeginConcurrent respectively.

Db::read exposes a read-only connection for ad-hoc SELECTs and one-off DDL:

db.read(|conn| {
    conn.execute_batch(
        "CREATE TABLE issues (id INTEGER PRIMARY KEY, title TEXT, open BOOLEAN)",
    )
})?;

Registering tables

Before a table can appear in a live query, register it. Rindle discovers the table’s columns and primary key from the schema, ensures the primary key has the UNIQUE index the source requires, and builds a shared source. The call is idempotent per table.

db.register_table("issues")?;
db.register_table("comments")?;

Two rules are enforced here, not silently worked around:

  • A primary key is required: the engine uses it for row identity. Registering a table without one fails with ReplicaError::Schema.
  • BLOB columns are rejected: the engine’s value type has no blob variant, so a blob column fails with ReplicaError::UnsupportedColumnType rather than being silently nulled.

Registering a live query

A query is built from an Ast. The fluent builder in the rindle crate constructs one; Db::query takes a caller-chosen QueryId tag plus the AST, lowers it into the shared engine, hydrates it, and returns a Query handle. The QueryId is your own opaque identifier for the query (rindle_replica::QueryId(pub u64)).

// All OPEN issues. `QueryId(..)` is your own opaque tag for this query.
let open_issues = db.query(
    QueryId(1),
    rindle::table("issues").r#where("open", true).build(),
)?;

Nested queries correlate a child table to its parent. Inside a sub_as closure, r.col("id") references the parent row’s column — the engine derives the join correlation from the closure itself:

// Every issue, each carrying its comments.
let with_comments = db.query(
    QueryId(2),
    rindle::table("issues")
        .sub_as("comments", |r| {
            rindle::table("comments").r#where("issue_id", r.col("id"))
        })
        .build(),
)?;

If you already have a serialized AST, Db::query_json parses it and registers in one step:

let q = db.query_json(QueryId(3), r#"{ "table": "issues" }"#)?;

A query that names an unregistered table (or an unsupported shape) fails synchronously at registration with a ReplicaError::Build.

Subscribing to changes

Subscribe to a Query to receive its change stream. The callback fires once immediately with the hydrated initial state, then again after every committed write that affects the query.

use rindle_replica::{ChangeEvent, NodeData, Update};

open_issues.subscribe(|u: &Update| match u {
    Update::Hydrated { tx_id, changes } => {
        println!("hydrated @tx{} ({} events)", tx_id.0, changes.len());
        print_events(changes);
    }
    Update::Changed { tx_id, changes } => {
        println!("changed @tx{} ({} events)", tx_id.0, changes.len());
        print_events(changes);
    }
});

Update has exactly two variants, each a struct carrying a TxId and a Vec<ChangeEvent>:

  • Update::Hydrated { tx_id, changes } — fires once on subscribe. The initial state, delivered as all Adds (an empty changes vec for an empty result).
  • Update::Changed { tx_id, changes } — fires after each committed write, carrying that transaction’s incremental events.

TxId(pub u64) is a global, monotonic transaction id that is persisted with the data; read its inner value with .0. The callback runs on the writer thread during commit, so keep it cheap (for example, forward the update to a channel) and do not re-enter the Db from inside it.
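One way to keep the callback cheap is to clone each update into a channel and do the real work on another thread. The sketch below is self-contained and does not call the real rindle_replica API: `TxId` and `Update` are stand-ins that only mirror the shapes described above (with the change list reduced to strings), `make_forwarder` and `consume` are hypothetical helpers, and it assumes the update can be cloned.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

// Stand-ins mirroring the shapes described above; NOT the real rindle_replica types.
#[derive(Clone, Debug, PartialEq)]
struct TxId(pub u64);

#[derive(Clone, Debug, PartialEq)]
enum Update {
    Hydrated { tx_id: TxId, changes: Vec<String> },
    Changed { tx_id: TxId, changes: Vec<String> },
}

/// Hypothetical helper: build a callback that only clones the update and sends it.
fn make_forwarder(tx: Sender<Update>) -> impl Fn(&Update) {
    move |u: &Update| {
        // A send error only means the consumer has gone away, so it is ignored.
        let _ = tx.send(u.clone());
    }
}

/// Drain the channel on a worker thread and return the tx ids seen, in order.
fn consume(rx: Receiver<Update>) -> Vec<u64> {
    thread::spawn(move || {
        rx.iter()
            .map(|u| match u {
                Update::Hydrated { tx_id, .. } | Update::Changed { tx_id, .. } => tx_id.0,
            })
            .collect::<Vec<u64>>()
    })
    .join()
    .expect("consumer thread panicked")
}

fn main() {
    let (tx, rx) = channel();
    let callback = make_forwarder(tx);
    callback(&Update::Hydrated { tx_id: TxId(0), changes: vec![] });
    callback(&Update::Changed { tx_id: TxId(1), changes: vec!["+ issue 1".to_string()] });
    drop(callback); // dropping the sender closes the channel, so the consumer loop ends
    println!("{:?}", consume(rx));
}
```

In a real subscriber the closure returned by a helper like this would be what is passed to subscribe, and the consumer thread would own the expensive work (serialization, network sends, view maintenance).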

Note — dropping a Query handle is a deliberate no-op; the query keeps running. Call Query::destroy() to tear it down and reclaim its slots. Query de-duplication and reference-counted teardown belong above this raw replica layer, in the planned persistent rindle-server materialization manager.

Change events

ChangeEvent is the raw delta the pipeline emits — a re-export of the engine’s rindle::CaughtChange. It has four variants:

fn print_events(changes: &[ChangeEvent]) {
    for ch in changes {
        match ch {
            // A row entered the result; `n` is a NodeData (the row + its
            // relationship subtrees).
            ChangeEvent::Add(n) => println!("+ {:?}", n.row),
            // A row left the result.
            ChangeEvent::Remove(n) => println!("- {:?}", n.row),
            // A row's cells changed; carries the two rows, not a node.
            ChangeEvent::Edit { old, row } => println!("~ {old:?} -> {row:?}"),
            // A nested (relationship) change: the parent row plus the inner change.
            ChangeEvent::Child { row, change, .. } => {
                println!("* child of {row:?}: {change:?}");
            }
        }
    }
}

The Add and Remove variants carry a NodeData (a re-export of rindle::CaughtNode). A node is a row plus its relationship subtrees:

  • NodeData::row: the owned row cells.
  • NodeData::relationships: child nodes keyed by relationship slot. Iterate n.relationships.values() to walk a nested query’s children:

ChangeEvent::Add(n) => {
    println!("+ {:?}", n.row);
    for kids in n.relationships.values() {
        for kid in kids {
            println!("    child {:?}", kid.row);
        }
    }
}

Note that Edit carries old and row (the before/after row cells) directly — there is no node on an edit. Child carries the parent row, the relationship slot, and a boxed inner change.
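To make the four shapes concrete, here is a minimal sketch of folding them into an in-memory nested view. It uses simplified stand-in types rather than the real ChangeEvent and NodeData: rows are reduced to an (id, text) pair, row identity is assumed to be the first field, and the `relationship` field name on `Child` is hypothetical (the snippet above elides that field with `..`).

```rust
use std::collections::BTreeMap;

// Simplified stand-ins for the shapes described above; NOT the real
// rindle_replica::{ChangeEvent, NodeData}. Rows are reduced to (id, text).
type Row = (i64, String);

#[derive(Clone, Debug, PartialEq)]
struct Node {
    row: Row,
    relationships: BTreeMap<String, Vec<Node>>,
}

#[allow(dead_code)]
#[derive(Clone, Debug)]
enum Change {
    Add(Node),
    Remove(Node),
    Edit { old: Row, row: Row },
    // `relationship` is a hypothetical field name for the relationship slot.
    Child { row: Row, relationship: String, change: Box<Change> },
}

/// Apply one change to a list of nodes, matching rows by primary key (`row.0`).
fn apply(view: &mut Vec<Node>, change: &Change) {
    match change {
        Change::Add(n) => view.push(n.clone()),
        Change::Remove(n) => view.retain(|v| v.row.0 != n.row.0),
        Change::Edit { old, row } => {
            if let Some(v) = view.iter_mut().find(|v| v.row.0 == old.0) {
                v.row = row.clone();
            }
        }
        Change::Child { row, relationship, change } => {
            if let Some(parent) = view.iter_mut().find(|v| v.row.0 == row.0) {
                // Recurse into the children stored under the named relationship.
                let kids = parent.relationships.entry(relationship.clone()).or_default();
                apply(kids, change);
            }
        }
    }
}

/// Build a node with no relationship subtrees.
fn leaf(id: i64, text: &str) -> Node {
    Node { row: (id, text.to_string()), relationships: BTreeMap::new() }
}

fn main() {
    let mut view = Vec::new();
    apply(&mut view, &Change::Add(leaf(1, "first issue")));
    apply(&mut view, &Change::Child {
        row: (1, "first issue".to_string()),
        relationship: "comments".to_string(),
        change: Box::new(Change::Add(leaf(10, "looks good"))),
    });
    println!("{view:#?}");
}
```

Because the inner change of a `Child` is itself a change, the same function handles arbitrarily deep nesting by recursing one relationship level at a time.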

The write path

Every mutation flows through one controlled writer, opened with Db::write. Run ordinary SQL on the returned WriteTxn, then commit (which derives and delivers each affected query’s events) or rollback (which leaves every view untouched).

use rindle::OwnedValue;

let mut w = db.write()?;
w.exec(
    "INSERT INTO issues VALUES (?, ?, ?)",
    &[
        OwnedValue::Int(1),
        OwnedValue::str("first"),
        OwnedValue::Bool(true),
    ],
)?;
w.exec(
    "UPDATE issues SET open = ? WHERE id = ?",
    &[OwnedValue::Bool(false), OwnedValue::Int(2)],
)?;
let tx_id = w.commit()?;
println!("committed @tx{}", tx_id.0);

WriteTxn methods:

  • WriteTxn::exec(&mut self, sql, params): run one statement with positional &[OwnedValue] params; returns the number of rows changed.
  • WriteTxn::exec_batch(&mut self, sql): run a batch of statements (no params).
  • WriteTxn::commit(self): consume the transaction, derive the incremental events, COMMIT the durable data, and deliver Update::Changed. Returns the new TxId.
  • WriteTxn::rollback(self): consume the transaction and roll back; no events are delivered and no view is touched. Dropping a WriteTxn without committing rolls back too.

There is exactly one writer: calling Db::write while a transaction is already open is an error. Bind parameters use rindle::OwnedValue, whose variants include Int(i64), Float(f64), Bool(bool), Null, and a Json variant for raw JSON text; string values are built with the constructor OwnedValue::str(&str).
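The single-writer rule and rollback-on-drop can be pictured as a small RAII guard. The sketch below is a toy model: `ToyDb` and `ToyWriteTxn` are hypothetical types that only imitate the behaviour described above, and nothing here is a claim about how Rindle implements it.

```rust
use std::cell::Cell;
use std::rc::Rc;

// Toy model only: these types imitate the described behaviour and are
// NOT the real rindle_replica::Db / WriteTxn.
#[derive(Clone, Default)]
struct ToyDb {
    writer_open: Rc<Cell<bool>>,
    committed_tx: Rc<Cell<u64>>,
}

struct ToyWriteTxn {
    db: ToyDb,
}

impl ToyDb {
    /// Hand out the single writer, or fail if one is already open.
    fn write(&self) -> Result<ToyWriteTxn, &'static str> {
        // `replace` returns the previous flag value while setting it to true.
        if self.writer_open.replace(true) {
            return Err("a write transaction is already open");
        }
        Ok(ToyWriteTxn { db: self.clone() })
    }

    fn committed_tx_id(&self) -> u64 {
        self.committed_tx.get()
    }
}

impl ToyWriteTxn {
    /// Consume the transaction and advance the committed id.
    fn commit(self) -> u64 {
        let next = self.db.committed_tx.get() + 1;
        self.db.committed_tx.set(next);
        next
        // `self` is dropped on return, which releases the writer slot.
    }
}

impl Drop for ToyWriteTxn {
    // Runs after commit and on a plain drop. A plain drop leaves the committed
    // id untouched, which is the toy equivalent of a rollback.
    fn drop(&mut self) {
        self.db.writer_open.set(false);
    }
}

fn main() {
    let db = ToyDb::default();
    let w = db.write().unwrap();
    assert!(db.write().is_err()); // a second writer is refused
    drop(w); // rolled back: nothing was committed
    assert_eq!(db.committed_tx_id(), 0);
    let tx = db.write().unwrap().commit();
    println!("committed @tx{tx}");
}
```

Tying the writer slot to the guard's lifetime means an early return or a `?` inside a write block cannot leave the slot held, which matches the drop-rolls-back behaviour listed above.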

You can read the last durably-committed transaction id at any time:

let tx = db.committed_tx_id(); // TxId(0) before anything is committed

Replay equivalence

The events delivered across a sequence of commits, folded in order, equal a freshly-built and hydrated query over the same committed database. This is the core guarantee: a subscriber that folds the stream always converges on exactly what a fresh query would return. The write-then-abort model — deriving each query’s change against the pre-commit snapshot before the durable COMMIT — is what makes the delivered events line up with a fresh hydrate.
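The guarantee can be checked by hand on a toy model. The sketch below is not the engine and does not model write-then-abort: it assumes a table reduced to a map of issue id to open flag, a single "open issues" query, and hypothetical `apply_write`, `fold`, and `fresh_query` functions. After every write it asserts that the folded view equals a fresh evaluation.

```rust
use std::collections::{BTreeMap, BTreeSet};

// Toy model: a table of issue id -> open flag, and the live query "open issues".
type Table = BTreeMap<i64, bool>;

#[derive(Debug, Clone, PartialEq)]
enum Event {
    Add(i64),
    Remove(i64),
}

/// What a freshly built and hydrated "open issues" query would return.
fn fresh_query(table: &Table) -> BTreeSet<i64> {
    table.iter().filter(|(_, open)| **open).map(|(id, _)| *id).collect()
}

/// Apply one write (set the open flag of `id`) and return the query's incremental events.
fn apply_write(table: &mut Table, id: i64, open: bool) -> Vec<Event> {
    // A row that does not exist yet counts as "not in the result".
    let was_open = table.get(&id).copied().unwrap_or(false);
    table.insert(id, open);
    match (was_open, open) {
        (false, true) => vec![Event::Add(id)],
        (true, false) => vec![Event::Remove(id)],
        _ => vec![], // the row neither entered nor left the result
    }
}

/// Fold a batch of events into a subscriber-side view.
fn fold(view: &mut BTreeSet<i64>, events: &[Event]) {
    for e in events {
        match e {
            Event::Add(id) => {
                view.insert(*id);
            }
            Event::Remove(id) => {
                view.remove(id);
            }
        }
    }
}

fn main() {
    let mut table = Table::new();
    let mut view = fresh_query(&table); // hydrate over an empty table
    for (id, open) in [(1, true), (2, true), (1, false), (3, false)] {
        let events = apply_write(&mut table, id, open);
        fold(&mut view, &events);
        // After every write the folded view matches a fresh query.
        assert_eq!(view, fresh_query(&table));
    }
    println!("{view:?}");
}
```

A check of this shape (fold the stream, then compare against a fresh hydrate) is also a convenient test for any subscriber-side view built on top of the change stream.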

Errors

Every fallible call returns Result<_, rindle_replica::ReplicaError>. The variants worth handling explicitly:

Variant | When
--- | ---
ReplicaError::Open | opening or configuring the database failed (e.g. journal_mode did not become wal2, typically because the path is not a file)
ReplicaError::NotThreadsafe | the linked SQLite was built without thread safety
ReplicaError::NoBeginConcurrent | the linked SQLite lacks BEGIN CONCURRENT
ReplicaError::Schema | schema discovery failed (missing table, or no primary key)
ReplicaError::UnsupportedColumnType { table, column } | a column has a type the engine cannot represent (e.g. BLOB)
ReplicaError::Build | the query could not be lowered (unknown table/column, unsupported shape)
ReplicaError::Rindle | an engine runtime error (push/hydrate/storage/consistency), surfaced verbatim

Where to go next