pass in endpoint
This commit is contained in:
parent
0c124188cd
commit
ec9abf1ce5
@ -42,7 +42,6 @@ class MediaSyncState:
|
||||
|
||||
|
||||
# fixme: abort when closing collection/app
|
||||
# fixme: shards
|
||||
# fixme: concurrent modifications during upload step
|
||||
# fixme: mediaSanity
|
||||
# fixme: corruptMediaDB
|
||||
@ -110,25 +109,25 @@ class MediaSyncer:
|
||||
self._log_and_notify(_("Media syncing disabled."))
|
||||
return
|
||||
|
||||
shard = None
|
||||
|
||||
self._log_and_notify(_("Media sync starting..."))
|
||||
self._sync_state = MediaSyncState()
|
||||
self._want_stop = False
|
||||
self._on_start_stop()
|
||||
|
||||
(media_folder, media_db) = media_paths_from_col_path(self.mw.col.path)
|
||||
|
||||
def run() -> None:
|
||||
self.mw.col.backend.sync_media(hkey, media_folder, media_db, self._endpoint())
|
||||
|
||||
self.mw.taskman.run_in_background(run, self._on_finished)
|
||||
|
||||
def _endpoint(self) -> str:
|
||||
shard = self.mw.pm.sync_shard()
|
||||
if shard is not None:
|
||||
shard_str = str(shard)
|
||||
else:
|
||||
shard_str = ""
|
||||
endpoint = f"https://sync{shard_str}ankiweb.net"
|
||||
|
||||
(media_folder, media_db) = media_paths_from_col_path(self.mw.col.path)
|
||||
|
||||
def run() -> None:
|
||||
self.mw.col.backend.sync_media(hkey, media_folder, media_db, endpoint)
|
||||
|
||||
self.mw.taskman.run_in_background(run, self._on_finished)
|
||||
return f"https://sync{shard_str}.ankiweb.net/msync/"
|
||||
|
||||
def _log_and_notify(self, entry: LogEntry) -> None:
|
||||
entry_with_time = LogEntryWithTime(time=intTime(), entry=entry)
|
||||
|
@ -521,6 +521,9 @@ please see:
|
||||
def media_syncing_enabled(self) -> bool:
|
||||
return self.profile["syncMedia"]
|
||||
|
||||
def sync_shard(self) -> Optional[int]:
|
||||
return self.profile.get("hostNum")
|
||||
|
||||
######################################################################
|
||||
|
||||
def apply_profile_options(self) -> None:
|
||||
|
@ -299,7 +299,7 @@ impl Backend {
|
||||
};
|
||||
|
||||
let mut rt = Runtime::new().unwrap();
|
||||
rt.block_on(sync_media(&mut mgr, &input.hkey, callback))
|
||||
rt.block_on(sync_media(&mut mgr, &input.hkey, callback, &input.endpoint))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -19,13 +19,7 @@ use std::io::{Read, Write};
|
||||
use std::path::Path;
|
||||
use std::{io, time};
|
||||
|
||||
// fixme: sync url
|
||||
// fixme: version string
|
||||
// fixme: shards
|
||||
|
||||
// fixme: refactor into a struct
|
||||
|
||||
static SYNC_URL: &str = "https://sync.ankiweb.net/msync/";
|
||||
|
||||
static SYNC_MAX_FILES: usize = 25;
|
||||
static SYNC_MAX_BYTES: usize = (2.5 * 1024.0 * 1024.0) as usize;
|
||||
@ -48,13 +42,14 @@ where
|
||||
skey: Option<String>,
|
||||
client: Client,
|
||||
progress_cb: P,
|
||||
endpoint: &'a str,
|
||||
}
|
||||
|
||||
impl<P> SyncContext<'_, P>
|
||||
where
|
||||
P: Fn(Progress) -> bool,
|
||||
{
|
||||
fn new(mgr: &MediaManager, progress_cb: P) -> SyncContext<P> {
|
||||
fn new<'a>(mgr: &'a MediaManager, progress_cb: P, endpoint: &'a str) -> SyncContext<'a, P> {
|
||||
let client = Client::builder()
|
||||
.connect_timeout(time::Duration::from_secs(30))
|
||||
.build()
|
||||
@ -67,6 +62,7 @@ where
|
||||
skey: None,
|
||||
client,
|
||||
progress_cb,
|
||||
endpoint,
|
||||
}
|
||||
}
|
||||
|
||||
@ -75,7 +71,7 @@ where
|
||||
}
|
||||
|
||||
async fn sync_begin(&self, hkey: &str) -> Result<(String, i32)> {
|
||||
let url = format!("{}/begin", SYNC_URL);
|
||||
let url = format!("{}/begin", self.endpoint);
|
||||
|
||||
let resp = self
|
||||
.client
|
||||
@ -100,7 +96,7 @@ where
|
||||
loop {
|
||||
debug!("fetching record batch starting from usn {}", last_usn);
|
||||
|
||||
let batch = fetch_record_batch(&self.client, self.skey(), last_usn).await?;
|
||||
let batch = self.fetch_record_batch(last_usn).await?;
|
||||
if batch.is_empty() {
|
||||
debug!("empty batch, done");
|
||||
break;
|
||||
@ -125,7 +121,7 @@ where
|
||||
.take(SYNC_MAX_FILES)
|
||||
.map(ToOwned::to_owned)
|
||||
.collect();
|
||||
let zip_data = fetch_zip(&self.client, self.skey(), batch.as_slice()).await?;
|
||||
let zip_data = self.fetch_zip(batch.as_slice()).await?;
|
||||
let download_batch =
|
||||
extract_into_media_folder(self.mgr.media_folder.as_path(), zip_data)?
|
||||
.into_iter();
|
||||
@ -161,7 +157,7 @@ where
|
||||
let file_count = pending.iter().filter(|e| e.sha1.is_some()).count();
|
||||
|
||||
let zip_data = zip_files(&self.mgr.media_folder, &pending)?;
|
||||
send_zip_data(&self.client, self.skey(), zip_data).await?;
|
||||
self.send_zip_data(zip_data).await?;
|
||||
|
||||
self.progress(Progress::Uploaded {
|
||||
files: file_count,
|
||||
@ -177,7 +173,7 @@ where
|
||||
}
|
||||
|
||||
async fn finalize_sync(&mut self) -> Result<()> {
|
||||
let url = format!("{}/mediaSanity", SYNC_URL);
|
||||
let url = format!("{}mediaSanity", self.endpoint);
|
||||
let local = self.ctx.count()?;
|
||||
|
||||
let obj = FinalizeRequest { local };
|
||||
@ -207,14 +203,51 @@ where
|
||||
Err(AnkiError::Interrupted)
|
||||
}
|
||||
}
|
||||
|
||||
async fn fetch_record_batch(&self, last_usn: i32) -> Result<Vec<ServerMediaRecord>> {
|
||||
let url = format!("{}mediaChanges", self.endpoint);
|
||||
|
||||
let req = RecordBatchRequest { last_usn };
|
||||
let resp = ankiweb_json_request(&self.client, &url, &req, self.skey()).await?;
|
||||
let res: RecordBatchResult = resp.json().await?;
|
||||
|
||||
if let Some(batch) = res.data {
|
||||
Ok(batch)
|
||||
} else {
|
||||
Err(AnkiError::AnkiWebMiscError { info: res.err })
|
||||
}
|
||||
}
|
||||
|
||||
async fn fetch_zip(&self, files: &[&String]) -> Result<Bytes> {
|
||||
let url = format!("{}downloadFiles", self.endpoint);
|
||||
|
||||
debug!("requesting files: {:?}", files);
|
||||
|
||||
let req = ZipRequest { files };
|
||||
let resp = ankiweb_json_request(&self.client, &url, &req, self.skey()).await?;
|
||||
resp.bytes().await.map_err(Into::into)
|
||||
}
|
||||
|
||||
async fn send_zip_data(&self, data: Vec<u8>) -> Result<()> {
|
||||
let url = format!("{}uploadChanges", self.endpoint);
|
||||
|
||||
ankiweb_bytes_request(&self.client, &url, data, self.skey()).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::useless_let_if_seq)]
|
||||
pub async fn sync_media<F>(mgr: &mut MediaManager, hkey: &str, progress_cb: F) -> Result<()>
|
||||
pub async fn sync_media<F>(
|
||||
mgr: &mut MediaManager,
|
||||
hkey: &str,
|
||||
progress_cb: F,
|
||||
endpoint: &str,
|
||||
) -> Result<()>
|
||||
where
|
||||
F: Fn(Progress) -> bool,
|
||||
{
|
||||
let mut sctx = SyncContext::new(mgr, progress_cb);
|
||||
let mut sctx = SyncContext::new(mgr, progress_cb, endpoint);
|
||||
|
||||
// make sure media DB is up to date
|
||||
register_changes(&mut sctx.ctx, mgr.media_folder.as_path())?;
|
||||
@ -432,40 +465,12 @@ async fn ankiweb_request(
|
||||
.map_err(rewrite_forbidden)
|
||||
}
|
||||
|
||||
async fn fetch_record_batch(
|
||||
client: &Client,
|
||||
skey: &str,
|
||||
last_usn: i32,
|
||||
) -> Result<Vec<ServerMediaRecord>> {
|
||||
let url = format!("{}/mediaChanges", SYNC_URL);
|
||||
|
||||
let req = RecordBatchRequest { last_usn };
|
||||
let resp = ankiweb_json_request(client, &url, &req, skey).await?;
|
||||
let res: RecordBatchResult = resp.json().await?;
|
||||
|
||||
if let Some(batch) = res.data {
|
||||
Ok(batch)
|
||||
} else {
|
||||
Err(AnkiError::AnkiWebMiscError { info: res.err })
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct ZipRequest<'a> {
|
||||
files: &'a [&'a String],
|
||||
}
|
||||
|
||||
async fn fetch_zip(client: &Client, skey: &str, files: &[&String]) -> Result<Bytes> {
|
||||
let url = format!("{}/downloadFiles", SYNC_URL);
|
||||
|
||||
debug!("requesting files: {:?}", files);
|
||||
|
||||
let req = ZipRequest { files };
|
||||
let resp = ankiweb_json_request(client, &url, &req, skey).await?;
|
||||
resp.bytes().await.map_err(Into::into)
|
||||
}
|
||||
|
||||
fn extract_into_media_folder(media_folder: &Path, zip: Bytes) -> Result<Vec<AddedFile>> {
|
||||
let reader = io::Cursor::new(zip);
|
||||
let mut zip = zip::ZipArchive::new(reader)?;
|
||||
@ -614,14 +619,6 @@ fn zip_files(media_folder: &Path, files: &[MediaEntry]) -> Result<Vec<u8>> {
|
||||
Ok(w.into_inner())
|
||||
}
|
||||
|
||||
async fn send_zip_data(client: &Client, skey: &str, data: Vec<u8>) -> Result<()> {
|
||||
let url = format!("{}/uploadChanges", SYNC_URL);
|
||||
|
||||
ankiweb_bytes_request(client, &url, data, skey).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct FinalizeRequest {
|
||||
local: u32,
|
||||
@ -656,7 +653,7 @@ mod test {
|
||||
|
||||
let mut mgr = MediaManager::new(&media_dir, &media_db)?;
|
||||
|
||||
sync_media(&mut mgr, hkey, progress).await?;
|
||||
sync_media(&mut mgr, hkey, progress, "https://sync.ankiweb.net/msync/").await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user