use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::Sender;
use std::time::Duration;
use auth_git2::GitAuthenticator;
use git_autosave::{Config, authenticate::Inquirer, commit_autosave, push_autosaves};
use git2::{RemoteCallbacks, Repository, StatusOptions};
use happylock::{Mutex, ThreadKey};
use notify::{EventKind, INotifyWatcher, RecursiveMode};
use notify_debouncer_full::{DebounceEventHandler, DebounceEventResult, Debouncer, FileIdCache};
/// Event handler for changes to the autosave configuration file.
///
/// When the file changes, the handler reloads the [`Config`], adjusts the set
/// of paths watched by `repo_watcher` to match, and stores the new
/// configuration in `config`.
struct ConfigWatcher<Cache>
where
    Cache: FileIdCache + 'static,
{
    /// Shared configuration; replaced in place with the freshly loaded one.
    config: &'static Mutex<Config>,
    /// Debouncer watching the repositories; paths are added to and removed
    /// from it as the configured repository list changes.
    repo_watcher: &'static mut Debouncer<INotifyWatcher, Cache>,
}
/// Event handler for file-system changes inside watched repositories.
///
/// For every changed repository it commits an autosave and then sends the
/// repository's work directory over `push_queue`.
struct RepoWatcher {
// Shared configuration; locked for the duration of the autosave commits.
config: &'static Mutex<Config>,
// Sending half of the channel carrying work directories to be pushed.
push_queue: Sender<PathBuf>,
}
fn push_queue(
consumer: Receiver<PathBuf>,
config: &'static Mutex<Config>,
mut key: ThreadKey,
) -> ! {
let mut queue = HashSet::new();
loop {
while ping::new("8.8.8.8".parse().unwrap())
.socket_type(ping::SocketType::DGRAM)
.timeout(Duration::from_secs(60))
.ttl(128)
.send()
.is_err()
{
let mut added_item = false;
while let Ok(workdir) = consumer.try_recv() {
added_item = true;
queue.insert(workdir);
}
if added_item {
log::warn!(
"There is no Internet connection, so pushes to the repository have been queued"
);
}
std::thread::yield_now();
}
while let Ok(workdir) = consumer.try_recv() {
queue.insert(workdir);
}
config.scoped_lock(&mut key, |config| {
for workdir in &queue {
log::info!("Pushing {}...", workdir.to_string_lossy());
let Ok(repository) = Repository::open(workdir) else {
log::error!("Failed to open repository: {}", workdir.to_string_lossy());
continue;
};
let Ok(gitconfig) = repository.config() else {
log::error!("Failed to load gitconfig for repository: {:?}", workdir);
return;
};
let auth = GitAuthenticator::new().set_prompter(Inquirer(&*config));
let mut callbacks = RemoteCallbacks::new();
callbacks.credentials(auth.credentials(&gitconfig));
if let Err(e) = push_autosaves(&repository, callbacks) {
log::error!("Failed to push autosaves: {e}");
}
log::info!("Successfully pushed {}", workdir.to_string_lossy());
}
});
queue.clear();
}
}
impl<Cache: FileIdCache + Send + 'static> DebounceEventHandler for ConfigWatcher<Cache> {
fn handle_event(&mut self, events: DebounceEventResult) {
let events = match events {
Ok(events) => events,
Err(errors) => {
for error in errors {
log::error!("Failed to load event: {error}");
}
return;
}
};
if events
.iter()
.all(|event| matches!(event.kind, EventKind::Access(_)))
{
return;
}
log::info!("The config was updated. Reloading...");
let Some(key) = ThreadKey::get() else {
log::error!("Failed to acquire thread key when reloading config. This is a bug!");
return;
};
let config = match Config::load() {
Ok(config) => config,
Err(e) => {
log::error!("Failed to reload autosave config: {e}");
return;
}
};
self.config.scoped_lock(key, |old_config| {
let paths_to_unwatch = old_config.repositories().difference(config.repositories());
let paths_to_watch = config.repositories().difference(old_config.repositories());
for path in paths_to_unwatch {
if let Err(e) = self.repo_watcher.unwatch(path) {
log::error!("Error when removing path from being watched: {e}");
}
log::info!("Removed {path:?} from list of repos to watch");
}
for path in paths_to_watch {
if let Err(e) = self.repo_watcher.watch(path, RecursiveMode::Recursive) {
log::error!("Error when adding path to repositories to watch: {e}");
}
log::info!("Added {path:?} from list of repos to watch");
}
*old_config = config;
});
log::info!("Successfully reloaded autosave config");
}
}
impl DebounceEventHandler for RepoWatcher {
    /// Commits an autosave for every repository touched by `events` and
    /// queues each of them for pushing.
    ///
    /// An event path is skipped when it lies inside a `.git` directory, is not
    /// inside a repository, belongs to a bare repository, is ignored by git,
    /// or when the repository reports no status changes. Each repository is
    /// autosaved at most once per batch of events.
    ///
    /// A failed commit is logged and does not stop the remaining repositories
    /// from being autosaved. The work directory is still sent to the push
    /// queue in that case, as before.
    fn handle_event(&mut self, events: DebounceEventResult) {
        let Some(mut key) = ThreadKey::get() else {
            log::error!("Failed to acquire thread key when autosaving repository. This is a bug!");
            return;
        };
        let events = match events {
            Ok(events) => events,
            Err(errors) => {
                for error in errors {
                    log::error!("Failed to get update events: {error}");
                }
                return;
            }
        };

        let mut workdirs_to_autosave = HashSet::new();
        let mut repositories_to_autosave = Vec::new();
        for (event, path) in events
            .iter()
            .filter(|event| !matches!(event.kind, EventKind::Access(_)))
            .flat_map(|event| event.paths.iter().map(move |path| (event, path)))
        {
            if path
                .components()
                .any(|component| component.as_os_str() == ".git")
            {
                // Prevent infinite loop from commits triggering autosaves
                continue;
            }
            let Ok(repository) = Repository::discover(path) else {
                log::warn!("Skipping non-repository: {:?}", &path);
                continue;
            };
            let Some(workdir) = repository.workdir() else {
                log::warn!("Skipping bare repository: {:?}", &path);
                continue;
            };
            // One autosave per repository per batch is enough.
            if workdirs_to_autosave.contains(workdir) {
                continue;
            }
            match repository.is_path_ignored(path) {
                Ok(true) => {
                    log::trace!("Skipping event for ignored path: {:?}", path);
                    continue;
                }
                Ok(false) => {}
                Err(e) => {
                    // Best effort: treat the path as not ignored and go on.
                    log::error!("Failed to determine if path is ignore: {e}");
                }
            }
            if let Ok(status) = repository.statuses(Some(
                StatusOptions::new()
                    .include_untracked(true)
                    .include_ignored(false),
            )) && status.is_empty()
            {
                continue;
            }
            log::info!("Event: {:?}", event);
            log::info!("Updated path: {:?}", &path);
            workdirs_to_autosave.insert(workdir.to_path_buf());
            repositories_to_autosave.push(repository);
        }

        // Do not contend for the config lock when there is nothing to do.
        if repositories_to_autosave.is_empty() {
            return;
        }

        // The lock is still taken even though the configuration is not read:
        // the push thread holds the same lock while pushing, so commits and
        // pushes stay serialized as before.
        self.config.scoped_lock(&mut key, |_config| {
            for repository in repositories_to_autosave {
                // Only repositories with a work directory were collected above.
                let Some(workdir) = repository.workdir() else {
                    continue;
                };
                let workdir_name = workdir.to_string_lossy();
                log::info!("Autosaving {:?}...", workdir_name);
                let committed = match commit_autosave(&repository) {
                    Ok(_) => true,
                    Err(e) => {
                        log::error!("Failed to commit autosave: {e}");
                        false
                    }
                };
                if let Err(e) = self.push_queue.send(workdir.to_path_buf()) {
                    log::error!("Failed to add repository to push queue: {e}");
                }
                if committed {
                    log::info!("Successfully autosaved {:?}", workdir_name);
                }
            }
        });
    }
}
fn main() -> Result<(), anyhow::Error> {
let mut key = ThreadKey::get().expect("Could not get ThreadKey on startup. This is a bug!");
colog::init();
log::info!("Loading autosave config...");
let config: &'static Mutex<Config> = Box::leak(Box::new(Mutex::new(Config::load()?)));
log::info!("Loaded autosave config");
log::info!("Starting push queue...");
let (sender, receiver) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let key = ThreadKey::get().unwrap();
push_queue(receiver, config, key);
});
config.scoped_lock(&mut key, |config| {
for repository in config.repositories() {
if let Err(e) = sender.send(repository.clone()) {
log::error!("Failed to queue {}: {e}", repository.to_string_lossy());
}
}
});
log::info!("Started push queue");
log::info!("Starting repository watcher...");
let repo_watcher = Box::leak(Box::new(notify_debouncer_full::new_debouncer(
Duration::from_secs(5),
None,
RepoWatcher {
config,
push_queue: sender,
},
)?));
config.scoped_lock(key, |config| {
log::info!("Adding repositories to watch...");
for repository in config.repositories() {
if let Err(e) = repo_watcher.watch(repository, RecursiveMode::Recursive) {
log::error!("Failed to watch {repository:?}: {e}");
}
log::info!("Added {repository:?}");
}
});
log::info!("Started repository watcher");
log::info!("Starting configuration watcher...");
notify_debouncer_full::new_debouncer(
Duration::from_secs(5),
None,
ConfigWatcher {
config,
repo_watcher,
},
)?
.watch(
&confy::get_configuration_file_path("git-autosave", "git-autosaved")?,
RecursiveMode::NonRecursive,
)?;
log::info!("Started configuration watcher");
log::info!("Initializing complete. Parking...");
loop {
std::thread::yield_now();
}
}
|