Skip to content

Support tracking update hooks #120

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion crates/core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
#![no_std]
#![feature(vec_into_raw_parts)]
#![allow(internal_features)]
#![feature(btree_set_entry)]
#![feature(core_intrinsics)]
#![feature(assert_matches)]
#![feature(strict_overflow_ops)]
#![feature(vec_into_raw_parts)]

extern crate alloc;

Expand Down Expand Up @@ -33,6 +34,7 @@ mod schema;
mod state;
mod sync;
mod sync_local;
mod update_hooks;
mod util;
mod uuid;
mod version;
Expand Down Expand Up @@ -79,6 +81,7 @@ fn init_extension(db: *mut sqlite::sqlite3) -> Result<(), PowerSyncError> {
crate::kv::register(db)?;
crate::state::register(db, state.clone())?;
sync::register(db, state.clone())?;
update_hooks::register(db, state.clone())?;

crate::schema::register(db)?;
crate::operations_vtab::register(db, state.clone())?;
Expand Down
35 changes: 34 additions & 1 deletion crates/core/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
use core::{
cell::RefCell,
ffi::{c_int, c_void},
sync::atomic::{AtomicBool, Ordering},
};

use alloc::sync::Arc;
use alloc::{
collections::btree_set::BTreeSet,
string::{String, ToString},
sync::Arc,
};
use sqlite::{Connection, ResultCode};
use sqlite_nostd::{self as sqlite, Context};

Expand All @@ -14,12 +19,16 @@ use sqlite_nostd::{self as sqlite, Context};
/// functions/vtabs that need access to it.
pub struct DatabaseState {
pub is_in_sync_local: AtomicBool,
pending_updates: RefCell<BTreeSet<String>>,
commited_updates: RefCell<BTreeSet<String>>,
}

impl DatabaseState {
pub fn new() -> Self {
DatabaseState {
is_in_sync_local: AtomicBool::new(false),
pending_updates: Default::default(),
commited_updates: Default::default(),
}
}

Expand All @@ -39,6 +48,30 @@ impl DatabaseState {
ClearOnDrop(self)
}

pub fn track_update(&self, tbl: &str) {
let mut set = self.pending_updates.borrow_mut();
set.get_or_insert_with(tbl, str::to_string);
}

pub fn track_rollback(&self) {
self.pending_updates.borrow_mut().clear();
}

pub fn track_commit(&self) {
let mut commited = self.commited_updates.borrow_mut();
let mut pending = self.pending_updates.borrow_mut();
let pending = core::mem::replace(&mut *pending, Default::default());

for pending in pending.into_iter() {
commited.insert(pending);
}
}

pub fn take_updates(&self) -> BTreeSet<String> {
let mut committed = self.commited_updates.borrow_mut();
core::mem::replace(&mut *committed, Default::default())
}

pub unsafe extern "C" fn destroy_arc(ptr: *mut c_void) {
drop(unsafe { Arc::from_raw(ptr.cast::<DatabaseState>()) });
}
Expand Down
171 changes: 171 additions & 0 deletions crates/core/src/update_hooks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
use core::{
ffi::{CStr, c_char, c_int, c_void},
ptr::null_mut,
sync::atomic::{AtomicBool, Ordering},
};

use alloc::{boxed::Box, sync::Arc};
use sqlite_nostd::{
self as sqlite, Connection, Context, ResultCode, Value, bindings::SQLITE_RESULT_SUBTYPE,
};

use crate::{constants::SUBTYPE_JSON, error::PowerSyncError, state::DatabaseState};

/// The `powersync_update_hooks` methods works like this:
///
/// 1. `powersync_update_hooks('install')` installs update hooks on the database, failing if
/// another hook already exists.
/// 2. `powersync_update_hooks('get')` returns a JSON array of table names that have been changed
/// and comitted since the last `powersync_update_hooks` call.
///
/// The update hooks don't have to be uninstalled manually, that happens when the connection is
/// closed and the function is unregistered.
pub fn register(db: *mut sqlite::sqlite3, state: Arc<DatabaseState>) -> Result<(), ResultCode> {
let state = Box::new(HookState {
has_registered_hooks: AtomicBool::new(false),
db,
state,
});

db.create_function_v2(
"powersync_update_hooks",
1,
sqlite::UTF8 | sqlite::DETERMINISTIC | SQLITE_RESULT_SUBTYPE,
Some(Box::into_raw(state) as *mut c_void),
Some(powersync_update_hooks),
None,
None,
Some(destroy_function),
)?;
Ok(())
}

struct HookState {
has_registered_hooks: AtomicBool,
db: *mut sqlite::sqlite3,
state: Arc<DatabaseState>,
}

extern "C" fn destroy_function(ctx: *mut c_void) {
let state = unsafe { Box::from_raw(ctx as *mut HookState) };

if state.has_registered_hooks.load(Ordering::Relaxed) {
check_previous(
"update",
&state.state,
state.db.update_hook(None, null_mut()),
);
check_previous(
"commit",
&state.state,
state.db.commit_hook(None, null_mut()),
);
check_previous(
"rollback",
&state.state,
state.db.rollback_hook(None, null_mut()),
);
}
}

extern "C" fn powersync_update_hooks(
ctx: *mut sqlite::context,
argc: c_int,
argv: *mut *mut sqlite::value,
) {
let args = sqlite::args!(argc, argv);
let op = args[0].text();
let db = ctx.db_handle();
let user_data = ctx.user_data() as *const HookState;

match op {
"install" => {
let state = unsafe { user_data.as_ref().unwrap_unchecked() };
let db_state = &state.state;

check_previous(
"update",
db_state,
db.update_hook(
Some(update_hook_impl),
Arc::into_raw(db_state.clone()) as *mut c_void,
),
);
check_previous(
"commit",
db_state,
db.commit_hook(
Some(commit_hook_impl),
Arc::into_raw(db_state.clone()) as *mut c_void,
),
);
check_previous(
"rollback",
db_state,
db.rollback_hook(
Some(rollback_hook_impl),
Arc::into_raw(db_state.clone()) as *mut c_void,
),
);
state.has_registered_hooks.store(true, Ordering::Relaxed);
}
"get" => {
let state = unsafe { user_data.as_ref().unwrap_unchecked() };
let formatted = serde_json::to_string(&state.state.take_updates())
.map_err(PowerSyncError::internal);
match formatted {
Ok(result) => {
ctx.result_text_transient(&result);
ctx.result_subtype(SUBTYPE_JSON);
}
Err(e) => e.apply_to_ctx("powersync_update_hooks", ctx),
}
}
_ => {
ctx.result_error("Unknown operation");
ctx.result_error_code(ResultCode::MISUSE);
}
};
}

unsafe extern "C" fn update_hook_impl(
ctx: *mut c_void,
_kind: c_int,
_db: *const c_char,
table: *const c_char,
_rowid: i64,
) {
let state = unsafe { (ctx as *const DatabaseState).as_ref().unwrap_unchecked() };
let table = unsafe { CStr::from_ptr(table) };
let Ok(table) = table.to_str() else {
return;
};

state.track_update(table);
}

unsafe extern "C" fn commit_hook_impl(ctx: *mut c_void) -> c_int {
let state = unsafe { (ctx as *const DatabaseState).as_ref().unwrap_unchecked() };
state.track_commit();
return 0; // Allow commit to continue normally
}

unsafe extern "C" fn rollback_hook_impl(ctx: *mut c_void) {
let state = unsafe { (ctx as *const DatabaseState).as_ref().unwrap_unchecked() };
state.track_rollback();
}

fn check_previous(desc: &'static str, expected: &Arc<DatabaseState>, previous: *const c_void) {
let expected = Arc::as_ptr(expected);

assert!(
previous.is_null() || previous == expected.cast(),
"Previous call to {desc} hook outside of PowerSync: Expected {expected:p}, installed was {previous:p}",
);
if !previous.is_null() {
// The hook callbacks own an Arc<DatabaseState> that needs to be dropped now.
unsafe {
Arc::decrement_strong_count(previous);
}
}
}
63 changes: 63 additions & 0 deletions dart/test/update_hooks_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import 'dart:convert';

import 'package:sqlite3/common.dart';
import 'package:test/test.dart';

import 'utils/native_test_utils.dart';

void main() {
late CommonDatabase db;

setUp(() async {
db = openTestDatabase()
..select('select powersync_init()')
..execute('CREATE TABLE foo (bar INTEGER);')
..select("SELECT powersync_update_hooks('install')");
});

tearDown(() {
db.dispose();
});

List<String> collectUpdates() {
final [row] = db.select("SELECT powersync_update_hooks('get')");
return (json.decode(row.values[0] as String) as List).cast();
}

test('is empty initially', () {
expect(collectUpdates(), isEmpty);
});

test('reports changed tables', () {
db.execute('INSERT INTO foo DEFAULT VALUES');
expect(collectUpdates(), ['foo']);
});

test('deduplicates tables', () {
final stmt = db.prepare('INSERT INTO foo (bar) VALUES (?)');
for (var i = 0; i < 1000; i++) {
stmt.execute([i]);
}
stmt.dispose();

expect(collectUpdates(), ['foo']);
});

test('does not report changes before end of transaction', () {
db.execute('BEGIN');
db.execute('INSERT INTO foo DEFAULT VALUES');
expect(collectUpdates(), isEmpty);
db.execute('COMMIT');

expect(collectUpdates(), ['foo']);
});

test('does not report rollbacks', () {
db.execute('BEGIN');
db.execute('INSERT INTO foo DEFAULT VALUES');
expect(collectUpdates(), isEmpty);
db.execute('ROLLBACK');

expect(collectUpdates(), isEmpty);
});
}
2 changes: 1 addition & 1 deletion sqlite-rs-embedded
Loading