port change tracking
This commit is contained in:
parent
7d42da67c6
commit
96f0a5cc3c
@ -24,3 +24,6 @@ rusqlite = "0.21.0"
|
||||
[build-dependencies]
|
||||
prost-build = "0.5.0"
|
||||
|
||||
[dev-dependencies]
|
||||
utime = "0.2.1"
|
||||
|
||||
|
@ -2,9 +2,12 @@
|
||||
// License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
|
||||
|
||||
use crate::err::Result;
|
||||
use crate::media::MediaManager;
|
||||
use rusqlite::{params, Connection, OptionalExtension, Statement, NO_PARAMS};
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
|
||||
fn open_or_create(path: &str) -> Result<Connection> {
|
||||
pub(super) fn open_or_create<P: AsRef<Path>>(path: P) -> Result<Connection> {
|
||||
let mut db = Connection::open(path)?;
|
||||
|
||||
db.pragma_update(None, "locking_mode", &"exclusive")?;
|
||||
@ -41,7 +44,7 @@ pub struct MediaEntry {
|
||||
// Modification time; 0 if deleted
|
||||
pub mtime: i64,
|
||||
/// True if changed since last sync
|
||||
pub dirty: bool,
|
||||
pub sync_required: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
@ -77,11 +80,11 @@ impl MediaDatabaseSession<'_> {
|
||||
self.db.execute_batch("commit").map_err(Into::into)
|
||||
}
|
||||
|
||||
fn rollback(&mut self) -> Result<()> {
|
||||
/// Issue a `rollback` statement on the session's connection, converting
/// any database error into the crate's error type.
pub(super) fn rollback(&mut self) -> Result<()> {
    self.db.execute_batch("rollback")?;
    Ok(())
}
|
||||
|
||||
fn get_entry(&mut self, fname: &str) -> Result<Option<MediaEntry>> {
|
||||
pub(super) fn get_entry(&mut self, fname: &str) -> Result<Option<MediaEntry>> {
|
||||
let stmt = cached_sql!(
|
||||
self.get_entry_stmt,
|
||||
self.db,
|
||||
@ -106,14 +109,14 @@ select fname, csum, mtime, dirty from media where fname=?"
|
||||
fname: row.get(0)?,
|
||||
sha1: sha1_array,
|
||||
mtime: row.get(2)?,
|
||||
dirty: row.get(3)?,
|
||||
sync_required: row.get(3)?,
|
||||
})
|
||||
})
|
||||
.optional()
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
fn set_entry(&mut self, entry: &MediaEntry) -> Result<()> {
|
||||
pub(super) fn set_entry(&mut self, entry: &MediaEntry) -> Result<()> {
|
||||
let stmt = cached_sql!(
|
||||
self.update_entry_stmt,
|
||||
self.db,
|
||||
@ -123,12 +126,17 @@ values (?, ?, ?, ?)"
|
||||
);
|
||||
|
||||
let sha1_str = entry.sha1.map(hex::encode);
|
||||
stmt.execute(params![entry.fname, sha1_str, entry.mtime, entry.dirty])?;
|
||||
stmt.execute(params![
|
||||
entry.fname,
|
||||
sha1_str,
|
||||
entry.mtime,
|
||||
entry.sync_required
|
||||
])?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn remove_entry(&mut self, fname: &str) -> Result<()> {
|
||||
pub(super) fn remove_entry(&mut self, fname: &str) -> Result<()> {
|
||||
let stmt = cached_sql!(
|
||||
self.remove_entry_stmt,
|
||||
self.db,
|
||||
@ -141,7 +149,7 @@ delete from media where fname=?"
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_meta(&mut self) -> Result<MediaDatabaseMetadata> {
|
||||
pub(super) fn get_meta(&mut self) -> Result<MediaDatabaseMetadata> {
|
||||
let mut stmt = self.db.prepare("select dirMod, lastUsn from meta")?;
|
||||
|
||||
stmt.query_row(NO_PARAMS, |row| {
|
||||
@ -153,20 +161,20 @@ delete from media where fname=?"
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
fn set_meta(&mut self, meta: &MediaDatabaseMetadata) -> Result<()> {
|
||||
pub(super) fn set_meta(&mut self, meta: &MediaDatabaseMetadata) -> Result<()> {
|
||||
let mut stmt = self.db.prepare("update meta set dirMod = ?, lastUsn = ?")?;
|
||||
stmt.execute(params![meta.folder_mtime, meta.last_sync_usn])?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn clear(&mut self) -> Result<()> {
|
||||
/// Delete every row from `media` and reset `lastUsn` and `dirMod` in
/// `meta` to 0.
pub(super) fn clear(&mut self) -> Result<()> {
    let sql = "delete from media; update meta set lastUsn = 0, dirMod = 0";
    self.db.execute_batch(sql)?;
    Ok(())
}
|
||||
|
||||
fn changes_pending(&mut self) -> Result<u32> {
|
||||
pub(super) fn changes_pending(&mut self) -> Result<u32> {
|
||||
self.db
|
||||
.query_row(
|
||||
"select count(*) from media where dirty=1",
|
||||
@ -176,7 +184,7 @@ delete from media where fname=?"
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
fn count(&mut self) -> Result<u32> {
|
||||
pub(super) fn count(&mut self) -> Result<u32> {
|
||||
self.db
|
||||
.query_row(
|
||||
"select count(*) from media where csum is not null",
|
||||
@ -186,7 +194,7 @@ delete from media where fname=?"
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
fn get_pending_uploads(&mut self, max_entries: u32) -> Result<Vec<MediaEntry>> {
|
||||
pub(super) fn get_pending_uploads(&mut self, max_entries: u32) -> Result<Vec<MediaEntry>> {
|
||||
let mut stmt = self
|
||||
.db
|
||||
.prepare("select fname from media where dirty=1 limit ?")?;
|
||||
@ -199,18 +207,17 @@ delete from media where fname=?"
|
||||
|
||||
results
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MediaDatabase {
|
||||
db: Connection,
|
||||
}
|
||||
|
||||
impl MediaDatabase {
|
||||
pub fn new(path: &str) -> Result<Self> {
|
||||
let db = open_or_create(path)?;
|
||||
Ok(MediaDatabase { db })
|
||||
pub(super) fn all_mtimes(&mut self) -> Result<HashMap<String, i64>> {
|
||||
let mut stmt = self.db.prepare("select fname, mtime from media")?;
|
||||
let map: std::result::Result<HashMap<String, i64>, rusqlite::Error> = stmt
|
||||
.query_map(NO_PARAMS, |row| Ok((row.get(0)?, row.get(1)?)))?
|
||||
.collect();
|
||||
Ok(map?)
|
||||
}
|
||||
}
|
||||
|
||||
impl MediaManager {
|
||||
/// Fetch the media DB entry stored for `fname` by running the
/// session-level `get_entry` inside `query()`.
pub fn get_entry(&mut self, fname: &str) -> Result<Option<MediaEntry>> {
    self.query(|session| session.get_entry(fname))
}
|
||||
@ -252,7 +259,7 @@ impl MediaDatabase {
|
||||
///
|
||||
/// This function should be used for read-only requests. To mutate
|
||||
/// the database, use transact() instead.
|
||||
fn query<F, R>(&self, func: F) -> Result<R>
|
||||
pub(super) fn query<F, R>(&self, func: F) -> Result<R>
|
||||
where
|
||||
F: FnOnce(&mut MediaDatabaseSession) -> Result<R>,
|
||||
{
|
||||
@ -268,7 +275,7 @@ impl MediaDatabase {
|
||||
|
||||
/// Execute the provided closure in a transaction, rolling back if
|
||||
/// an error is returned.
|
||||
fn transact<F, R>(&self, func: F) -> Result<R>
|
||||
pub(super) fn transact<F, R>(&self, func: F) -> Result<R>
|
||||
where
|
||||
F: FnOnce(&mut MediaDatabaseSession) -> Result<R>,
|
||||
{
|
||||
@ -295,15 +302,16 @@ impl MediaDatabase {
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use crate::err::Result;
|
||||
use crate::media::database::{MediaDatabase, MediaEntry};
|
||||
use crate::media::database::MediaEntry;
|
||||
use crate::media::files::sha1_of_data;
|
||||
use crate::media::MediaManager;
|
||||
use tempfile::NamedTempFile;
|
||||
|
||||
#[test]
|
||||
fn test_database() -> Result<()> {
|
||||
let db_file = NamedTempFile::new()?;
|
||||
let db_file_path = db_file.path().to_str().unwrap();
|
||||
let mut db = MediaDatabase::new(db_file_path)?;
|
||||
let mut db = MediaManager::new("/dummy", db_file_path)?;
|
||||
|
||||
// no entry exists yet
|
||||
assert_eq!(db.get_entry("test.mp3")?, None);
|
||||
@ -313,7 +321,7 @@ mod test {
|
||||
fname: "test.mp3".into(),
|
||||
sha1: None,
|
||||
mtime: 0,
|
||||
dirty: false,
|
||||
sync_required: false,
|
||||
};
|
||||
db.set_entry(&entry)?;
|
||||
assert_eq!(db.get_entry("test.mp3")?.unwrap(), entry);
|
||||
@ -321,7 +329,7 @@ mod test {
|
||||
// update it
|
||||
entry.sha1 = Some(sha1_of_data("hello".as_bytes()));
|
||||
entry.mtime = 123;
|
||||
entry.dirty = true;
|
||||
entry.sync_required = true;
|
||||
db.set_entry(&entry)?;
|
||||
assert_eq!(db.get_entry("test.mp3")?.unwrap(), entry);
|
||||
|
||||
@ -342,7 +350,7 @@ mod test {
|
||||
|
||||
// reopen database, and ensure data was committed
|
||||
drop(db);
|
||||
db = MediaDatabase::new(db_file_path)?;
|
||||
db = MediaManager::new("/dummy", db_file_path)?;
|
||||
meta = db.get_meta()?;
|
||||
assert_eq!(meta.folder_mtime, 123);
|
||||
|
||||
|
@ -1,13 +1,17 @@
|
||||
// Copyright: Ankitects Pty Ltd and contributors
|
||||
// License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
|
||||
|
||||
use crate::err::Result;
|
||||
use crate::media::database::MediaEntry;
|
||||
use crate::media::MediaManager;
|
||||
use lazy_static::lazy_static;
|
||||
use regex::Regex;
|
||||
use sha1::Sha1;
|
||||
use std::borrow::Cow;
|
||||
use std::collections::HashMap;
|
||||
use std::io::Read;
|
||||
use std::path::Path;
|
||||
use std::{fs, io};
|
||||
use std::{fs, io, time};
|
||||
use unicode_normalization::{is_nfc_quick, IsNormalized, UnicodeNormalization};
|
||||
|
||||
/// The maximum length we allow a filename to be. When combined
|
||||
@ -16,6 +20,9 @@ use unicode_normalization::{is_nfc_quick, IsNormalized, UnicodeNormalization};
|
||||
/// the length of the filename.
|
||||
static MAX_FILENAME_LENGTH: usize = 120;
|
||||
|
||||
/// Media syncing does not support files over 100MiB.
|
||||
static MEDIA_SYNC_FILESIZE_LIMIT: usize = 100 * 1024 * 1024;
|
||||
|
||||
lazy_static! {
|
||||
static ref WINDOWS_DEVICE_NAME: Regex = Regex::new(
|
||||
r#"(?xi)
|
||||
@ -31,6 +38,16 @@ lazy_static! {
|
||||
"#
|
||||
)
|
||||
.unwrap();
|
||||
/// Filenames that are skipped when scanning the media folder.
///
/// Matched case-insensitively against the whole filename. The dots are
/// escaped so that only the literal names `thumbs.db` and `.ds_store`
/// match, and the group is non-capturing.
static ref NONSYNCABLE_FILENAME: Regex = Regex::new(
    r#"(?xi)
    ^
    (?:
        thumbs\.db | \.ds_store
    )
    $
    "#
)
.unwrap();
|
||||
}
|
||||
|
||||
/// True if character may cause problems on one or more platforms.
|
||||
@ -209,14 +226,187 @@ pub(crate) fn sha1_of_data(data: &[u8]) -> [u8; 20] {
|
||||
hasher.digest().bytes()
|
||||
}
|
||||
|
||||
/// A file found on disk that is new or whose mtime differs from the
/// value recorded in the media DB.
#[derive(Debug)]
struct FilesystemEntry {
    /// Filename as reported by the directory listing.
    fname: String,
    /// SHA1 checksum of the file's contents.
    sha1: Option<[u8; 20]>,
    /// Modification time of the file, in seconds since the Unix epoch.
    mtime: i64,
    /// True if the media DB had no mtime recorded for this filename.
    is_new: bool,
}
|
||||
|
||||
impl MediaManager {
|
||||
/// Note any added/changed/deleted files.
|
||||
///
|
||||
/// In the future, we could register files in the media DB as they
|
||||
/// are added, meaning that for users who don't modify files externally, the
|
||||
/// folder scan could be skipped.
|
||||
pub fn register_changes(&mut self) -> Result<()> {
|
||||
// folder mtime unchanged?
|
||||
let media_dir_modified = self
|
||||
.media_folder
|
||||
.metadata()?
|
||||
.modified()?
|
||||
.duration_since(time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs() as i64;
|
||||
let mut meta = self.get_meta()?;
|
||||
if media_dir_modified == meta.folder_mtime {
|
||||
return Ok(());
|
||||
} else {
|
||||
meta.folder_mtime = media_dir_modified;
|
||||
}
|
||||
|
||||
let mtimes = self.query(|ctx| ctx.all_mtimes())?;
|
||||
|
||||
let (changed, removed) = self.media_folder_changes(mtimes)?;
|
||||
|
||||
self.add_updated_entries(changed)?;
|
||||
self.remove_deleted_files(removed)?;
|
||||
|
||||
self.set_meta(&meta)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Scan through the media folder, finding changes.
|
||||
/// Returns (added/changed files, removed files).
|
||||
///
|
||||
/// Checks for invalid filenames and unicode normalization are deferred
|
||||
/// until syncing time, as we can't trust the entries previous Anki versions
|
||||
/// wrote are correct.
|
||||
fn media_folder_changes(
|
||||
&self,
|
||||
mut mtimes: HashMap<String, i64>,
|
||||
) -> Result<(Vec<FilesystemEntry>, Vec<String>)> {
|
||||
let mut added_or_changed = vec![];
|
||||
|
||||
// loop through on-disk files
|
||||
for dentry in self.media_folder.read_dir()? {
|
||||
let dentry = dentry?;
|
||||
|
||||
// skip folders
|
||||
if dentry.file_type()?.is_dir() {
|
||||
continue;
|
||||
}
|
||||
|
||||
// if the filename is not valid unicode, skip it
|
||||
let fname_os = dentry.file_name();
|
||||
let fname = match fname_os.to_str() {
|
||||
Some(s) => s,
|
||||
None => continue,
|
||||
};
|
||||
|
||||
// ignore blacklisted files
|
||||
if NONSYNCABLE_FILENAME.is_match(fname) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// ignore large files
|
||||
let metadata = dentry.metadata()?;
|
||||
if metadata.len() > MEDIA_SYNC_FILESIZE_LIMIT as u64 {
|
||||
continue;
|
||||
}
|
||||
|
||||
// remove from mtimes for later deletion tracking
|
||||
let previous_mtime = mtimes.remove(fname);
|
||||
|
||||
// skip files that have not been modified
|
||||
let mtime = metadata
|
||||
.modified()?
|
||||
.duration_since(time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs() as i64;
|
||||
if let Some(previous_mtime) = previous_mtime {
|
||||
if previous_mtime == mtime {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// add entry to the list
|
||||
let sha1 = Some(sha1_of_file(&dentry.path())?);
|
||||
added_or_changed.push(FilesystemEntry {
|
||||
fname: fname.to_string(),
|
||||
sha1,
|
||||
mtime,
|
||||
is_new: previous_mtime.is_none(),
|
||||
});
|
||||
}
|
||||
|
||||
// any remaining entries from the database have been deleted
|
||||
let removed: Vec<_> = mtimes.into_iter().map(|(k, _)| k).collect();
|
||||
|
||||
Ok((added_or_changed, removed))
|
||||
}
|
||||
|
||||
/// Add added/updated entries to the media DB.
|
||||
///
|
||||
/// Skip files where the mod time differed, but checksums are the same.
|
||||
fn add_updated_entries(&mut self, entries: Vec<FilesystemEntry>) -> Result<()> {
|
||||
for chunk in entries.chunks(1_024) {
|
||||
self.transact(|ctx| {
|
||||
for fentry in chunk {
|
||||
let mut sync_required = true;
|
||||
if !fentry.is_new {
|
||||
if let Some(db_entry) = ctx.get_entry(&fentry.fname)? {
|
||||
if db_entry.sha1 == fentry.sha1 {
|
||||
// mtime bumped but file contents are the same,
|
||||
// so we can preserve the current updated flag.
|
||||
// we still need to update the mtime however.
|
||||
sync_required = db_entry.sync_required
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
ctx.set_entry(&MediaEntry {
|
||||
fname: fentry.fname.clone(),
|
||||
sha1: fentry.sha1,
|
||||
mtime: fentry.mtime,
|
||||
sync_required,
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Remove deleted files from the media DB.
|
||||
fn remove_deleted_files(&mut self, removed: Vec<String>) -> Result<()> {
|
||||
for chunk in removed.chunks(4_096) {
|
||||
self.transact(|ctx| {
|
||||
for fname in chunk {
|
||||
ctx.set_entry(&MediaEntry {
|
||||
fname: fname.clone(),
|
||||
sha1: None,
|
||||
mtime: 0,
|
||||
sync_required: true,
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use crate::err::Result;
|
||||
use crate::media::database::MediaEntry;
|
||||
use crate::media::files::{
|
||||
add_data_to_folder_uniquely, add_hash_suffix_to_file_stem, normalize_filename,
|
||||
sha1_of_data, MAX_FILENAME_LENGTH,
|
||||
};
|
||||
use crate::media::MediaManager;
|
||||
use std::borrow::Cow;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
use std::{fs, time};
|
||||
use tempfile::tempdir;
|
||||
use utime;
|
||||
|
||||
#[test]
|
||||
fn test_normalize() {
|
||||
@ -278,4 +468,105 @@ mod test {
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
// helper
|
||||
fn change_mtime(p: &Path) {
|
||||
let mtime = p.metadata().unwrap().modified().unwrap();
|
||||
let new_mtime = mtime - Duration::from_secs(3);
|
||||
let secs = new_mtime
|
||||
.duration_since(time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs();
|
||||
utime::set_file_times(p, secs, secs).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_change_tracking() -> Result<()> {
|
||||
let dir = tempdir()?;
|
||||
let media_dir = dir.path().join("media");
|
||||
std::fs::create_dir(&media_dir)?;
|
||||
let media_db = dir.path().join("media.db");
|
||||
|
||||
let mut mgr = MediaManager::new(&media_dir, media_db)?;
|
||||
assert_eq!(mgr.count()?, 0);
|
||||
|
||||
// add a file and check it's picked up
|
||||
let f1 = media_dir.join("file.jpg");
|
||||
fs::write(&f1, "hello")?;
|
||||
|
||||
change_mtime(&media_dir);
|
||||
mgr.register_changes()?;
|
||||
|
||||
assert_eq!(mgr.count()?, 1);
|
||||
assert_eq!(mgr.changes_pending()?, 1);
|
||||
let mut entry = mgr.get_entry("file.jpg")?.unwrap();
|
||||
assert_eq!(
|
||||
entry,
|
||||
MediaEntry {
|
||||
fname: "file.jpg".into(),
|
||||
sha1: Some(sha1_of_data("hello".as_bytes())),
|
||||
mtime: f1
|
||||
.metadata()?
|
||||
.modified()?
|
||||
.duration_since(time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs() as i64,
|
||||
sync_required: true,
|
||||
}
|
||||
);
|
||||
|
||||
// mark it as unmodified
|
||||
entry.sync_required = false;
|
||||
mgr.set_entry(&entry)?;
|
||||
assert_eq!(mgr.changes_pending()?, 0);
|
||||
|
||||
// modify it
|
||||
fs::write(&f1, "hello1")?;
|
||||
change_mtime(&f1);
|
||||
|
||||
change_mtime(&media_dir);
|
||||
mgr.register_changes()?;
|
||||
|
||||
assert_eq!(mgr.count()?, 1);
|
||||
assert_eq!(mgr.changes_pending()?, 1);
|
||||
assert_eq!(
|
||||
mgr.get_entry("file.jpg")?.unwrap(),
|
||||
MediaEntry {
|
||||
fname: "file.jpg".into(),
|
||||
sha1: Some(sha1_of_data("hello1".as_bytes())),
|
||||
mtime: f1
|
||||
.metadata()?
|
||||
.modified()?
|
||||
.duration_since(time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs() as i64,
|
||||
sync_required: true,
|
||||
}
|
||||
);
|
||||
|
||||
// mark it as unmodified
|
||||
entry.sync_required = false;
|
||||
mgr.set_entry(&entry)?;
|
||||
assert_eq!(mgr.changes_pending()?, 0);
|
||||
|
||||
// delete it
|
||||
fs::remove_file(&f1)?;
|
||||
|
||||
change_mtime(&media_dir);
|
||||
mgr.register_changes().unwrap();
|
||||
|
||||
assert_eq!(mgr.count()?, 0);
|
||||
assert_eq!(mgr.changes_pending()?, 1);
|
||||
assert_eq!(
|
||||
mgr.get_entry("file.jpg")?.unwrap(),
|
||||
MediaEntry {
|
||||
fname: "file.jpg".into(),
|
||||
sha1: None,
|
||||
mtime: 0,
|
||||
sync_required: true,
|
||||
}
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
@ -1,2 +1,26 @@
|
||||
use crate::err::Result;
|
||||
use crate::media::database::open_or_create;
|
||||
use rusqlite::Connection;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
pub mod database;
|
||||
pub mod files;
|
||||
|
||||
pub struct MediaManager {
|
||||
db: Connection,
|
||||
media_folder: PathBuf,
|
||||
}
|
||||
|
||||
impl MediaManager {
|
||||
pub fn new<P, P2>(media_folder: P, media_db: P2) -> Result<Self>
|
||||
where
|
||||
P: Into<PathBuf>,
|
||||
P2: AsRef<Path>,
|
||||
{
|
||||
let db = open_or_create(media_db.as_ref())?;
|
||||
Ok(MediaManager {
|
||||
db,
|
||||
media_folder: media_folder.into(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user