Skip to content

feat(sync): daemon support for advanced sync #528

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion aw-sync/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,26 @@ Was originally prototyped as a PR to aw-server: https://github.com/ActivityWatch
This will start a daemon which pulls events from and pushes events to the sync directory (`~/ActivityWatchSync` by default) every 5 minutes:

```sh
# Basic sync daemon (syncs all buckets every 5 minutes)
aw-sync

# Same as above
aw-sync daemon

# Sync daemon with specific buckets only
aw-sync daemon --buckets "aw-watcher-window,aw-watcher-afk" --start-date "2024-01-01"

# Sync all buckets once and exit
aw-sync sync --start-date "2024-01-01"
```

For more options, see `aw-sync --help`.
For more options, see `aw-sync --help`. Some notable options:
- `--buckets`: Comma-separated list of bucket IDs to sync, e.g. `--buckets "bucket1,bucket2"`. If this option is omitted (or set to `"*"`), all buckets are synced.
- `--start-date`: Only sync events after this date (YYYY-MM-DD)
- `--sync-db`: Absolute path to a specific database file inside the sync directory
- `--mode`: Choose sync mode: "push", "pull", or "both" (default: "both")

### Setting up sync

Expand Down
207 changes: 135 additions & 72 deletions aw-sync/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
// - [x] Setup local sync bucket
// - [x] Import local buckets and sync events from aw-server (either through API or through creating a read-only Datastore)
// - [x] Import buckets and sync events from remotes
// - [ ] Add CLI arguments
// - [x] Add CLI arguments
// - [x] For which local server to use
// - [x] For which sync dir to use
// - [ ] Date to start syncing from
// - [x] Date to start syncing from

#[macro_use]
extern crate log;
Expand Down Expand Up @@ -60,35 +60,45 @@ struct Opts {
enum Commands {
/// Daemon subcommand
/// Starts aw-sync as a daemon, which will sync every 5 minutes.
Daemon {},
Daemon {
/// Date to start syncing from.
/// If not specified, start from beginning.
/// Format: YYYY-MM-DD
#[clap(long, value_parser=parse_start_date)]
start_date: Option<DateTime<Utc>>,

/// Specify buckets to sync using a comma-separated list.
/// By default, all buckets are synced.
#[clap(long)]
buckets: Option<String>,

/// Full path to sync db file
/// Useful for syncing buckets from a specific db file in the sync directory.
/// Must be a valid absolute path to a file in the sync directory.
#[clap(long)]
sync_db: Option<PathBuf>,
},

/// Sync subcommand (basic)
/// Sync subcommand
///
/// Pulls remote buckets then pushes local buckets.
/// Syncs data between local aw-server and sync directory.
/// First pulls remote buckets from the sync directory to the local aw-server.
/// Then pushes local buckets from the aw-server to the local sync directory.
Sync {
/// Host(s) to pull from, comma separated. Will pull from all hosts if not specified.
#[clap(long, value_parser=parse_list)]
host: Option<Vec<String>>,
},

/// Sync subcommand (advanced)
///
/// Pulls remote buckets then pushes local buckets.
/// First pulls remote buckets in the sync directory to the local aw-server.
/// Then pushes local buckets from the aw-server to the local sync directory.
#[clap(arg_required_else_help = true)]
SyncAdvanced {
/// Date to start syncing from.
/// If not specified, start from beginning.
/// NOTE: might be unstable, as count cannot be used to verify integrity of sync.
/// Format: YYYY-MM-DD
#[clap(long, value_parser=parse_start_date)]
start_date: Option<DateTime<Utc>>,

/// Specify buckets to sync using a comma-separated list.
/// If not specified, all buckets will be synced.
#[clap(long, value_parser=parse_list)]
buckets: Option<Vec<String>>,
/// By default, all buckets are synced.
#[clap(long)]
buckets: Option<String>,

/// Mode to sync in. Can be "push", "pull", or "both".
/// Defaults to "both".
Expand All @@ -111,6 +121,13 @@ fn parse_start_date(arg: &str) -> Result<DateTime<Utc>, chrono::ParseError> {
}

/// Parses a comma-separated CLI argument into a list of strings.
///
/// Whitespace around each entry is trimmed and empty entries are dropped, so
/// `"a, b,"` yields `["a", "b"]`. An empty or whitespace-only argument yields
/// an empty `Vec`.
///
/// This function never returns `Err`; the `Result` return type is kept so it
/// can be plugged in as a clap `value_parser`.
fn parse_list(arg: &str) -> Result<Vec<String>, clap::Error> {
    Ok(arg
        .split(',')
        // Tolerate spaces after commas ("a, b"); an untrimmed entry would never
        // match an exact name comparison later on.
        .map(str::trim)
        // Skip blanks produced by empty input, doubled commas or a trailing comma.
        .filter(|item| !item.is_empty())
        .map(str::to_owned)
        .collect())
}

Expand Down Expand Up @@ -139,60 +156,94 @@ fn main() -> Result<(), Box<dyn Error>> {

let client = AwClient::new(&opts.host, port, "aw-sync")?;

// if opts.command is None, then we're using the default subcommand (Sync)
match opts.command.unwrap_or(Commands::Daemon {}) {
// if opts.command is None, then we're using the default subcommand (Daemon)
match opts.command.unwrap_or(Commands::Daemon {
start_date: None,
buckets: None,
sync_db: None,
}) {
// Start daemon
Commands::Daemon {} => {
Commands::Daemon {
start_date,
buckets,
sync_db,
} => {
info!("Starting daemon...");
daemon(&client)?;
}
// Perform basic sync
Commands::Sync { host } => {
// Pull
match host {
Some(hosts) => {
for host in hosts.iter() {
info!("Pulling from host: {}", host);
sync_wrapper::pull(host, &client)?;
}
}
None => {
info!("Pulling from all hosts");
sync_wrapper::pull_all(&client)?;
}
}

// Push
info!("Pushing local data");
sync_wrapper::push(&client)?
// Use an empty vector to sync all buckets for these cases:
// 1. When --buckets '*' is supplied
// 2. When no bucket argument is provided (default)
let effective_buckets = if buckets.as_deref() == Some("*") || buckets.is_none() {
Some(vec![])
} else if let Some(buckets_str) = buckets {
Some(buckets_str.split(',').map(|s| s.to_string()).collect())
} else {
None
};

daemon(&client, start_date, effective_buckets, sync_db)?;
}
// Perform two-way sync
Commands::SyncAdvanced {
// Perform sync
Commands::Sync {
host,
start_date,
buckets,
mode,
sync_db,
} => {
let sync_dir = dirs::get_sync_dir()?;
if let Some(db_path) = &sync_db {
info!("Using sync db: {}", &db_path.display());
// Use an empty vector to sync all buckets for these cases:
// 1. When --buckets '*' is supplied
// 2. When no bucket argument is provided (default)
let effective_buckets = if buckets.as_deref() == Some("*") || buckets.is_none() {
Some(vec![])
} else if let Some(buckets_str) = buckets {
Some(buckets_str.split(',').map(|s| s.to_string()).collect())
} else {
None
};

if !db_path.is_absolute() {
Err("Sync db path must be absolute")?
}
if !db_path.starts_with(&sync_dir) {
Err("Sync db path must be in sync directory")?
// If advanced options are provided, use advanced sync mode
if start_date.is_some() || effective_buckets.is_some() || sync_db.is_some() {
let sync_dir = dirs::get_sync_dir()?;
if let Some(db_path) = &sync_db {
info!("Using sync db: {}", &db_path.display());

if !db_path.is_absolute() {
Err("Sync db path must be absolute")?
}
if !db_path.starts_with(&sync_dir) {
Err("Sync db path must be in sync directory")?
}
}
}

let sync_spec = sync::SyncSpec {
path: sync_dir,
path_db: sync_db,
buckets,
start: start_date,
};
let sync_spec = sync::SyncSpec {
path: sync_dir,
path_db: sync_db,
buckets: effective_buckets,
start: start_date,
};

sync::sync_run(&client, &sync_spec, mode)?
} else {
// Simple host-based sync mode (backwards compatibility)
// Pull
match host {
Some(hosts) => {
for host in hosts.iter() {
info!("Pulling from host: {}", host);
sync_wrapper::pull(host, &client)?;
}
}
None => {
info!("Pulling from all hosts");
sync_wrapper::pull_all(&client)?;
}
}

sync::sync_run(&client, &sync_spec, mode)?
// Push
info!("Pushing local data");
sync_wrapper::push(&client)?
}
}

// List all buckets
Expand All @@ -207,23 +258,45 @@ fn main() -> Result<(), Box<dyn Error>> {
Ok(())
}

fn daemon(client: &AwClient) -> Result<(), Box<dyn Error>> {
fn daemon(
client: &AwClient,
start_date: Option<DateTime<Utc>>,
buckets: Option<Vec<String>>,
sync_db: Option<PathBuf>,
) -> Result<(), Box<dyn Error>> {
let (tx, rx) = channel();

ctrlc::set_handler(move || {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ctrlc handler setup code is duplicated in both daemon() and daemon_advanced(). Consider refactoring this common logic into a helper function to reduce duplication.

let _ = tx.send(());
})?;

let sync_dir = dirs::get_sync_dir()?;
if let Some(db_path) = &sync_db {
info!("Using sync db: {}", &db_path.display());

if !db_path.is_absolute() {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sync_db path validation logic is duplicated in both SyncAdvanced and daemon_advanced. Consider extracting this into a separate helper function to improve maintainability.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's worth atp, never going to reuse/rewrite any other time.

Err("Sync db path must be absolute")?
}
if !db_path.starts_with(&sync_dir) {
Err("Sync db path must be in sync directory")?
}
}

let sync_spec = sync::SyncSpec {
path: sync_dir,
buckets,
path_db: sync_db,
start: start_date,
};

loop {
if let Err(e) = daemon_sync_cycle(client) {
if let Err(e) = sync::sync_run(client, &sync_spec, sync::SyncMode::Both) {
error!("Error during sync cycle: {}", e);
// Re-throw the error
return Err(e);
}

info!("Sync pass done, sleeping for 5 minutes");

// Wait for either the sleep duration or a termination signal
match rx.recv_timeout(Duration::from_secs(300)) {
Ok(_) | Err(RecvTimeoutError::Disconnected) => {
info!("Termination signal received, shutting down.");
Expand All @@ -237,13 +310,3 @@ fn daemon(client: &AwClient) -> Result<(), Box<dyn Error>> {

Ok(())
}

/// Runs a single sync pass: first pulls from all hosts, then pushes local data.
///
/// Stops at the first failing step and returns its error to the caller.
fn daemon_sync_cycle(client: &AwClient) -> Result<(), Box<dyn Error>> {
    info!("Pulling from all hosts");
    sync_wrapper::pull_all(client)?;

    info!("Pushing local data");
    // `push` already returns `Result<(), Box<dyn Error>>`, so hand it back directly.
    sync_wrapper::push(client)
}
10 changes: 8 additions & 2 deletions aw-sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,12 +247,18 @@ pub fn sync_datastores(
.get_buckets()
.unwrap()
.iter_mut()
// If buckets vec isn't empty, filter out buckets not in the buckets vec
// Only filter buckets if specific bucket IDs are provided
.filter(|tup| {
let bucket = &tup.1;
if let Some(buckets) = &sync_spec.buckets {
buckets.iter().any(|b_id| b_id == &bucket.id)
// If "*" is in the buckets list or no buckets specified, sync all buckets
if buckets.iter().any(|b_id| b_id == "*") || buckets.is_empty() {
true
} else {
buckets.iter().any(|b_id| b_id == &bucket.id)
}
} else {
// By default, sync all buckets
true
}
})
Expand Down
10 changes: 2 additions & 8 deletions aw-sync/src/sync_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,7 @@ pub fn pull(host: &str, client: &AwClient) -> Result<(), Box<dyn Error>> {
let sync_spec = SyncSpec {
path: sync_dir.clone(),
path_db: Some(db.path().clone()),
buckets: Some(vec![
format!("aw-watcher-window_{}", host),
format!("aw-watcher-afk_{}", host),
]),
buckets: None, // Sync all buckets by default
start: None,
};
sync_run(client, &sync_spec, SyncMode::Pull)?;
Expand All @@ -67,10 +64,7 @@ pub fn push(client: &AwClient) -> Result<(), Box<dyn Error>> {
let sync_spec = SyncSpec {
path: sync_dir,
path_db: None,
buckets: Some(vec![
format!("aw-watcher-window_{}", client.hostname),
format!("aw-watcher-afk_{}", client.hostname),
]),
buckets: None, // Sync all buckets by default
start: None,
};
sync_run(client, &sync_spec, SyncMode::Push)?;
Expand Down
Loading