allow normal sync tests to run offline

This introduces a SyncServer trait with two implementations: the existing HTTPSyncClient, and a new in-process LocalServer backed by a local collection. The normal sync tests now exercise the full protocol against the local server by default, and only talk to AnkiWeb when TEST_HKEY is set in the environment.

Damien Elmes 2021-01-09 22:56:30 +10:00
parent 1425379d41
commit 09dfa9ced6
10 changed files with 519 additions and 172 deletions

View File

@ -65,6 +65,7 @@ rust_library(
proc_macro_deps = [
"//rslib/cargo:serde_derive",
"//rslib/cargo:serde_repr",
"//rslib/cargo:async_trait",
],
rustc_env = _anki_rustc_env,
visibility = ["//visibility:public"],
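async-trait is a procedural-macro crate, so under rules_rust it is declared in proc_macro_deps next to serde_derive and serde_repr rather than in the ordinary deps list.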

View File

@ -1694,11 +1694,11 @@ impl Backend {
};
let result = if upload {
-let sync_fut = col_inner.full_upload(input.into(), progress_fn);
+let sync_fut = col_inner.full_upload(input.into(), Box::new(progress_fn));
let abortable_sync = Abortable::new(sync_fut, abort_reg);
rt.block_on(abortable_sync)
} else {
-let sync_fut = col_inner.full_download(input.into(), progress_fn);
+let sync_fut = col_inner.full_download(input.into(), Box::new(progress_fn));
let abortable_sync = Abortable::new(sync_fut, abort_reg);
rt.block_on(abortable_sync)
};
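The only change in this hunk is wrapping the callbacks in Box::new: full_upload and full_download now take the FullSyncProgressFn alias (added in http_client.rs below) instead of a generic F: FnMut(...) parameter, because a generic closure type can't be stored in HTTPSyncClient or passed through the new trait object. A minimal sketch of the idea, with FullSyncProgress stubbed in (rslib defines the real struct):

#[derive(Clone, Copy, Debug, Default)]
struct FullSyncProgress {
    transferred_bytes: usize,
    total_bytes: usize,
}

// Same shape as the alias added in http_client.rs.
type FullSyncProgressFn = Box<dyn FnMut(FullSyncProgress, bool) + Send + Sync + 'static>;

fn main() {
    // Boxing gives the closure a single nameable type, so it can live in a
    // struct field and be handed to any SyncServer implementation.
    let mut progress_fn: FullSyncProgressFn = Box::new(|p, throttle| {
        println!("{}/{} bytes (throttle: {})", p.transferred_bytes, p.total_bytes, throttle);
    });
    progress_fn(
        FullSyncProgress {
            transferred_bytes: 10,
            total_bytes: 100,
        },
        true,
    );
}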

View File

@ -38,11 +38,18 @@ pub fn open_collection<P: Into<PathBuf>>(
Ok(col)
}
// We need to make a Builder for Collection in the future.
#[cfg(test)]
pub fn open_test_collection() -> Collection {
open_test_collection_with_server(false)
}
#[cfg(test)]
pub fn open_test_collection_with_server(server: bool) -> Collection {
use crate::log;
let i18n = I18n::new(&[""], "", log::terminal());
open_collection(":memory:", "", "", false, i18n, log::terminal()).unwrap()
open_collection(":memory:", "", "", server, i18n, log::terminal()).unwrap()
}
#[derive(Debug, Default)]

View File

@ -229,7 +229,7 @@ impl super::SqliteStorage {
.prepare_cached("select null from cards")?
.query(NO_PARAMS)?
.next()
-.map(|o| o.is_none())
+.map(|o| o.is_some())
.map_err(Into::into)
}
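This one-character change fixes an inverted boolean: rows.next() returns Ok(Some(row)) when `select null from cards` produces at least one row, so is_some() is the correct reading of "the collection has cards" (the sync code below negates it for the new `empty` field). A standalone sketch of the corrected check, assuming the same rusqlite vintage that still uses NO_PARAMS:

use rusqlite::{Connection, NO_PARAMS};

fn have_at_least_one_card(db: &Connection) -> rusqlite::Result<bool> {
    db.prepare_cached("select null from cards")?
        .query(NO_PARAMS)?
        // Ok(Some(_)) means the query produced a row, i.e. a card exists.
        .next()
        .map(|row| row.is_some())
}

fn main() -> rusqlite::Result<()> {
    let db = Connection::open_in_memory()?;
    db.execute_batch("create table cards (id integer)")?;
    assert!(!have_at_least_one_card(&db)?);
    db.execute_batch("insert into cards values (1)")?;
    assert!(have_at_least_one_card(&db)?);
    Ok(())
}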

View File

@ -16,6 +16,7 @@ mod tag;
mod upgrades;
pub(crate) use sqlite::SqliteStorage;
pub(crate) use sync::open_and_check_sqlite_file;
use std::fmt::Write;

View File

@ -170,7 +170,7 @@ impl SqliteStorage {
"update col set crt=?, scm=?, ver=?, conf=?",
params![
crt,
-crt * 1000,
+TimestampMillis::now(),
SCHEMA_STARTING_VERSION,
&schema11_config_as_string()
],
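Using the current time for scm, rather than deriving it from crt, means a server collection created moments after the client's no longer starts out with an identical schema time; the commented-out note in the test module below ("so when initial server col is created, it's not identical") refers to the same concern.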

View File

@ -1,9 +1,11 @@
// Copyright: Ankitects Pty Ltd and contributors
// License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
use std::path::Path;
use super::*;
use crate::prelude::*;
-use rusqlite::{params, types::FromSql, ToSql, NO_PARAMS};
+use rusqlite::{params, types::FromSql, Connection, ToSql, NO_PARAMS};
impl SqliteStorage {
pub(crate) fn usn(&self, server: bool) -> Result<Usn> {
@ -59,3 +61,28 @@ impl SqliteStorage {
Ok(())
}
}
/// Returns an error if the file is unreadable, fails the sqlite
/// integrity check, or is not in the 'delete' journal mode.
/// On success, returns the opened DB.
pub(crate) fn open_and_check_sqlite_file(path: &Path) -> Result<Connection> {
let db = Connection::open(path)?;
match db.pragma_query_value(None, "integrity_check", |row| row.get::<_, String>(0)) {
Ok(s) => {
if s != "ok" {
return Err(AnkiError::invalid_input(format!("corrupt: {}", s)));
}
}
Err(e) => return Err(e.into()),
};
match db.pragma_query_value(None, "journal_mode", |row| row.get::<_, String>(0)) {
Ok(s) => {
if s == "delete" {
Ok(db)
} else {
Err(AnkiError::invalid_input(format!("corrupt: {}", s)))
}
}
Err(e) => Err(e.into()),
}
}
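For illustration, here is a self-contained version of the same two pragma checks, using plain String errors in place of AnkiError; the real function above is what both the download path and the local server's upload path now call:

use rusqlite::Connection;
use std::path::Path;

fn open_and_check(path: &Path) -> Result<Connection, String> {
    let db = Connection::open(path).map_err(|e| e.to_string())?;
    let integrity: String = db
        .pragma_query_value(None, "integrity_check", |row| row.get(0))
        .map_err(|e| e.to_string())?;
    if integrity != "ok" {
        return Err(format!("corrupt: {}", integrity));
    }
    // Collections are kept in rollback-journal ("delete") mode; any other
    // mode (e.g. WAL, whose recent changes live in a sidecar file) means the
    // file is not safe to treat as a single self-contained database.
    let journal: String = db
        .pragma_query_value(None, "journal_mode", |row| row.get(0))
        .map_err(|e| e.to_string())?;
    if journal == "delete" {
        Ok(db)
    } else {
        Err(format!("unexpected journal mode: {}", journal))
    }
}

fn main() {
    match open_and_check(Path::new("collection.anki2")) {
        Ok(_) => println!("file looks sane"),
        Err(e) => println!("rejected: {}", e),
    }
}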

View File

@ -1,7 +1,9 @@
// Copyright: Ankitects Pty Ltd and contributors
// License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
use super::server::SyncServer;
use super::*;
use async_trait::async_trait;
use bytes::Bytes;
use futures::Stream;
use reqwest::Body;
@ -10,11 +12,14 @@ use reqwest::Body;
static SYNC_VERSION: u8 = 10;
pub type FullSyncProgressFn = Box<dyn FnMut(FullSyncProgress, bool) + Send + Sync + 'static>;
pub struct HTTPSyncClient {
hkey: Option<String>,
skey: String,
client: Client,
endpoint: String,
full_sync_progress_fn: Option<FullSyncProgressFn>,
}
#[derive(Serialize)]
@ -113,9 +118,14 @@ impl HTTPSyncClient {
skey,
client,
endpoint,
full_sync_progress_fn: None,
}
}
pub fn set_full_sync_progress_fn(&mut self, func: Option<FullSyncProgressFn>) {
self.full_sync_progress_fn = func;
}
async fn json_request<T>(&self, method: &str, json: &T, timeout_long: bool) -> Result<Response>
where
T: serde::Serialize,
@ -178,8 +188,11 @@ impl HTTPSyncClient {
pub(crate) fn hkey(&self) -> &str {
self.hkey.as_ref().unwrap()
}
+}
-pub(crate) async fn meta(&self) -> Result<SyncMeta> {
+#[async_trait(?Send)]
+impl SyncServer for HTTPSyncClient {
+async fn meta(&self) -> Result<SyncMeta> {
let meta_in = MetaIn {
sync_version: SYNC_VERSION,
client_version: sync_client_version(),
@ -187,8 +200,8 @@ impl HTTPSyncClient {
self.json_request_deserialized("meta", &meta_in).await
}
-pub(crate) async fn start(
-&self,
+async fn start(
+&mut self,
local_usn: Usn,
minutes_west: Option<i32>,
local_is_newer: bool,
@ -202,47 +215,92 @@ impl HTTPSyncClient {
self.json_request_deserialized("start", &input).await
}
-pub(crate) async fn apply_graves(&self, chunk: Graves) -> Result<()> {
+async fn apply_graves(&mut self, chunk: Graves) -> Result<()> {
let input = ApplyGravesIn { chunk };
let resp = self.json_request("applyGraves", &input, false).await?;
resp.error_for_status()?;
Ok(())
}
-pub(crate) async fn apply_changes(
-&self,
-changes: UnchunkedChanges,
-) -> Result<UnchunkedChanges> {
+async fn apply_changes(&mut self, changes: UnchunkedChanges) -> Result<UnchunkedChanges> {
let input = ApplyChangesIn { changes };
self.json_request_deserialized("applyChanges", &input).await
}
-pub(crate) async fn chunk(&self) -> Result<Chunk> {
+async fn chunk(&mut self) -> Result<Chunk> {
self.json_request_deserialized("chunk", &Empty {}).await
}
-pub(crate) async fn apply_chunk(&self, chunk: Chunk) -> Result<()> {
+async fn apply_chunk(&mut self, chunk: Chunk) -> Result<()> {
let input = ApplyChunkIn { chunk };
let resp = self.json_request("applyChunk", &input, false).await?;
resp.error_for_status()?;
Ok(())
}
-pub(crate) async fn sanity_check(&self, client: SanityCheckCounts) -> Result<SanityCheckOut> {
+async fn sanity_check(&mut self, client: SanityCheckCounts) -> Result<SanityCheckOut> {
let input = SanityCheckIn { client, full: true };
self.json_request_deserialized("sanityCheck2", &input).await
}
-pub(crate) async fn finish(&self) -> Result<TimestampMillis> {
+async fn finish(&mut self) -> Result<TimestampMillis> {
Ok(self.json_request_deserialized("finish", &Empty {}).await?)
}
-pub(crate) async fn abort(&self) -> Result<()> {
+async fn abort(&mut self) -> Result<()> {
let resp = self.json_request("abort", &Empty {}, false).await?;
resp.error_for_status()?;
Ok(())
}
async fn full_upload(mut self: Box<Self>, col_path: &Path, _can_consume: bool) -> Result<()> {
let file = tokio::fs::File::open(col_path).await?;
let total_bytes = file.metadata().await?.len() as usize;
let progress_fn = self
.full_sync_progress_fn
.take()
.expect("progress func was not set");
let wrap1 = ProgressWrapper {
reader: file,
progress_fn,
progress: FullSyncProgress {
transferred_bytes: 0,
total_bytes,
},
};
let wrap2 = async_compression::stream::GzipEncoder::new(wrap1);
let body = Body::wrap_stream(wrap2);
self.upload_inner(body).await?;
Ok(())
}
/// Download collection into a temporary file, returning it.
/// Caller should persist the file in the correct path after checking it.
/// Progress func must be set first.
async fn full_download(mut self: Box<Self>, folder: &Path) -> Result<NamedTempFile> {
let mut temp_file = NamedTempFile::new_in(folder)?;
let (size, mut stream) = self.download_inner().await?;
let mut progress = FullSyncProgress {
transferred_bytes: 0,
total_bytes: size,
};
let mut progress_fn = self
.full_sync_progress_fn
.take()
.expect("progress func was not set");
while let Some(chunk) = stream.next().await {
let chunk = chunk?;
temp_file.write_all(&chunk)?;
progress.transferred_bytes += chunk.len();
progress_fn(progress, true);
}
progress_fn(progress, false);
Ok(temp_file)
}
}
impl HTTPSyncClient {
async fn download_inner(
&self,
) -> Result<(
@ -254,33 +312,6 @@ impl HTTPSyncClient {
Ok((len as usize, resp.bytes_stream()))
}
-/// Download collection into a temporary file, returning it.
-/// Caller should persist the file in the correct path after checking it.
-pub(crate) async fn download<P>(
-&self,
-folder: &Path,
-mut progress_fn: P,
-) -> Result<NamedTempFile>
-where
-P: FnMut(FullSyncProgress, bool),
-{
-let mut temp_file = NamedTempFile::new_in(folder)?;
-let (size, mut stream) = self.download_inner().await?;
-let mut progress = FullSyncProgress {
-transferred_bytes: 0,
-total_bytes: size,
-};
-while let Some(chunk) = stream.next().await {
-let chunk = chunk?;
-temp_file.write_all(&chunk)?;
-progress.transferred_bytes += chunk.len();
-progress_fn(progress, true);
-}
-progress_fn(progress, false);
-Ok(temp_file)
-}
async fn upload_inner(&self, body: Body) -> Result<()> {
let data_part = multipart::Part::stream(body);
let resp = self.request("upload", data_part, true).await?;
@ -295,27 +326,6 @@ impl HTTPSyncClient {
Ok(())
}
}
-pub(crate) async fn upload<P>(&mut self, col_path: &Path, progress_fn: P) -> Result<()>
-where
-P: FnMut(FullSyncProgress, bool) + Send + Sync + 'static,
-{
-let file = tokio::fs::File::open(col_path).await?;
-let total_bytes = file.metadata().await?.len() as usize;
-let wrap1 = ProgressWrapper {
-reader: file,
-progress_fn,
-progress: FullSyncProgress {
-transferred_bytes: 0,
-total_bytes,
-},
-};
-let wrap2 = async_compression::stream::GzipEncoder::new(wrap1);
-let body = Body::wrap_stream(wrap2);
-self.upload_inner(body).await?;
-Ok(())
-}
}
use futures::{
@ -380,7 +390,7 @@ mod test {
use tokio::runtime::Runtime;
async fn http_client_inner(username: String, password: String) -> Result<()> {
-let mut syncer = HTTPSyncClient::new(None, 0);
+let mut syncer = Box::new(HTTPSyncClient::new(None, 0));
assert!(matches!(
syncer.login("nosuchuser", "nosuchpass").await,
@ -445,17 +455,16 @@ mod test {
use tempfile::tempdir;
let dir = tempdir()?;
-let out_path = syncer
-.download(&dir.path(), |progress, _throttle| {
+syncer.set_full_sync_progress_fn(Some(Box::new(|progress, _throttle| {
println!("progress: {:?}", progress);
-})
-.await?;
+})));
+let out_path = syncer.full_download(&dir.path()).await?;
-syncer
-.upload(&out_path.path(), |progress, _throttle| {
+let mut syncer = Box::new(HTTPSyncClient::new(None, 0));
+syncer.set_full_sync_progress_fn(Some(Box::new(|progress, _throttle| {
println!("progress {:?}", progress);
-})
-.await?;
+})));
+syncer.full_upload(&out_path.path(), false).await?;
Ok(())
}
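Both full-sync methods in the new trait take self: Box<Self> rather than &mut self: a full upload or download is terminal for the server object (the local implementation closes and replaces its collection), and a consuming receiver is the only way to express that through a trait object. A toy sketch of the pattern, assuming the async-trait and tokio crates as dependencies:

use async_trait::async_trait;

#[async_trait(?Send)]
trait Finisher {
    // `self: Box<Self>` lets a Box<dyn Finisher> be consumed by the call,
    // which a plain `self` receiver cannot do on a trait object.
    async fn finish(self: Box<Self>) -> String;
}

struct Server {
    name: String,
}

#[async_trait(?Send)]
impl Finisher for Server {
    async fn finish(self: Box<Self>) -> String {
        self.name // moves out; the server is gone afterwards
    }
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let s: Box<dyn Finisher> = Box::new(Server { name: "local".into() });
    println!("{}", s.finish().await);
}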

View File

@ -2,6 +2,7 @@
// License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
mod http_client;
mod server;
use crate::{
backend_proto::{sync_status_out, SyncStatusOut},
@ -14,12 +15,14 @@ use crate::{
prelude::*,
revlog::RevlogEntry,
serde::{default_on_invalid, deserialize_int_from_number},
storage::open_and_check_sqlite_file,
tags::{join_tags, split_tags},
version::sync_client_version,
};
use flate2::write::GzEncoder;
use flate2::Compression;
use futures::StreamExt;
pub use http_client::FullSyncProgressFn;
use http_client::HTTPSyncClient;
pub use http_client::Timeouts;
use itertools::Itertools;
@ -27,6 +30,7 @@ use reqwest::{multipart, Client, Response};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::Value;
use serde_tuple::Serialize_tuple;
use server::SyncServer;
use std::io::prelude::*;
use std::{collections::HashMap, path::Path, time::Duration};
use tempfile::NamedTempFile;
@ -174,7 +178,7 @@ enum SanityCheckStatus {
Bad,
}
-#[derive(Serialize_tuple, Deserialize, Debug)]
+#[derive(Serialize_tuple, Deserialize, Debug, PartialEq)]
pub struct SanityCheckCounts {
pub counts: SanityCheckDueCounts,
pub cards: u32,
@ -187,7 +191,7 @@ pub struct SanityCheckCounts {
pub deck_config: u32,
}
-#[derive(Serialize_tuple, Deserialize, Debug, Default)]
+#[derive(Serialize_tuple, Deserialize, Debug, Default, PartialEq)]
pub struct SanityCheckDueCounts {
pub new: u32,
pub learn: u32,
@ -221,6 +225,7 @@ struct SyncState {
host_number: u32,
}
#[derive(Debug)]
pub struct SyncOutput {
pub required: SyncActionRequired,
pub server_message: String,
@ -235,7 +240,7 @@ pub struct SyncAuth {
struct NormalSyncer<'a, F> {
col: &'a mut Collection,
-remote: HTTPSyncClient,
+remote: Box<dyn SyncServer>,
progress: NormalSyncProgress,
progress_fn: F,
}
@ -292,14 +297,17 @@ impl<F> NormalSyncer<'_, F>
where
F: FnMut(NormalSyncProgress, bool),
{
-/// Create a new syncing instance. If host_number is unavailable, use 0.
-pub fn new(col: &mut Collection, auth: SyncAuth, progress_fn: F) -> NormalSyncer<'_, F>
+pub fn new(
+col: &mut Collection,
+server: Box<dyn SyncServer>,
+progress_fn: F,
+) -> NormalSyncer<'_, F>
where
F: FnMut(NormalSyncProgress, bool),
{
NormalSyncer {
col,
-remote: HTTPSyncClient::new(Some(auth.hkey), auth.host_number),
+remote: server,
progress: NormalSyncProgress::default(),
progress_fn,
}
@ -347,6 +355,7 @@ where
async fn get_sync_state(&self) -> Result<SyncState> {
let remote: SyncMeta = self.remote.meta().await?;
debug!(self.col.log, "remote {:?}", &remote);
if !remote.should_continue {
debug!(self.col.log, "server says abort"; "message"=>&remote.server_message);
return Err(AnkiError::SyncError {
@ -356,6 +365,7 @@ where
}
let local = self.col.sync_meta()?;
debug!(self.col.log, "local {:?}", &local);
let delta = remote.current_time.0 - local.current_time.0;
if delta.abs() > 300 {
debug!(self.col.log, "clock off"; "delta"=>delta);
@ -553,7 +563,7 @@ where
}
}
-async fn finalize(&self, state: &SyncState) -> Result<()> {
+async fn finalize(&mut self, state: &SyncState) -> Result<()> {
let new_server_mtime = self.remote.finish().await?;
self.col.finalize_sync(state, new_server_mtime)
}
@ -595,7 +605,7 @@ pub async fn sync_login(username: &str, password: &str) -> Result<SyncAuth> {
}
pub async fn sync_abort(hkey: String, host_number: u32) -> Result<()> {
-let remote = HTTPSyncClient::new(Some(hkey), host_number);
+let mut remote = HTTPSyncClient::new(Some(hkey), host_number);
remote.abort().await
}
@ -624,45 +634,53 @@ impl Collection {
Ok(self.sync_meta()?.compared_to_remote(remote).required.into())
}
+/// Create a new syncing instance. If host_number is unavailable, use 0.
pub async fn normal_sync<F>(&mut self, auth: SyncAuth, progress_fn: F) -> Result<SyncOutput>
where
F: FnMut(NormalSyncProgress, bool),
{
-NormalSyncer::new(self, auth, progress_fn).sync().await
+NormalSyncer::new(
+self,
+Box::new(HTTPSyncClient::new(Some(auth.hkey), auth.host_number)),
+progress_fn,
+)
+.sync()
+.await
}
/// Upload collection to AnkiWeb. Caller must re-open afterwards.
-pub async fn full_upload<F>(mut self, auth: SyncAuth, progress_fn: F) -> Result<()>
-where
-F: FnMut(FullSyncProgress, bool) + Send + Sync + 'static,
-{
+pub async fn full_upload(self, auth: SyncAuth, progress_fn: FullSyncProgressFn) -> Result<()> {
+let mut server = HTTPSyncClient::new(Some(auth.hkey), auth.host_number);
+server.set_full_sync_progress_fn(Some(progress_fn));
+self.full_upload_inner(Box::new(server)).await
+// remote.upload(&col_path, progress_fn).await?;
+}
+pub(crate) async fn full_upload_inner(mut self, server: Box<dyn SyncServer>) -> Result<()> {
self.before_upload()?;
let col_path = self.col_path.clone();
self.close(true)?;
-let mut remote = HTTPSyncClient::new(Some(auth.hkey), auth.host_number);
-remote.upload(&col_path, progress_fn).await?;
-Ok(())
+server.full_upload(&col_path, false).await
}
/// Download collection from AnkiWeb. Caller must re-open afterwards.
-pub async fn full_download<F>(self, auth: SyncAuth, progress_fn: F) -> Result<()>
-where
-F: FnMut(FullSyncProgress, bool),
-{
+pub async fn full_download(
+self,
+auth: SyncAuth,
+progress_fn: FullSyncProgressFn,
+) -> Result<()> {
+let mut server = HTTPSyncClient::new(Some(auth.hkey), auth.host_number);
+server.set_full_sync_progress_fn(Some(progress_fn));
+self.full_download_inner(Box::new(server)).await
+}
+pub(crate) async fn full_download_inner(self, server: Box<dyn SyncServer>) -> Result<()> {
let col_path = self.col_path.clone();
let folder = col_path.parent().unwrap();
self.close(false)?;
-let remote = HTTPSyncClient::new(Some(auth.hkey), auth.host_number);
-let out_file = remote.download(folder, progress_fn).await?;
+let out_file = server.full_download(folder).await?;
// check file ok
-let db = rusqlite::Connection::open(out_file.path())?;
-let check_result: String = db.pragma_query_value(None, "integrity_check", |r| r.get(0))?;
-if check_result != "ok" {
-return Err(AnkiError::SyncError {
-info: "download corrupt".into(),
-kind: SyncErrorKind::Other,
-});
-}
+let db = open_and_check_sqlite_file(out_file.path())?;
db.execute_batch("update col set ls=mod")?;
drop(db);
// overwrite existing collection atomically
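This hunk is the seam that makes offline testing possible: the public full_upload/full_download keep their AnkiWeb-facing signatures and immediately delegate to *_inner methods that accept any Box<dyn SyncServer>. A toy sketch of the shape (names are illustrative, not rslib's):

trait Transport {
    fn label(&self) -> &'static str;
}
struct HttpTransport;
struct LocalTransport;
impl Transport for HttpTransport {
    fn label(&self) -> &'static str {
        "ankiweb"
    }
}
impl Transport for LocalTransport {
    fn label(&self) -> &'static str {
        "in-process"
    }
}

struct Collection;
impl Collection {
    // Production entry point: chooses the HTTP transport.
    fn full_upload(self) {
        self.full_upload_inner(Box::new(HttpTransport))
    }
    // Test seam: transport-agnostic.
    fn full_upload_inner(self, server: Box<dyn Transport>) {
        println!("uploading via {}", server.label());
    }
}

fn main() {
    Collection.full_upload();
    Collection.full_upload_inner(Box::new(LocalTransport));
}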
@ -683,11 +701,11 @@ impl Collection {
server_message: "".into(),
should_continue: true,
host_number: 0,
-empty: self.storage.have_at_least_one_card()?,
+empty: !self.storage.have_at_least_one_card()?,
})
}
-fn apply_graves(&self, graves: Graves, latest_usn: Usn) -> Result<()> {
+pub fn apply_graves(&self, graves: Graves, latest_usn: Usn) -> Result<()> {
for nid in graves.notes {
self.storage.remove_note(nid)?;
self.storage.add_note_grave(nid, latest_usn)?;
@ -896,6 +914,9 @@ impl Collection {
// Remote->local chunks
//----------------------------------------------------------------
/// pending_usn is used to decide whether the local objects are newer.
/// If the provided objects are not modified locally, the USN inside
/// the individual objects is used.
fn apply_chunk(&mut self, chunk: Chunk, pending_usn: Usn) -> Result<()> {
self.merge_revlog(chunk.revlog)?;
self.merge_cards(chunk.cards, pending_usn)?;
@ -1173,6 +1194,10 @@ impl From<SyncActionRequired> for sync_status_out::Required {
#[cfg(test)]
mod test {
use async_trait::async_trait;
use lazy_static::lazy_static;
use super::server::LocalServer;
use super::*;
use crate::log;
use crate::{
@ -1186,40 +1211,145 @@ mod test {
fn full_progress(_: FullSyncProgress, _: bool) {}
-struct TestContext {
-dir: TempDir,
-auth: SyncAuth,
-col1: Option<Collection>,
-col2: Option<Collection>,
+#[test]
+/// Run remote tests if hkey provided in environment; otherwise local.
+fn syncing() -> Result<()> {
+let ctx: Box<dyn TestContext> = if let Ok(hkey) = std::env::var("TEST_HKEY") {
+Box::new(RemoteTestContext {
+auth: SyncAuth {
+hkey,
+host_number: 0,
+},
+})
+} else {
+Box::new(LocalTestContext {})
+};
+let mut rt = Runtime::new().unwrap();
+rt.block_on(upload_download(&ctx))?;
+rt.block_on(regular_sync(&ctx))
+}
-fn open_col(ctx: &TestContext, fname: &str) -> Result<Collection> {
-let path = ctx.dir.path().join(fname);
+fn open_col(dir: &Path, server: bool, fname: &str) -> Result<Collection> {
+let path = dir.join(fname);
let i18n = I18n::new(&[""], "", log::terminal());
-open_collection(path, "".into(), "".into(), false, i18n, log::terminal())
+open_collection(path, "".into(), "".into(), server, i18n, log::terminal())
}
-async fn upload_download(ctx: &mut TestContext) -> Result<()> {
-// add a card
-let mut col1 = open_col(ctx, "col1.anki2")?;
-let nt = col1.get_notetype_by_name("Basic")?.unwrap();
#[async_trait(?Send)]
trait TestContext {
fn server(&self) -> Box<dyn SyncServer>;
fn col1(&self) -> Collection {
open_col(self.dir(), false, "col1.anki2").unwrap()
}
fn col2(&self) -> Collection {
open_col(self.dir(), false, "col2.anki2").unwrap()
}
fn dir(&self) -> &Path {
lazy_static! {
static ref DIR: TempDir = tempdir().unwrap();
}
DIR.path()
}
async fn normal_sync(&self, col: &mut Collection) -> SyncOutput {
NormalSyncer::new(col, self.server(), norm_progress)
.sync()
.await
.unwrap()
}
async fn full_upload(&self, col: Collection) {
col.full_upload_inner(self.server()).await.unwrap()
}
async fn full_download(&self, col: Collection) {
col.full_download_inner(self.server()).await.unwrap()
}
}
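One subtlety in the dir() default method above: the lazy_static! block creates a single process-wide TempDir on first use, so col1, col2, and the local server collection all share one directory across both contexts; and because statics are never dropped, the directory is left for the OS to clean up rather than removed by TempDir's Drop. A reduced sketch, assuming the lazy_static and tempfile crates:

use lazy_static::lazy_static;
use std::path::Path;
use tempfile::TempDir;

fn dir() -> &'static Path {
    // Initialised once, on first call, and never dropped, so every caller
    // sees the same directory for the life of the process.
    lazy_static! {
        static ref DIR: TempDir = tempfile::tempdir().unwrap();
    }
    DIR.path()
}

fn main() {
    assert_eq!(dir(), dir());
    println!("shared test dir: {}", dir().display());
}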
// Local specifics
/////////////////////
struct LocalTestContext {}
#[async_trait(?Send)]
impl TestContext for LocalTestContext {
fn server(&self) -> Box<dyn SyncServer> {
let col = open_col(self.dir(), true, "server.anki2").unwrap();
Box::new(LocalServer::new(col))
}
}
// Remote specifics
/////////////////////
struct RemoteTestContext {
auth: SyncAuth,
}
impl RemoteTestContext {
fn server_inner(&self) -> HTTPSyncClient {
let auth = self.auth.clone();
HTTPSyncClient::new(Some(auth.hkey), auth.host_number)
}
}
#[async_trait(?Send)]
impl TestContext for RemoteTestContext {
fn server(&self) -> Box<dyn SyncServer> {
Box::new(self.server_inner())
}
async fn full_upload(&self, col: Collection) {
let mut server = self.server_inner();
server.set_full_sync_progress_fn(Some(Box::new(full_progress)));
col.full_upload_inner(Box::new(server)).await.unwrap()
}
async fn full_download(&self, col: Collection) {
let mut server = self.server_inner();
server.set_full_sync_progress_fn(Some(Box::new(full_progress)));
col.full_download_inner(Box::new(server)).await.unwrap()
}
}
// Setup + full syncs
/////////////////////
fn col1_setup(col: &mut Collection) {
let nt = col.get_notetype_by_name("Basic").unwrap().unwrap();
let mut note = nt.new_note();
note.fields[0] = "1".into();
-col1.add_note(&mut note, DeckID(1))?;
+col.add_note(&mut note, DeckID(1)).unwrap();
-let out: SyncOutput = col1.normal_sync(ctx.auth.clone(), norm_progress).await?;
// // set our schema time back, so when initial server
// // col is created, it's not identical
// col.storage
// .db
// .execute_batch("update col set scm = 123")
// .unwrap()
}
async fn upload_download(ctx: &Box<dyn TestContext>) -> Result<()> {
let mut col1 = ctx.col1();
col1_setup(&mut col1);
let out = ctx.normal_sync(&mut col1).await;
assert!(matches!(
out.required,
SyncActionRequired::FullSyncRequired { .. }
));
-col1.full_upload(ctx.auth.clone(), full_progress).await?;
+ctx.full_upload(col1).await;
// another collection
-let mut col2 = open_col(ctx, "col2.anki2")?;
+let mut col2 = ctx.col2();
// won't allow ankiweb clobber
-let out: SyncOutput = col2.normal_sync(ctx.auth.clone(), norm_progress).await?;
+let out = ctx.normal_sync(&mut col2).await;
assert_eq!(
out.required,
SyncActionRequired::FullSyncRequired {
@ -1229,20 +1359,19 @@ mod test {
);
// fetch so we're in sync
-col2.full_download(ctx.auth.clone(), full_progress).await?;
-// reopen the two collections
-ctx.col1 = Some(open_col(ctx, "col1.anki2")?);
-ctx.col2 = Some(open_col(ctx, "col2.anki2")?);
+ctx.full_download(col2).await;
Ok(())
}
-async fn regular_sync(ctx: &mut TestContext) -> Result<()> {
-let col1 = ctx.col1.as_mut().unwrap();
-let col2 = ctx.col2.as_mut().unwrap();
+// Regular syncs
+/////////////////////
+async fn regular_sync(ctx: &Box<dyn TestContext>) -> Result<()> {
// add a deck
+let mut col1 = ctx.col1();
+let mut col2 = ctx.col2();
let mut deck = col1.get_or_create_normal_deck("new deck")?;
// give it a new option group
@ -1280,15 +1409,15 @@ mod test {
// col1.storage.set_creation_stamp(TimestampSecs(12345))?;
// and sync our changes
-let remote = get_remote_sync_meta(ctx.auth.clone()).await?;
-let out = col1.get_sync_status(remote)?;
+let remote_meta = ctx.server().meta().await.unwrap();
+let out = col1.get_sync_status(remote_meta)?;
assert_eq!(out, sync_status_out::Required::NormalSync);
-let out: SyncOutput = col1.normal_sync(ctx.auth.clone(), norm_progress).await?;
+let out = ctx.normal_sync(&mut col1).await;
assert_eq!(out.required, SyncActionRequired::NoChanges);
// sync the other collection
-let out: SyncOutput = col2.normal_sync(ctx.auth.clone(), norm_progress).await?;
+let out = ctx.normal_sync(&mut col2).await;
assert_eq!(out.required, SyncActionRequired::NoChanges);
let ntid = nt.id;
@ -1329,7 +1458,7 @@ mod test {
);
assert_eq!(
col1.storage.creation_stamp()?,
-col1.storage.creation_stamp()?
+col2.storage.creation_stamp()?
);
// server doesn't send tag usns, so we can only compare tags, not usns,
@ -1351,7 +1480,7 @@ mod test {
};
// make sure everything has been transferred across
-compare_sides(col1, col2)?;
+compare_sides(&mut col1, &mut col2)?;
// make some modifications
let mut note = col2.storage.get_note(note.id)?.unwrap();
@ -1373,13 +1502,13 @@ mod test {
col2.update_notetype(&mut nt, false)?;
// sync the changes back
-let out: SyncOutput = col2.normal_sync(ctx.auth.clone(), norm_progress).await?;
+let out = ctx.normal_sync(&mut col2).await;
assert_eq!(out.required, SyncActionRequired::NoChanges);
-let out: SyncOutput = col1.normal_sync(ctx.auth.clone(), norm_progress).await?;
+let out = ctx.normal_sync(&mut col1).await;
assert_eq!(out.required, SyncActionRequired::NoChanges);
// should still match
-compare_sides(col1, col2)?;
+compare_sides(&mut col1, &mut col2)?;
// deletions should sync too
for table in &["cards", "notes", "decks"] {
@ -1392,12 +1521,13 @@ mod test {
// fixme: inconsistent usn arg
col1.remove_cards_and_orphaned_notes(&[cardid])?;
-col1.remove_note_only(noteid, col1.usn()?)?;
+let usn = col1.usn()?;
+col1.remove_note_only(noteid, usn)?;
col1.remove_deck_and_child_decks(deckid)?;
-let out: SyncOutput = col1.normal_sync(ctx.auth.clone(), norm_progress).await?;
+let out = ctx.normal_sync(&mut col1).await;
assert_eq!(out.required, SyncActionRequired::NoChanges);
-let out: SyncOutput = col2.normal_sync(ctx.auth.clone(), norm_progress).await?;
+let out = ctx.normal_sync(&mut col2).await;
assert_eq!(out.required, SyncActionRequired::NoChanges);
for table in &["cards", "notes", "decks"] {
@ -1410,32 +1540,8 @@ mod test {
// removing things like a notetype forces a full sync
col2.remove_notetype(ntid)?;
-let out: SyncOutput = col2.normal_sync(ctx.auth.clone(), norm_progress).await?;
+let out = ctx.normal_sync(&mut col2).await;
assert!(matches!(out.required, SyncActionRequired::FullSyncRequired { .. }));
Ok(())
}
-#[test]
-fn collection_sync() -> Result<()> {
-let hkey = match std::env::var("TEST_HKEY") {
-Ok(s) => s,
-Err(_) => {
-return Ok(());
-}
-};
-let mut ctx = TestContext {
-dir: tempdir()?,
-auth: SyncAuth {
-hkey,
-host_number: 0,
-},
-col1: None,
-col2: None,
-};
-let mut rt = Runtime::new().unwrap();
-rt.block_on(upload_download(&mut ctx))?;
-rt.block_on(regular_sync(&mut ctx))
-}
}
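With this structure a plain `cargo test` runs the whole sync protocol against the in-process LocalServer, while exporting TEST_HKEY switches the same `syncing` test to a real AnkiWeb account, preserving the old remote coverage.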

rslib/src/sync/server.rs (new file, 196 lines)
View File

@ -0,0 +1,196 @@
use std::{fs, path::Path};
use crate::{
prelude::*,
storage::open_and_check_sqlite_file,
sync::{
Chunk, Graves, SanityCheckCounts, SanityCheckOut, SanityCheckStatus, SyncMeta,
UnchunkedChanges, Usn,
},
};
use async_trait::async_trait;
use tempfile::NamedTempFile;
use super::ChunkableIDs;
#[async_trait(?Send)]
pub trait SyncServer {
async fn meta(&self) -> Result<SyncMeta>;
async fn start(
&mut self,
client_usn: Usn,
minutes_west: Option<i32>,
local_is_newer: bool,
) -> Result<Graves>;
async fn apply_graves(&mut self, client_chunk: Graves) -> Result<()>;
async fn apply_changes(&mut self, client_changes: UnchunkedChanges)
-> Result<UnchunkedChanges>;
async fn chunk(&mut self) -> Result<Chunk>;
async fn apply_chunk(&mut self, client_chunk: Chunk) -> Result<()>;
async fn sanity_check(&mut self, client: SanityCheckCounts) -> Result<SanityCheckOut>;
async fn finish(&mut self) -> Result<TimestampMillis>;
async fn abort(&mut self) -> Result<()>;
/// If `can_consume` is true, the local server will move or remove the file,
/// instead of creating a copy. The remote server ignores this argument.
async fn full_upload(self: Box<Self>, col_path: &Path, can_consume: bool) -> Result<()>;
async fn full_download(self: Box<Self>, folder: &Path) -> Result<NamedTempFile>;
}
pub struct LocalServer {
col: Collection,
// The current sync protocol is stateful, so unfortunately we need to
// retain a bunch of information across requests. These are set either
// on start, or on subsequent methods.
server_usn: Usn,
client_usn: Usn,
/// Only used to determine whether we should send our
/// config to the client.
client_is_newer: bool,
/// Set on the first call to chunk()
server_chunk_ids: Option<ChunkableIDs>,
}
impl LocalServer {
#[allow(dead_code)]
pub fn new(col: Collection) -> LocalServer {
assert!(col.server);
LocalServer {
col,
server_usn: Usn(0),
client_usn: Usn(0),
client_is_newer: false,
server_chunk_ids: None,
}
}
}
#[async_trait(?Send)]
impl SyncServer for LocalServer {
async fn meta(&self) -> Result<SyncMeta> {
Ok(SyncMeta {
modified: self.col.storage.get_modified_time()?,
schema: self.col.storage.get_schema_mtime()?,
usn: self.col.storage.usn(true)?,
current_time: TimestampSecs::now(),
server_message: String::new(),
should_continue: true,
host_number: 0,
empty: !self.col.storage.have_at_least_one_card()?,
})
}
async fn start(
&mut self,
client_usn: Usn,
minutes_west: Option<i32>,
client_is_newer: bool,
) -> Result<Graves> {
self.server_usn = self.col.usn()?;
self.client_usn = client_usn;
self.client_is_newer = client_is_newer;
self.col.storage.begin_rust_trx()?;
if let Some(mins) = minutes_west {
self.col.set_local_mins_west(mins)?;
}
self.col.storage.pending_graves(client_usn)
}
async fn apply_graves(&mut self, client_chunk: Graves) -> Result<()> {
self.col.apply_graves(client_chunk, self.server_usn)
}
async fn apply_changes(
&mut self,
client_changes: UnchunkedChanges,
) -> Result<UnchunkedChanges> {
let server_changes =
self.col
.local_unchunked_changes(self.client_usn, None, !self.client_is_newer)?;
self.col.apply_changes(client_changes, self.server_usn)?;
Ok(server_changes)
}
async fn chunk(&mut self) -> Result<Chunk> {
if self.server_chunk_ids.is_none() {
self.server_chunk_ids = Some(self.col.get_chunkable_ids(self.client_usn)?);
}
self.col
.get_chunk(self.server_chunk_ids.as_mut().unwrap(), None)
}
async fn apply_chunk(&mut self, client_chunk: Chunk) -> Result<()> {
self.col.apply_chunk(client_chunk, self.client_usn)
}
async fn sanity_check(&mut self, mut client: SanityCheckCounts) -> Result<SanityCheckOut> {
client.counts = Default::default();
let server = self.col.storage.sanity_check_info()?;
Ok(SanityCheckOut {
status: if client == server {
SanityCheckStatus::Ok
} else {
SanityCheckStatus::Bad
},
client: Some(client),
server: Some(server),
})
}
async fn finish(&mut self) -> Result<TimestampMillis> {
let now = TimestampMillis::now();
self.col.storage.set_modified_time(now)?;
self.col.storage.set_last_sync(now)?;
self.col.storage.increment_usn()?;
self.col.storage.commit_rust_trx()?;
Ok(now)
}
async fn abort(&mut self) -> Result<()> {
self.col.storage.rollback_rust_trx()
}
/// `col_path` should point to the uploaded file, and the caller is
/// responsible for imposing limits on its size if it wishes.
/// If `can_consume` is true, the provided file will be moved into place,
/// or removed on failure. If false, the original will be left alone.
async fn full_upload(
mut self: Box<Self>,
mut col_path: &Path,
can_consume: bool,
) -> Result<()> {
// create a copy if necessary
let new_file: NamedTempFile;
if !can_consume {
new_file = NamedTempFile::new()?;
fs::copy(col_path, &new_file.path())?;
col_path = new_file.path();
}
open_and_check_sqlite_file(col_path).map_err(|check_err| {
match fs::remove_file(col_path) {
Ok(_) => check_err,
Err(remove_err) => remove_err.into(),
}
})?;
let target_col_path = self.col.col_path.clone();
self.col.close(false)?;
fs::rename(col_path, &target_col_path).map_err(Into::into)
}
async fn full_download(mut self: Box<Self>, output_folder: &Path) -> Result<NamedTempFile> {
// bump usn/mod & close
self.col.transact(None, |col| col.storage.increment_usn())?;
let col_path = self.col.col_path.clone();
self.col.close(true)?;
// copy file and return path
let temp_file = NamedTempFile::new_in(output_folder)?;
fs::copy(&col_path, temp_file.path())?;
Ok(temp_file)
}
}
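Note the ordering in full_upload above: when the file cannot be consumed it is first copied to a fresh NamedTempFile, the copy is validated with open_and_check_sqlite_file (and deleted again if validation fails), and only after the server collection is closed is the validated file moved over it with fs::rename, so a bad upload never clobbers the existing server collection. full_download likewise bumps the USN (and, per its comment, the modification time) before copying, so clients that full-sync afterwards will see the new state.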
}