handle concurrent modifications and ankiweb terminating early
This commit is contained in:
parent
e0511c560b
commit
9067bf98bd
@ -24,8 +24,6 @@ static SYNC_MAX_FILES: usize = 25;
|
||||
static SYNC_MAX_BYTES: usize = (2.5 * 1024.0 * 1024.0) as usize;
|
||||
static SYNC_SINGLE_FILE_MAX_BYTES: usize = 100 * 1024 * 1024;
|
||||
|
||||
// fixme: concurrent modifications during upload step
|
||||
|
||||
/// The counts are not cumulative - the progress hook should accumulate them.
|
||||
#[derive(Debug)]
|
||||
pub enum Progress {
|
||||
@ -157,19 +155,40 @@ where
|
||||
break;
|
||||
}
|
||||
|
||||
let file_count = pending.iter().filter(|e| e.sha1.is_some()).count();
|
||||
|
||||
let zip_data = zip_files(&self.mgr.media_folder, &pending)?;
|
||||
self.send_zip_data(zip_data).await?;
|
||||
let reply = self.send_zip_data(zip_data).await?;
|
||||
|
||||
let (processed_files, processed_deletions): (Vec<_>, Vec<_>) = pending
|
||||
.iter()
|
||||
.take(reply.processed)
|
||||
.partition(|e| e.sha1.is_some());
|
||||
|
||||
self.progress(Progress::Uploaded {
|
||||
files: file_count,
|
||||
deletions: pending.len() - file_count,
|
||||
files: processed_files.len(),
|
||||
deletions: processed_deletions.len(),
|
||||
})?;
|
||||
|
||||
let fnames: Vec<_> = pending.iter().map(|e| &e.fname).collect();
|
||||
self.ctx
|
||||
.transact(|ctx| record_clean(ctx, fnames.as_slice()))?;
|
||||
let fnames: Vec<_> = processed_files
|
||||
.iter()
|
||||
.chain(processed_deletions.iter())
|
||||
.map(|e| &e.fname)
|
||||
.collect();
|
||||
let fname_cnt = fnames.len() as i32;
|
||||
self.ctx.transact(|ctx| {
|
||||
record_clean(ctx, fnames.as_slice())?;
|
||||
let mut meta = ctx.get_meta()?;
|
||||
if meta.last_sync_usn + fname_cnt == reply.current_usn {
|
||||
meta.last_sync_usn = reply.current_usn;
|
||||
ctx.set_meta(&meta)?;
|
||||
} else {
|
||||
debug!(
|
||||
"server usn {} is not {}, skipping usn update",
|
||||
reply.current_usn,
|
||||
meta.last_sync_usn + fname_cnt
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@ -230,12 +249,17 @@ where
|
||||
resp.bytes().await.map_err(Into::into)
|
||||
}
|
||||
|
||||
async fn send_zip_data(&self, data: Vec<u8>) -> Result<()> {
|
||||
async fn send_zip_data(&self, data: Vec<u8>) -> Result<UploadReply> {
|
||||
let url = format!("{}uploadChanges", self.endpoint);
|
||||
|
||||
ankiweb_bytes_request(&self.client, &url, data, self.skey()).await?;
|
||||
let resp = ankiweb_bytes_request(&self.client, &url, data, self.skey()).await?;
|
||||
let res: UploadResult = resp.json().await?;
|
||||
|
||||
Ok(())
|
||||
if let Some(reply) = res.data {
|
||||
Ok(reply)
|
||||
} else {
|
||||
Err(AnkiError::server_message(res.err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -568,6 +592,18 @@ struct UploadEntry<'a> {
|
||||
in_zip_name: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
struct UploadResult {
|
||||
data: Option<UploadReply>,
|
||||
err: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
struct UploadReply {
|
||||
processed: usize,
|
||||
current_usn: i32,
|
||||
}
|
||||
|
||||
fn zip_files(media_folder: &Path, files: &[MediaEntry]) -> Result<Vec<u8>> {
|
||||
let buf = vec![];
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user