tidy up sync.rs

This commit is contained in:
Damien Elmes 2020-02-05 20:56:38 +10:00
parent 9067bf98bd
commit 32a3b5a020
2 changed files with 133 additions and 140 deletions

View File

@ -6,7 +6,7 @@ use crate::backend_proto::backend_input::Value;
use crate::backend_proto::{Empty, RenderedTemplateReplacement, SyncMediaIn};
use crate::cloze::expand_clozes_to_reveal_latex;
use crate::err::{AnkiError, NetworkErrorKind, Result, SyncErrorKind};
use crate::media::sync::{sync_media, Progress as MediaSyncProgress};
use crate::media::sync::{MediaSyncer, Progress as MediaSyncProgress};
use crate::media::MediaManager;
use crate::sched::{local_minutes_west_for_stamp, sched_timing_today};
use crate::template::{
@ -318,14 +318,16 @@ impl Backend {
}
fn sync_media(&self, input: SyncMediaIn) -> Result<()> {
let mut mgr = MediaManager::new(&input.media_folder, &input.media_db)?;
let mgr = MediaManager::new(&input.media_folder, &input.media_db)?;
let callback = |progress: MediaSyncProgress| {
self.fire_progress_callback(Progress::MediaSync(progress))
};
let mut rt = Runtime::new().unwrap();
rt.block_on(sync_media(&mut mgr, &input.hkey, callback, &input.endpoint))
let mut syncer = MediaSyncer::new(&mgr, callback, &input.endpoint);
rt.block_on(syncer.sync(&input.hkey))
}
}

View File

@ -33,7 +33,7 @@ pub enum Progress {
RemovedFiles(usize),
}
struct SyncContext<'a, P>
pub struct MediaSyncer<'a, P>
where
P: Fn(Progress) -> bool,
{
@ -45,18 +45,101 @@ where
endpoint: &'a str,
}
impl<P> SyncContext<'_, P>
#[derive(Debug, Deserialize)]
struct SyncBeginResult {
data: Option<SyncBeginResponse>,
err: String,
}
#[derive(Debug, Deserialize)]
struct SyncBeginResponse {
#[serde(rename = "sk")]
sync_key: String,
usn: i32,
}
#[derive(Debug, Clone, Copy)]
enum LocalState {
NotInDB,
InDBNotPending,
InDBAndPending,
}
#[derive(PartialEq, Debug)]
enum RequiredChange {
// none also covers the case where we'll later upload
None,
Download,
Delete,
RemovePending,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct RecordBatchRequest {
last_usn: i32,
}
#[derive(Debug, Deserialize)]
struct RecordBatchResult {
data: Option<Vec<ServerMediaRecord>>,
err: String,
}
#[derive(Debug, Deserialize)]
struct ServerMediaRecord {
fname: String,
usn: i32,
sha1: String,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ZipRequest<'a> {
files: &'a [&'a String],
}
#[derive(Serialize_tuple)]
struct UploadEntry<'a> {
fname: &'a str,
in_zip_name: Option<String>,
}
#[derive(Deserialize, Debug)]
struct UploadResult {
data: Option<UploadReply>,
err: String,
}
#[derive(Deserialize, Debug)]
struct UploadReply {
processed: usize,
current_usn: i32,
}
#[derive(Serialize)]
struct FinalizeRequest {
local: u32,
}
#[derive(Debug, Deserialize)]
struct FinalizeResponse {
data: Option<String>,
err: String,
}
impl<P> MediaSyncer<'_, P>
where
P: Fn(Progress) -> bool,
{
fn new<'a>(mgr: &'a MediaManager, progress_cb: P, endpoint: &'a str) -> SyncContext<'a, P> {
pub fn new<'a>(mgr: &'a MediaManager, progress_cb: P, endpoint: &'a str) -> MediaSyncer<'a, P> {
let client = Client::builder()
.connect_timeout(time::Duration::from_secs(30))
.build()
.unwrap();
let ctx = mgr.dbctx();
SyncContext {
MediaSyncer {
mgr,
ctx,
skey: None,
@ -70,6 +153,44 @@ where
self.skey.as_ref().unwrap()
}
#[allow(clippy::useless_let_if_seq)]
pub async fn sync(&mut self, hkey: &str) -> Result<()> {
// make sure media DB is up to date
register_changes(&mut self.ctx, self.mgr.media_folder.as_path())?;
let meta = self.ctx.get_meta()?;
let client_usn = meta.last_sync_usn;
debug!("beginning media sync");
let (sync_key, server_usn) = self.sync_begin(hkey).await?;
self.skey = Some(sync_key);
debug!("server usn was {}", server_usn);
let mut actions_performed = false;
// need to fetch changes from server?
if client_usn != server_usn {
debug!("differs from local usn {}, fetching changes", client_usn);
self.fetch_changes(meta).await?;
actions_performed = true;
}
// need to send changes to server?
let changes_pending = !self.ctx.get_pending_uploads(1)?.is_empty();
if changes_pending {
self.send_changes().await?;
actions_performed = true;
}
if actions_performed {
self.finalize_sync().await?;
}
debug!("media sync complete");
Ok(())
}
async fn sync_begin(&self, hkey: &str) -> Result<(String, i32)> {
let url = format!("{}begin", self.endpoint);
@ -263,83 +384,6 @@ where
}
}
#[allow(clippy::useless_let_if_seq)]
pub async fn sync_media<F>(
mgr: &mut MediaManager,
hkey: &str,
progress_cb: F,
endpoint: &str,
) -> Result<()>
where
F: Fn(Progress) -> bool,
{
let mut sctx = SyncContext::new(mgr, progress_cb, endpoint);
// make sure media DB is up to date
register_changes(&mut sctx.ctx, mgr.media_folder.as_path())?;
let meta = sctx.ctx.get_meta()?;
let client_usn = meta.last_sync_usn;
debug!("beginning media sync");
let (sync_key, server_usn) = sctx.sync_begin(hkey).await?;
sctx.skey = Some(sync_key);
debug!("server usn was {}", server_usn);
let mut actions_performed = false;
// need to fetch changes from server?
if client_usn != server_usn {
debug!("differs from local usn {}, fetching changes", client_usn);
sctx.fetch_changes(meta).await?;
actions_performed = true;
}
// need to send changes to server?
let changes_pending = !sctx.ctx.get_pending_uploads(1)?.is_empty();
if changes_pending {
sctx.send_changes().await?;
actions_performed = true;
}
if actions_performed {
sctx.finalize_sync().await?;
}
debug!("media sync complete");
Ok(())
}
#[derive(Debug, Deserialize)]
struct SyncBeginResult {
data: Option<SyncBeginResponse>,
err: String,
}
#[derive(Debug, Deserialize)]
struct SyncBeginResponse {
#[serde(rename = "sk")]
sync_key: String,
usn: i32,
}
#[derive(Debug, Clone, Copy)]
enum LocalState {
NotInDB,
InDBNotPending,
InDBAndPending,
}
#[derive(PartialEq, Debug)]
enum RequiredChange {
// no also covers the case where we'll later upload
None,
Download,
Delete,
RemovePending,
}
fn determine_required_change(
local_sha1: &str,
remote_sha1: &str,
@ -419,25 +463,6 @@ fn determine_required_changes<'a>(
Ok((to_download, to_delete, to_remove_pending))
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct RecordBatchRequest {
last_usn: i32,
}
#[derive(Debug, Deserialize)]
struct RecordBatchResult {
data: Option<Vec<ServerMediaRecord>>,
err: String,
}
#[derive(Debug, Deserialize)]
struct ServerMediaRecord {
fname: String,
usn: i32,
sha1: String,
}
async fn ankiweb_json_request<T>(
client: &Client,
url: &str,
@ -483,12 +508,6 @@ async fn ankiweb_request(
.map_err(Into::into)
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ZipRequest<'a> {
files: &'a [&'a String],
}
fn extract_into_media_folder(media_folder: &Path, zip: Bytes) -> Result<Vec<AddedFile>> {
let reader = io::Cursor::new(zip);
let mut zip = zip::ZipArchive::new(reader)?;
@ -586,24 +605,6 @@ fn record_clean(ctx: &mut MediaDatabaseContext, clean: &[&String]) -> Result<()>
Ok(())
}
#[derive(Serialize_tuple)]
struct UploadEntry<'a> {
fname: &'a str,
in_zip_name: Option<String>,
}
#[derive(Deserialize, Debug)]
struct UploadResult {
data: Option<UploadReply>,
err: String,
}
#[derive(Deserialize, Debug)]
struct UploadReply {
processed: usize,
current_usn: i32,
}
fn zip_files(media_folder: &Path, files: &[MediaEntry]) -> Result<Vec<u8>> {
let buf = vec![];
@ -681,21 +682,10 @@ fn media_check_required() -> AnkiError {
}
}
#[derive(Serialize)]
struct FinalizeRequest {
local: u32,
}
#[derive(Debug, Deserialize)]
struct FinalizeResponse {
data: Option<String>,
err: String,
}
#[cfg(test)]
mod test {
use crate::err::Result;
use crate::media::sync::{determine_required_change, sync_media, LocalState, RequiredChange};
use crate::media::sync::{determine_required_change, LocalState, MediaSyncer, RequiredChange};
use crate::media::MediaManager;
use tempfile::tempdir;
use tokio::runtime::Runtime;
@ -715,7 +705,8 @@ mod test {
let mut mgr = MediaManager::new(&media_dir, &media_db)?;
sync_media(&mut mgr, hkey, progress, "https://sync.ankiweb.net/msync/").await?;
let mut syncer = MediaSyncer::new(&mut mgr, progress, "https://sync.ankiweb.net/msync/");
syncer.sync(hkey).await?;
Ok(())
}