diff --git a/docs/syncserver.md b/docs/syncserver.md
new file mode 100644
index 000000000..6b2e5f54e
--- /dev/null
+++ b/docs/syncserver.md
@@ -0,0 +1,103 @@
+# Local sync server
+
+A local sync server is bundled with Anki. If you cannot or do not wish to
+use AnkiWeb, you can run the server on a machine on your local network.
+
+Things to be aware of:
+
+- Media syncing is not currently supported. You will either need to disable
+  syncing of sounds and images in the preferences screen, sync your media via
+  AnkiWeb, or use some other solution.
+- AnkiMobile does not yet provide an option for using a local sync server,
+  so for now this will only be usable with the computer version of Anki and
+  with AnkiDroid.
+- This code is partly new, and while it has had some testing, it is possible
+  something has been missed. Please make backups, and report any bugs you run
+  into.
+- The server runs over an unencrypted HTTP connection and does not require
+  authentication, so it is only suitable for use on a private network.
+- This is an advanced feature, targeted at users who are comfortable with
+  networking and the command line. If you use this, the expectation is that
+  you can resolve any setup/network/firewall issues you run into yourself, and
+  use of this is entirely at your own risk.
+
+## From source
+
+If you run Anki from git, you can run a sync server with:
+
+```
+./scripts/runopt --syncserver
+```
+
+## From a packaged build
+
+From 2.1.39beta1 onwards, the sync server is included in the packaged binaries.
+
+On Windows, in a cmd.exe session:
+
+```
+"\program files\anki\anki-console.exe" --syncserver
+```
+
+On macOS, in Terminal.app:
+
+```
+/Applications/Anki.app/Contents/MacOS/AnkiMac --syncserver
+```
+
+On Linux:
+
+```
+anki --syncserver
+```
+
+## Without Qt dependencies
+
+You can run the server without installing the GUI portion of Anki. Once Anki
+2.1.39 is released, the following will work:
+
+```
+pip install anki[syncserver]
+python -m anki.syncserver
+```
+
+## Server setup
+
+The server needs to store a copy of your collection in a folder.
+By default this is ~/.syncserver; you can change it by defining
+a `FOLDER` environment variable. This should not be the same location
+as your normal Anki data folder.
+
+You can also define `HOST` and `PORT`.
+
+## Client setup
+
+When the server starts, it will print the address it is listening on.
+You need to set an environment variable before starting your Anki
+clients to tell them where to connect. For example:
+
+```
+set SYNC_ENDPOINT="http://10.0.0.5:8080/sync/"
+anki
+```
+
+Currently any username and password will be accepted. If you wish to
+keep using AnkiWeb for media, sync once with AnkiWeb first, then switch
+to your local endpoint - collection syncs will be local, and media syncs
+will continue to go to AnkiWeb.
+
+## Contributing
+
+Authentication shouldn't be too hard to add - login() and request() in
+http_client.rs can be used as a reference. A PR that accepts a password in an
+env var and generates a stable hkey based on it would be welcome (a rough
+sketch follows at the end of this document).
+
+Once that is done, basic multi-profile support could be implemented by moving
+the col object into an array or dict, and fetching the relevant collection based
+on the user's authentication.
+
+Because this server is bundled with Anki, simplicity is a design goal - it is
+targeted at individual/family use, only makes use of Python libraries the GUI is
+already using, and does not require a configuration file. PRs that deviate from
+this are less likely to be merged, so please consider reaching out first if you
+are thinking of starting work on a larger change.
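+### Example: deriving a stable hkey from a password (sketch)
+
+The snippet below only illustrates the authentication idea described under
+Contributing; it is not part of the server, and the names (`SYNC_PASSWORD`,
+`derive_hkey`) are hypothetical. Any stable, hard-to-guess string derived from
+a password held in an environment variable could serve as the hkey returned
+from the hostKey call:
+
+```
+import hashlib
+import os
+
+
+def derive_hkey() -> str:
+    # Hypothetical helper: turn a password from the environment into a
+    # stable token. A real PR would also check the username/password sent
+    # in the hostKey request and reject mismatches.
+    password = os.environ.get("SYNC_PASSWORD", "")
+    return hashlib.sha256(("anki-sync:" + password).encode("utf8")).hexdigest()
+```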
diff --git a/pylib/anki/BUILD.bazel b/pylib/anki/BUILD.bazel
index 98fab4f67..58df9805a 100644
--- a/pylib/anki/BUILD.bazel
+++ b/pylib/anki/BUILD.bazel
@@ -96,6 +96,8 @@ py_library(
         requirement("distro"),
         requirement("protobuf"),
         requirement("requests"),
+        requirement("flask"),
+        requirement("waitress"),
     ] + orjson_if_available(),
 )
 
@@ -110,6 +112,12 @@ py_wheel(
     abi = "abi3",
     description_file = "wheel_description.txt",
     distribution = "anki",
+    extra_requires = {
+        "syncserver": [
+            "flask",
+            "waitress",
+        ],
+    },
     platform = select({
         "//platforms:windows_x86_64": "win_amd64",
         "//platforms:macos_x86_64": "macosx_10_7_x86_64",
diff --git a/pylib/anki/sync.py b/pylib/anki/sync.py
index 6aef01114..9bd6409b5 100644
--- a/pylib/anki/sync.py
+++ b/pylib/anki/sync.py
@@ -1,5 +1,8 @@
 # Copyright: Ankitects Pty Ltd and contributors
 # License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
+#
+# Legacy attributes some add-ons may be using
+#
 
 from .httpclient import HttpClient
diff --git a/pylib/anki/syncserver/__init__.py b/pylib/anki/syncserver/__init__.py
new file mode 100644
index 000000000..1d2db82db
--- /dev/null
+++ b/pylib/anki/syncserver/__init__.py
@@ -0,0 +1,193 @@
+# Copyright: Ankitects Pty Ltd and contributors
+# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
+#
+# Please see /docs/syncserver.md
+#
+
+from __future__ import annotations
+
+import gzip
+import os
+import socket
+import sys
+import time
+from io import BytesIO
+from tempfile import NamedTemporaryFile
+from typing import Optional
+
+try:
+    import flask
+    from waitress.server import create_server
+except ImportError as e:
+    print(e, "- to use the server, 'pip install anki[syncserver]'")
+    sys.exit(1)
+
+
+from flask import Response
+
+from anki import Collection
+from anki.backend_pb2 import SyncServerMethodIn
+
+Method = SyncServerMethodIn.Method  # pylint: disable=no-member
+
+app = flask.Flask(__name__)
+col: Collection
+trace = os.getenv("TRACE")
+
+
+def get_request_data() -> bytes:
+    buf = BytesIO()
+    flask.request.files["data"].save(buf)
+    buf.seek(0)
+    zip = gzip.GzipFile(mode="rb", fileobj=buf)
+    return zip.read()
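+
+# Each sync request arrives as a multipart POST whose "data" field holds a
+# gzip-compressed, method-specific JSON payload. Purely as an illustration
+# (this snippet is not used by the server; the URL and payload are
+# placeholders), a client call has roughly this shape:
+#
+#   import gzip, io, requests
+#   payload = gzip.compress(method_specific_json_bytes)
+#   requests.post(
+#       "http://127.0.0.1:8080/sync/meta",
+#       files={"data": ("data", io.BytesIO(payload))},
+#   )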
+
+
+def get_request_data_into_file() -> bytes:
+    "Returns the utf8-encoded path to the resulting file."
+    # this could be optimized to stream the data into a file
+    # in the future
+    data = get_request_data()
+    tempobj = NamedTemporaryFile(dir=folder(), delete=False)
+    tempobj.write(data)
+    tempobj.close()
+    return tempobj.name.encode("utf8")
+
+
+def handle_sync_request(method_str: str) -> Response:
+    method = get_method(method_str)
+    if method is None:
+        raise Exception(f"unknown method: {method_str}")
+
+    if method == Method.FULL_UPLOAD:
+        data = get_request_data_into_file()
+    else:
+        data = get_request_data()
+        if trace:
+            print("-->", data)
+
+    full = method in (Method.FULL_UPLOAD, Method.FULL_DOWNLOAD)
+    if full:
+        col.close_for_full_sync()
+    try:
+        outdata = col.backend.sync_server_method(method=method, data=data)
+    except Exception as e:
+        if method == Method.META:
+            # if parallel syncing requests come in, block them
+            print("exception in meta", e)
+            return flask.make_response("Conflict", 409)
+        else:
+            raise
+    finally:
+        if full:
+            after_full_sync()
+
+    resp = None
+    if method == Method.FULL_UPLOAD:
+        # upload call expects a raw string literal returned
+        outdata = b"OK"
+    elif method == Method.FULL_DOWNLOAD:
+        path = outdata.decode("utf8")
+
+        def stream_reply():
+            with open(path, "rb") as f:
+                while chunk := f.read(16 * 1024):
+                    yield chunk
+            os.unlink(path)
+
+        resp = Response(stream_reply())
+    else:
+        if trace:
+            print("<--", outdata)
+
+    if not resp:
+        resp = flask.make_response(outdata)
+    resp.headers["Content-Type"] = "application/binary"
+    return resp
+
+
+def after_full_sync():
+    # the server methods do not reopen the collection after a full sync,
+    # so we need to
+    col.reopen(after_full_sync=False)
+    col.db.rollback()
+
+
+def get_method(
+    method_str: str,
+) -> Optional[SyncServerMethodIn.MethodValue]:  # pylint: disable=no-member
+    s = method_str
+    if s == "hostKey":
+        return Method.HOST_KEY
+    elif s == "meta":
+        return Method.META
+    elif s == "start":
+        return Method.START
+    elif s == "applyGraves":
+        return Method.APPLY_GRAVES
+    elif s == "applyChanges":
+        return Method.APPLY_CHANGES
+    elif s == "chunk":
+        return Method.CHUNK
+    elif s == "applyChunk":
+        return Method.APPLY_CHUNK
+    elif s == "sanityCheck2":
+        return Method.SANITY_CHECK
+    elif s == "finish":
+        return Method.FINISH
+    elif s == "abort":
+        return Method.ABORT
+    elif s == "upload":
+        return Method.FULL_UPLOAD
+    elif s == "download":
+        return Method.FULL_DOWNLOAD
+    else:
+        return None
+
+
+@app.route("/<path:pathin>", methods=["POST"])
+def handle_request(pathin: str):
+    path = pathin
+    print(int(time.time()), flask.request.remote_addr, path)
+
+    if path.startswith("sync/"):
+        return handle_sync_request(path.split("/", maxsplit=1)[1])
+
+
+def folder():
+    folder = os.getenv("FOLDER", os.path.expanduser("~/.syncserver"))
+    if not os.path.exists(folder):
+        print("creating", folder)
+        os.mkdir(folder)
+    return folder
+
+
+def col_path():
+    return os.path.join(folder(), "collection.server.anki2")
+
+
+def serve():
+    global col
+
+    col = Collection(col_path(), server=True)
+    # don't hold an outer transaction open
+    col.db.rollback()
+
+    host = os.getenv("HOST", "0.0.0.0")
+    port = int(os.getenv("PORT", "8080"))
+
+    server = create_server(
+        app,
+        host=host,
+        port=port,
+        clear_untrusted_proxy_headers=True,
+    )
+
+    effective_port = server.effective_port  # type: ignore
+    print(f"Sync server listening on http://{host}:{effective_port}/sync/")
+    if host == "0.0.0.0":
+        ip = socket.gethostbyname(socket.gethostname())
+        print(f"Replace 0.0.0.0 with your machine's IP address (perhaps {ip})")
+    print(
+        "For more info, see https://github.com/ankitects/anki/blob/master/docs/syncserver.md"
+    )
+    server.run()
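
As a quick illustration of the module above, this is one way to start the server from your own script rather than via `python -m anki.syncserver`; it only mirrors what `serve()` already does, assumes the wheel was installed with the `syncserver` extra, and the folder path is an arbitrary example:

```
import os

# Optional overrides; the defaults are ~/.syncserver, 0.0.0.0 and 8080.
os.environ.setdefault("FOLDER", os.path.expanduser("~/anki-sync-data"))
os.environ.setdefault("PORT", "8080")

from anki.syncserver import serve

serve()  # blocks, serving http://HOST:PORT/sync/
```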
diff --git a/pylib/anki/syncserver/__main__.py b/pylib/anki/syncserver/__main__.py
new file mode 100644
index 000000000..01e371ae6
--- /dev/null
+++ b/pylib/anki/syncserver/__main__.py
@@ -0,0 +1,6 @@
+# Copyright: Ankitects Pty Ltd and contributors
+# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
+
+from anki.syncserver import serve
+
+serve()
diff --git a/qt/aqt/__init__.py b/qt/aqt/__init__.py
index a2e10f122..61d3c91d1 100644
--- a/qt/aqt/__init__.py
+++ b/qt/aqt/__init__.py
@@ -316,9 +316,16 @@ def parseArgs(argv):
     parser.add_argument("-b", "--base", help="path to base folder", default="")
     parser.add_argument("-p", "--profile", help="profile name to load", default="")
     parser.add_argument("-l", "--lang", help="interface language (en, de, etc)")
-    parser.add_argument("-v", "--version", help="print the Anki version and exit")
     parser.add_argument(
-        "-s", "--safemode", help="disable add-ons and automatic syncing"
+        "-v", "--version", help="print the Anki version and exit", action="store_true"
+    )
+    parser.add_argument(
+        "--safemode", help="disable add-ons and automatic syncing", action="store_true"
+    )
+    parser.add_argument(
+        "--syncserver",
+        help="skip GUI and start a local sync server",
+        action="store_true",
     )
     return parser.parse_known_args(argv[1:])
 
@@ -433,7 +440,12 @@ def _run(argv=None, exec=True):
     opts, args = parseArgs(argv)
 
     if opts.version:
-        print(f"Anki version '{appVersion}'")
+        print(f"Anki {appVersion}")
+        return
+    elif opts.syncserver:
+        from anki.syncserver import serve
+
+        serve()
         return
 
     if PROFILE_CODE:
diff --git a/rslib/backend.proto b/rslib/backend.proto
index 0688e1c36..434241e1a 100644
--- a/rslib/backend.proto
+++ b/rslib/backend.proto
@@ -196,6 +196,7 @@ service BackendService {
     rpc SyncCollection(SyncAuth) returns (SyncCollectionOut);
     rpc FullUpload(SyncAuth) returns (Empty);
     rpc FullDownload(SyncAuth) returns (Empty);
+    rpc SyncServerMethod(SyncServerMethodIn) returns (Json);
 
     // translation/messages
 
@@ -506,6 +507,7 @@ message SyncError {
         RESYNC_REQUIRED = 7;
         CLOCK_INCORRECT = 8;
        DATABASE_CHECK_REQUIRED = 9;
+        SYNC_NOT_STARTED = 10;
     }
     SyncErrorKind kind = 1;
 }
@@ -1013,6 +1015,26 @@ message SyncAuth {
     uint32 host_number = 2;
 }
 
+message SyncServerMethodIn {
+    enum Method {
+        HOST_KEY = 0;
+        META = 1;
+        START = 2;
+        APPLY_GRAVES = 3;
+        APPLY_CHANGES = 4;
+        CHUNK = 5;
+        APPLY_CHUNK = 6;
+        SANITY_CHECK = 7;
+        FINISH = 8;
+        ABORT = 9;
+        // caller must reopen after these two are called
+        FULL_UPLOAD = 10;
+        FULL_DOWNLOAD = 11;
+    }
+    Method method = 1;
+    bytes data = 2;
+}
+
 message RemoveNotesIn {
     repeated int64 note_ids = 1;
     repeated int64 card_ids = 2;
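The new `SyncServerMethod` RPC above is what the Flask layer drives for every request. As a rough, non-normative illustration of the call shape (the collection path is only an example; `ABORT` is chosen because, per the Rust implementation below, it is safe to call even when no sync has been started):

```
from anki import Collection
from anki.backend_pb2 import SyncServerMethodIn

Method = SyncServerMethodIn.Method

col = Collection("collection.server.anki2", server=True)

# `data` is the method-specific JSON payload, exactly as the Flask handler
# receives it after gzip decompression; it is ignored for ABORT.
outdata = col.backend.sync_server_method(method=Method.ABORT, data=b"{}")
print(outdata)  # the method's JSON-encoded reply
```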
diff --git a/rslib/src/backend/http_sync_server.rs b/rslib/src/backend/http_sync_server.rs
index e69de29bb..49b6c57e5 100644
--- a/rslib/src/backend/http_sync_server.rs
+++ b/rslib/src/backend/http_sync_server.rs
@@ -0,0 +1,211 @@
+// Copyright: Ankitects Pty Ltd and contributors
+// License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
+
+use std::{path::PathBuf, sync::MutexGuard};
+use tokio::runtime::Runtime;
+
+use super::{Backend, BackendState};
+use crate::{
+    err::SyncErrorKind,
+    prelude::*,
+    sync::{
+        http::{
+            ApplyChangesIn, ApplyChunkIn, ApplyGravesIn, HostKeyIn, HostKeyOut, MetaIn,
+            SanityCheckIn, StartIn, SyncRequest,
+        },
+        Chunk, Graves, LocalServer, SanityCheckOut, SanityCheckStatus, SyncMeta, SyncServer,
+        UnchunkedChanges, SYNC_VERSION_MAX, SYNC_VERSION_MIN,
+    },
+};
+
+impl Backend {
+    fn with_sync_server<F, T>(&self, func: F) -> Result<T>
+    where
+        F: FnOnce(&mut LocalServer) -> Result<T>,
+    {
+        let mut state_guard = self.state.lock().unwrap();
+        let out =
+            func(
+                state_guard
+                    .http_sync_server
+                    .as_mut()
+                    .ok_or_else(|| AnkiError::SyncError {
+                        kind: SyncErrorKind::SyncNotStarted,
+                        info: Default::default(),
+                    })?,
+            );
+        if out.is_err() {
+            self.abort_and_restore_collection(Some(state_guard))
+        }
+        out
+    }
+
+    /// Gives out a dummy hkey - auth should be implemented at a higher layer.
+    fn host_key(&self, _input: HostKeyIn) -> Result<HostKeyOut> {
+        Ok(HostKeyOut {
+            key: "unimplemented".into(),
+        })
+    }
+
+    fn meta(&self, input: MetaIn) -> Result<SyncMeta> {
+        if input.sync_version < SYNC_VERSION_MIN || input.sync_version > SYNC_VERSION_MAX {
+            return Ok(SyncMeta {
+                server_message: "Your Anki version is either too old, or too new.".into(),
+                should_continue: false,
+                ..Default::default()
+            });
+        }
+        let server = self.col_into_server()?;
+        let mut rt = Runtime::new().unwrap();
+        let meta = rt.block_on(server.meta())?;
+        self.server_into_col(server);
+
+        Ok(meta)
+    }
+
+    /// Takes the collection from the backend, places it into a server, and returns it.
+    fn col_into_server(&self) -> Result<LocalServer> {
+        self.col
+            .lock()
+            .unwrap()
+            .take()
+            .map(LocalServer::new)
+            .ok_or(AnkiError::CollectionNotOpen)
+    }
+
+    fn server_into_col(&self, server: LocalServer) {
+        let col = server.into_col();
+        let mut col_guard = self.col.lock().unwrap();
+        assert!(col_guard.replace(col).is_none());
+    }
+
+    fn take_server(&self, state_guard: Option<MutexGuard<BackendState>>) -> Result<LocalServer> {
+        let mut state_guard = state_guard.unwrap_or_else(|| self.state.lock().unwrap());
+        state_guard
+            .http_sync_server
+            .take()
+            .ok_or_else(|| AnkiError::SyncError {
+                kind: SyncErrorKind::SyncNotStarted,
+                info: String::new(),
+            })
+    }
+
+    fn start(&self, input: StartIn) -> Result<Graves> {
+        // place col into new server
+        let server = self.col_into_server()?;
+        let mut state_guard = self.state.lock().unwrap();
+        assert!(state_guard.http_sync_server.replace(server).is_none());
+        drop(state_guard);
+
+        self.with_sync_server(|server| {
+            let mut rt = Runtime::new().unwrap();
+            rt.block_on(server.start(input.client_usn, input.local_is_newer))
+        })
+    }
+
+    fn apply_graves(&self, input: ApplyGravesIn) -> Result<()> {
+        self.with_sync_server(|server| {
+            let mut rt = Runtime::new().unwrap();
+            rt.block_on(server.apply_graves(input.chunk))
+        })
+    }
+
+    fn apply_changes(&self, input: ApplyChangesIn) -> Result<UnchunkedChanges> {
+        self.with_sync_server(|server| {
+            let mut rt = Runtime::new().unwrap();
+            rt.block_on(server.apply_changes(input.changes))
+        })
+    }
+
+    fn chunk(&self) -> Result<Chunk> {
+        self.with_sync_server(|server| {
+            let mut rt = Runtime::new().unwrap();
+            rt.block_on(server.chunk())
+        })
+    }
+
+    fn apply_chunk(&self, input: ApplyChunkIn) -> Result<()> {
+        self.with_sync_server(|server| {
+            let mut rt = Runtime::new().unwrap();
+            rt.block_on(server.apply_chunk(input.chunk))
+        })
+    }
+
+    fn sanity_check(&self, input: SanityCheckIn) -> Result<SanityCheckOut> {
+        self.with_sync_server(|server| {
+            let mut rt = Runtime::new().unwrap();
+            rt.block_on(server.sanity_check(input.client))
+        })
+        .map(|out| {
+            if out.status != SanityCheckStatus::Ok {
+                // sanity check failures are an implicit abort
+                self.abort_and_restore_collection(None);
+            }
+            out
+        })
+    }
+
+    fn finish(&self) -> Result<TimestampMillis> {
+        let out = self.with_sync_server(|server| {
+            let mut rt = Runtime::new().unwrap();
+            rt.block_on(server.finish())
+        });
+        self.server_into_col(self.take_server(None)?);
+        out
+    }
+
+    fn abort(&self) -> Result<()> {
+        self.abort_and_restore_collection(None);
+        Ok(())
+    }
+
+    fn abort_and_restore_collection(&self, state_guard: Option<MutexGuard<BackendState>>) {
+        if let Ok(mut server) = self.take_server(state_guard) {
+            let mut rt = Runtime::new().unwrap();
+            // attempt to roll back
+            if let Err(abort_err) = rt.block_on(server.abort()) {
+                println!("abort failed: {:?}", abort_err);
+            }
+            self.server_into_col(server);
+        }
+    }
+
+    /// Caller must re-open collection after this request. Provided file will be
+    /// consumed.
+    fn upload(&self, input: PathBuf) -> Result<()> {
+        // spool input into a file
+        let server = Box::new(self.col_into_server()?);
+        // then process upload
+        let mut rt = Runtime::new().unwrap();
+        rt.block_on(server.full_upload(&input, true))
+    }
+
+    /// Caller must re-open collection after this request, and is responsible
+    /// for cleaning up the returned file.
+    fn download(&self) -> Result<Vec<u8>> {
+        let server = Box::new(self.col_into_server()?);
+        let mut rt = Runtime::new().unwrap();
+        let file = rt.block_on(server.full_download())?;
+        let path = file.into_temp_path().keep()?;
+        Ok(path.to_str().expect("path was not in utf8").into())
+    }
+
+    pub(crate) fn sync_server_method_inner(&self, req: SyncRequest) -> Result<Vec<u8>> {
+        use serde_json::to_vec;
+        match req {
+            SyncRequest::HostKey(v) => to_vec(&self.host_key(v)?),
+            SyncRequest::Meta(v) => to_vec(&self.meta(v)?),
+            SyncRequest::Start(v) => to_vec(&self.start(v)?),
+            SyncRequest::ApplyGraves(v) => to_vec(&self.apply_graves(v)?),
+            SyncRequest::ApplyChanges(v) => to_vec(&self.apply_changes(v)?),
+            SyncRequest::Chunk => to_vec(&self.chunk()?),
+            SyncRequest::ApplyChunk(v) => to_vec(&self.apply_chunk(v)?),
+            SyncRequest::SanityCheck(v) => to_vec(&self.sanity_check(v)?),
+            SyncRequest::Finish => to_vec(&self.finish()?),
+            SyncRequest::Abort => to_vec(&self.abort()?),
+            SyncRequest::FullUpload(v) => to_vec(&self.upload(v)?),
+            SyncRequest::FullDownload => return self.download(),
+        }
+        .map_err(Into::into)
+    }
+}
diff --git a/rslib/src/backend/mod.rs b/rslib/src/backend/mod.rs
index 6de2b6b79..62d2406cb 100644
--- a/rslib/src/backend/mod.rs
+++ b/rslib/src/backend/mod.rs
@@ -40,8 +40,9 @@ use crate::{
     },
     stats::studied_today,
     sync::{
-        get_remote_sync_meta, sync_abort, sync_login, FullSyncProgress, NormalSyncProgress,
-        SyncActionRequired, SyncAuth, SyncMeta, SyncOutput, SyncStage,
+        get_remote_sync_meta, http::SyncRequest, sync_abort, sync_login, FullSyncProgress,
+        LocalServer, NormalSyncProgress, SyncActionRequired, SyncAuth, SyncMeta, SyncOutput,
+        SyncStage,
     },
     template::RenderedNode,
     text::{escape_anki_wildcards, extract_av_tags, strip_av_tags, AVTag},
@@ -65,6 +66,7 @@ use std::{
 use tokio::runtime::{self, Runtime};
 
 mod dbproxy;
+mod http_sync_server;
 
 struct ThrottlingProgressHandler {
     state: Arc<Mutex<ProgressState>>,
@@ -111,6 +113,7 @@ pub struct Backend {
 struct BackendState {
     remote_sync_status: RemoteSyncStatus,
     media_sync_abort: Option<AbortHandle>,
+    http_sync_server: Option<LocalServer>,
 }
 
 #[derive(Default, Debug)]
@@ -191,6 +194,7 @@ impl std::convert::From<SyncErrorKind> for i32 {
             SyncErrorKind::DatabaseCheckRequired => V::DatabaseCheckRequired,
             SyncErrorKind::Other => V::Other,
             SyncErrorKind::ClockIncorrect => V::ClockIncorrect,
+            SyncErrorKind::SyncNotStarted => V::SyncNotStarted,
         }) as i32
     }
 }
@@ -1288,6 +1292,11 @@ impl BackendService for Backend {
         self.with_col(|col| col.before_upload().map(Into::into))
     }
 
+    fn sync_server_method(&self, input: pb::SyncServerMethodIn) -> BackendResult<pb::Json> {
+        let req = SyncRequest::from_method_and_data(input.method(), input.data)?;
+        self.sync_server_method_inner(req).map(Into::into)
+    }
+
     // i18n/messages
     //-------------------------------------------------------------------
 
diff --git a/rslib/src/err.rs b/rslib/src/err.rs
index 87a1f7838..c417a635f 100644
--- a/rslib/src/err.rs
+++ b/rslib/src/err.rs
@@ -5,6 +5,7 @@ use crate::i18n::{tr_args, tr_strs, I18n, TR};
 pub use failure::{Error, Fail};
 use reqwest::StatusCode;
 use std::{io, str::Utf8Error};
+use tempfile::PathPersistError;
 
 pub type Result<T, E = AnkiError> = std::result::Result<T, E>;
 
@@ -94,6 +95,8 @@ impl AnkiError {
                 SyncErrorKind::ResyncRequired => i18n.tr(TR::SyncResyncRequired),
                 SyncErrorKind::ClockIncorrect => i18n.tr(TR::SyncClockOff),
                 SyncErrorKind::DatabaseCheckRequired => i18n.tr(TR::SyncSanityCheckFailed),
+                // server message
+                SyncErrorKind::SyncNotStarted => "sync not started".into(),
             }
             .into(),
             AnkiError::NetworkError { kind, info } => {
@@ -229,6 +232,7 @@ pub enum SyncErrorKind {
     Other,
     ResyncRequired,
     DatabaseCheckRequired,
+    SyncNotStarted,
 }
 
 fn error_for_status_code(info: String, code: StatusCode) -> AnkiError {
@@ -327,3 +331,11 @@ pub enum DBErrorKind {
     Utf8,
     Other,
 }
+
+impl From<PathPersistError> for AnkiError {
+    fn from(e: PathPersistError) -> Self {
+        AnkiError::IOError {
+            info: e.to_string(),
+        }
+    }
+}
diff --git a/rslib/src/sync/http.rs b/rslib/src/sync/http.rs
index e7f88bbe1..b3f04d7ba 100644
--- a/rslib/src/sync/http.rs
+++ b/rslib/src/sync/http.rs
@@ -1,4 +1,10 @@
+// Copyright: Ankitects Pty Ltd and contributors
+// License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
+
+use std::{fs, path::PathBuf};
+
 use super::{Chunk, Graves, SanityCheckCounts, UnchunkedChanges};
+use crate::backend_proto::sync_server_method_in::Method;
 use crate::prelude::*;
 use serde::{Deserialize, Serialize};
 
 #[derive(Serialize, Deserialize, Debug)]
@@ -15,11 +21,15 @@ pub enum SyncRequest {
     SanityCheck(SanityCheckIn),
     Finish,
     Abort,
+    #[serde(rename = "upload")]
+    FullUpload(PathBuf),
+    #[serde(rename = "download")]
+    FullDownload,
 }
 
 impl SyncRequest {
     /// Return method name and payload bytes.
-    pub(crate) fn to_method_and_json(&self) -> Result<(&'static str, Vec<u8>)> {
+    pub(crate) fn into_method_and_data(self) -> Result<(&'static str, Vec<u8>)> {
         use serde_json::to_vec;
         Ok(match self {
             SyncRequest::HostKey(v) => ("hostKey", to_vec(&v)?),
@@ -32,6 +42,32 @@ impl SyncRequest {
             SyncRequest::SanityCheck(v) => ("sanityCheck2", to_vec(&v)?),
             SyncRequest::Finish => ("finish", b"{}".to_vec()),
             SyncRequest::Abort => ("abort", b"{}".to_vec()),
+            SyncRequest::FullUpload(v) => {
+                // fixme: stream in the data instead, in a different call
+                ("upload", fs::read(&v)?)
+            }
+            SyncRequest::FullDownload => ("download", b"{}".to_vec()),
         })
     }
+
+    pub(crate) fn from_method_and_data(method: Method, data: Vec<u8>) -> Result<Self> {
+        use serde_json::from_slice;
+        Ok(match method {
+            Method::HostKey => SyncRequest::HostKey(from_slice(&data)?),
+            Method::Meta => SyncRequest::Meta(from_slice(&data)?),
+            Method::Start => SyncRequest::Start(from_slice(&data)?),
+            Method::ApplyGraves => SyncRequest::ApplyGraves(from_slice(&data)?),
+            Method::ApplyChanges => SyncRequest::ApplyChanges(from_slice(&data)?),
+            Method::Chunk => SyncRequest::Chunk,
+            Method::ApplyChunk => SyncRequest::ApplyChunk(from_slice(&data)?),
+            Method::SanityCheck => SyncRequest::SanityCheck(from_slice(&data)?),
+            Method::Finish => SyncRequest::Finish,
+            Method::Abort => SyncRequest::Abort,
+            Method::FullUpload => {
+                let path = PathBuf::from(String::from_utf8(data).expect("path was not in utf8"));
+                SyncRequest::FullUpload(path)
+            }
+            Method::FullDownload => SyncRequest::FullDownload,
+        })
+    }
 }
@@ -82,5 +118,4 @@ pub struct ApplyChunkIn {
 #[derive(Serialize, Deserialize, Debug)]
 pub struct SanityCheckIn {
     pub client: SanityCheckCounts,
-    pub full: bool,
 }
diff --git a/rslib/src/sync/http_client.rs b/rslib/src/sync/http_client.rs
index 7d779dd95..50550bc0d 100644
--- a/rslib/src/sync/http_client.rs
+++ b/rslib/src/sync/http_client.rs
@@ -1,7 +1,7 @@
 // Copyright: Ankitects Pty Ltd and contributors
 // License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
 
-use super::server::SyncServer;
+use super::{server::SyncServer, SYNC_VERSION_MAX};
 use super::{
     Chunk, FullSyncProgress, Graves, SanityCheckCounts, SanityCheckOut, SyncMeta, UnchunkedChanges,
 };
@@ -28,8 +28,6 @@ use tempfile::NamedTempFile;
 
 // fixme: 100mb limit
 
-static SYNC_VERSION: u8 = 10;
-
 pub type FullSyncProgressFn = Box<dyn Fn(FullSyncProgress, bool) + Send + Sync + 'static>;
 
 pub struct HTTPSyncClient {
@@ -67,10 +65,10 @@ impl Timeouts {
 impl SyncServer for HTTPSyncClient {
     async fn meta(&self) -> Result<SyncMeta> {
         let input = SyncRequest::Meta(MetaIn {
-            sync_version: SYNC_VERSION,
+            sync_version: SYNC_VERSION_MAX,
             client_version: sync_client_version().to_string(),
         });
-        self.json_request(&input).await
+        self.json_request(input).await
     }
 
     async fn start(&mut self, client_usn: Usn, local_is_newer: bool) -> Result<Graves> {
@@ -78,42 +76,42 @@ impl SyncServer for HTTPSyncClient {
             client_usn,
             local_is_newer,
         });
-        self.json_request(&input).await
+        self.json_request(input).await
     }
 
     async fn apply_graves(&mut self, chunk: Graves) -> Result<()> {
         let input = SyncRequest::ApplyGraves(ApplyGravesIn { chunk });
-        self.json_request(&input).await
+        self.json_request(input).await
     }
 
     async fn apply_changes(&mut self, changes: UnchunkedChanges) -> Result<UnchunkedChanges> {
         let input = SyncRequest::ApplyChanges(ApplyChangesIn { changes });
-        self.json_request(&input).await
+        self.json_request(input).await
     }
 
     async fn chunk(&mut self) -> Result<Chunk> {
         let input = SyncRequest::Chunk;
-        self.json_request(&input).await
+        self.json_request(input).await
     }
 
     async fn apply_chunk(&mut self, chunk: Chunk) -> Result<()> {
         let input = SyncRequest::ApplyChunk(ApplyChunkIn { chunk });
-        self.json_request(&input).await
+        self.json_request(input).await
    }
 
     async fn sanity_check(&mut self, client: SanityCheckCounts) -> Result<SanityCheckOut> {
-        let input = SyncRequest::SanityCheck(SanityCheckIn { client, full: true });
-        self.json_request(&input).await
+        let input = SyncRequest::SanityCheck(SanityCheckIn { client });
+        self.json_request(input).await
     }
 
     async fn finish(&mut self) -> Result<TimestampMillis> {
         let input = SyncRequest::Finish;
-        self.json_request(&input).await
+        self.json_request(input).await
     }
 
     async fn abort(&mut self) -> Result<()> {
         let input = SyncRequest::Abort;
-        self.json_request(&input).await
+        self.json_request(input).await
     }
 
     async fn full_upload(mut self: Box<Self>, col_path: &Path, _can_consume: bool) -> Result<()> {
@@ -141,8 +139,8 @@ impl SyncServer for HTTPSyncClient {
     /// Download collection into a temporary file, returning it.
     /// Caller should persist the file in the correct path after checking it.
     /// Progress func must be set first.
-    async fn full_download(mut self: Box<Self>, folder: &Path) -> Result<NamedTempFile> {
-        let mut temp_file = NamedTempFile::new_in(folder)?;
+    async fn full_download(mut self: Box<Self>) -> Result<NamedTempFile> {
+        let mut temp_file = NamedTempFile::new()?;
         let (size, mut stream) = self.download_inner().await?;
         let mut progress = FullSyncProgress {
             transferred_bytes: 0,
@@ -187,11 +185,11 @@ impl HTTPSyncClient {
         self.full_sync_progress_fn = func;
     }
 
-    async fn json_request<T>(&self, req: &SyncRequest) -> Result<T>
+    async fn json_request<T>(&self, req: SyncRequest) -> Result<T>
     where
         T: DeserializeOwned,
     {
-        let (method, req_json) = req.to_method_and_json()?;
+        let (method, req_json) = req.into_method_and_data()?;
         self.request_bytes(method, &req_json, false)
             .await?
             .json()
@@ -242,7 +240,7 @@ impl HTTPSyncClient {
             username: username.into(),
             password: password.into(),
         });
-        let output: HostKeyOut = self.json_request(&input).await?;
+        let output: HostKeyOut = self.json_request(input).await?;
         self.hkey = Some(output.key);
 
         Ok(())
@@ -403,13 +401,10 @@ mod test {
         // failed sanity check will have cleaned up; can't finish
         // syncer.finish().await?;
 
-        use tempfile::tempdir;
-
-        let dir = tempdir()?;
         syncer.set_full_sync_progress_fn(Some(Box::new(|progress, _throttle| {
             println!("progress: {:?}", progress);
         })));
-        let out_path = syncer.full_download(&dir.path()).await?;
+        let out_path = syncer.full_download().await?;
 
         let mut syncer = Box::new(HTTPSyncClient::new(None, 0));
         syncer.set_full_sync_progress_fn(Some(Box::new(|progress, _throttle| {
diff --git a/rslib/src/sync/mod.rs b/rslib/src/sync/mod.rs
index 19037e48b..28e73f9b9 100644
--- a/rslib/src/sync/mod.rs
+++ b/rslib/src/sync/mod.rs
@@ -26,9 +26,12 @@ use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
 use serde_tuple::Serialize_tuple;
-pub(crate) use server::SyncServer;
+pub(crate) use server::{LocalServer, SyncServer};
 use std::collections::HashMap;
 
+pub static SYNC_VERSION_MIN: u8 = 7;
+pub static SYNC_VERSION_MAX: u8 = 10;
+
 #[derive(Default, Debug, Clone, Copy)]
 pub struct NormalSyncProgress {
     pub stage: SyncStage,
@@ -51,23 +54,23 @@ impl Default for SyncStage {
     }
 }
 
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Serialize, Deserialize, Debug, Default)]
 pub struct SyncMeta {
     #[serde(rename = "mod")]
-    modified: TimestampMillis,
+    pub modified: TimestampMillis,
     #[serde(rename = "scm")]
-    schema: TimestampMillis,
-    usn: Usn,
+    pub schema: TimestampMillis,
+    pub usn: Usn,
     #[serde(rename = "ts")]
-    current_time: TimestampSecs,
+    pub current_time: TimestampSecs,
     #[serde(rename = "msg")]
-    server_message: String,
+    pub server_message: String,
     #[serde(rename = "cont")]
-    should_continue: bool,
+    pub should_continue: bool,
     #[serde(rename = "hostNum")]
-    host_number: u32,
+    pub host_number: u32,
     #[serde(default)]
-    empty: bool,
+    pub empty: bool,
 }
 
 #[derive(Serialize, Deserialize, Debug, Default)]
@@ -158,16 +161,16 @@ pub struct CardEntry {
 
 #[derive(Serialize, Deserialize, Debug)]
 pub struct SanityCheckOut {
-    status: SanityCheckStatus,
+    pub status: SanityCheckStatus,
#[serde(rename = "c", default, deserialize_with = "default_on_invalid")] - client: Option, + pub client: Option, #[serde(rename = "s", default, deserialize_with = "default_on_invalid")] - server: Option, + pub server: Option, } #[derive(Serialize, Deserialize, Debug, PartialEq)] #[serde(rename_all = "lowercase")] -enum SanityCheckStatus { +pub enum SanityCheckStatus { Ok, Bad, } @@ -666,9 +669,8 @@ impl Collection { pub(crate) async fn full_download_inner(self, server: Box) -> Result<()> { let col_path = self.col_path.clone(); - let folder = col_path.parent().unwrap(); self.close(false)?; - let out_file = server.full_download(folder).await?; + let out_file = server.full_download().await?; // check file ok let db = open_and_check_sqlite_file(out_file.path())?; db.execute_batch("update col set ls=mod")?; diff --git a/rslib/src/sync/server.rs b/rslib/src/sync/server.rs index aca568b80..13a2cbf0d 100644 --- a/rslib/src/sync/server.rs +++ b/rslib/src/sync/server.rs @@ -28,7 +28,7 @@ pub trait SyncServer { /// If `can_consume` is true, the local server will move or remove the file, instead /// creating a copy. The remote server ignores this argument. async fn full_upload(self: Box, col_path: &Path, can_consume: bool) -> Result<()>; - async fn full_download(self: Box, folder: &Path) -> Result; + async fn full_download(self: Box) -> Result; } pub struct LocalServer { @@ -58,6 +58,12 @@ impl LocalServer { server_chunk_ids: None, } } + + /// Consumes self and returns the stored collection. If a sync has begun, caller must ensure they + /// call .finish() or .abort() before calling this. + pub fn into_col(self) -> Collection { + self.col + } } #[async_trait(?Send)] @@ -168,14 +174,14 @@ impl SyncServer for LocalServer { fs::rename(col_path, &target_col_path).map_err(Into::into) } - async fn full_download(mut self: Box, output_folder: &Path) -> Result { + async fn full_download(mut self: Box) -> Result { // bump usn/mod & close self.col.transact(None, |col| col.storage.increment_usn())?; let col_path = self.col.col_path.clone(); self.col.close(true)?; // copy file and return path - let temp_file = NamedTempFile::new_in(output_folder)?; + let temp_file = NamedTempFile::new()?; fs::copy(&col_path, temp_file.path())?; Ok(temp_file)