add local sync server frontend

This commit is contained in:
Damien Elmes 2021-01-11 14:11:18 +10:00
parent fbd91b22f5
commit 633034b24d
14 changed files with 666 additions and 49 deletions

103
docs/syncserver.md Normal file
View File

@ -0,0 +1,103 @@
# Local sync server
A local sync server is bundled with Anki. If you cannot or do not wish to
use AnkiWeb, you can run the server on a machine on your local network.
Things to be aware of:
- Media syncing is not currently supported. You will either need to disable
syncing of sounds and images in the preferences screen, sync your media via
AnkiWeb, or use some other solution.
- AnkiMobile does not yet provide an option for using a local sync server,
so for now this will only be usable with the computer version of Anki, and
AnkiDroid.
- This code is partly new, and while it has had some testing, it's possible
something has been missed. Please make backups, and report any bugs you run
into.
- The server runs over an unencrypted HTTP connection and does not require
authentication, so it is only suitable for use on a private network.
- This is an advanced feature, targeted at users who are comfortable with
networking and the command line. If you use this, the expectation is you
can resolve any setup/network/firewall issues you run into yourself, and
use of this is entirely at your own risk.
## From source
If you run Anki from git, you can run a sync server with:
```
./scripts/runopt --syncserver
```
## From a packaged build
From 2.1.39beta1+, the sync server is included in the packaged binaries.
On Windows in a cmd.exe session:
```
"\program files\anki\anki-console.exe" --syncserver
```
Or on macOS, in Terminal.app:
```
/Applications/Anki.app/Contents/MacOS/AnkiMac --syncserver
```
Or Linux:
```
anki --syncserver
```
## Without Qt dependencies
You can run the server without installing the GUI portion of Anki. Once Anki
2.1.39 is released, the following will work:
```
pip install anki[syncserver]
python -m anki.syncserver
```
## Server setup
The server needs to store a copy of your collection in a folder.
By default it is ~/.syncserver; you can change this by defining
a `FOLDER` environment variable. This should not be the same location
as your normal Anki data folder.
You can also define `HOST` and `PORT`.
## Client setup
When the server starts, it will print the address it is listening on.
You need to set an environment variable before starting your Anki
clients to tell them where to connect to. Eg:
```
set SYNC_ENDPOINT="http://10.0.0.5:8080/sync/"
anki
```
Currently any username and password will be accepted. If you wish to
keep using AnkiWeb for media, sync once with AnkiWeb first, then switch
to your local endpoint - collection syncs will be local, and media syncs
will continue to go to AnkiWeb.
## Contributing
Authentication shouldn't be too hard to add - login() and request() in
http_client.rs can be used as a reference. A PR that accepts a password in an
env var, and generates a stable hkey based on it would be welcome.
Once that is done, basic multi-profile support could be implemented by moving
the col object into an array or dict, and fetching the relevant collection based
on the user's authentication.
Because this server is bundled with Anki, simplicity is a design goal - it is
targeted at individual/family use, only makes use of Python libraries the GUI is
already using, and does not require a configuration file. PRs that deviate from
this are less likely to be merged, so please consider reaching out first if you
are thinking of starting work on a larger change.

View File

@ -96,6 +96,8 @@ py_library(
requirement("distro"), requirement("distro"),
requirement("protobuf"), requirement("protobuf"),
requirement("requests"), requirement("requests"),
requirement("flask"),
requirement("waitress"),
] + orjson_if_available(), ] + orjson_if_available(),
) )
@ -110,6 +112,12 @@ py_wheel(
abi = "abi3", abi = "abi3",
description_file = "wheel_description.txt", description_file = "wheel_description.txt",
distribution = "anki", distribution = "anki",
extra_requires = {
"syncserver": [
"flask",
"waitress",
],
},
platform = select({ platform = select({
"//platforms:windows_x86_64": "win_amd64", "//platforms:windows_x86_64": "win_amd64",
"//platforms:macos_x86_64": "macosx_10_7_x86_64", "//platforms:macos_x86_64": "macosx_10_7_x86_64",

View File

@ -1,5 +1,8 @@
# Copyright: Ankitects Pty Ltd and contributors # Copyright: Ankitects Pty Ltd and contributors
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html # License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
#
# Legacy attributes some add-ons may be using
#
from .httpclient import HttpClient from .httpclient import HttpClient

View File

@ -0,0 +1,193 @@
# Copyright: Ankitects Pty Ltd and contributors
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
#
# Please see /docs/syncserver.md
#
from __future__ import annotations
import gzip
import os
import socket
import sys
import time
from io import BytesIO
from tempfile import NamedTemporaryFile
from typing import Optional
try:
import flask
from waitress.server import create_server
except ImportError as e:
print(e, "- to use the server, 'pip install anki[syncserver]'")
sys.exit(1)
from flask import Response
from anki import Collection
from anki.backend_pb2 import SyncServerMethodIn
Method = SyncServerMethodIn.Method # pylint: disable=no-member
app = flask.Flask(__name__)
col: Collection
trace = os.getenv("TRACE")
def get_request_data() -> bytes:
buf = BytesIO()
flask.request.files["data"].save(buf)
buf.seek(0)
zip = gzip.GzipFile(mode="rb", fileobj=buf)
return zip.read()
def get_request_data_into_file() -> bytes:
"Returns the utf8 path to the resulting file."
# this could be optimized to stream the data into a file
# in the future
data = get_request_data()
tempobj = NamedTemporaryFile(dir=folder(), delete=False)
tempobj.write(data)
tempobj.close()
return tempobj.name.encode("utf8")
def handle_sync_request(method_str: str) -> Response:
method = get_method(method_str)
if method is None:
raise Exception(f"unknown method: {method_str}")
if method == Method.FULL_UPLOAD:
data = get_request_data_into_file()
else:
data = get_request_data()
if trace:
print("-->", data)
full = method in (Method.FULL_UPLOAD, Method.FULL_DOWNLOAD)
if full:
col.close_for_full_sync()
try:
outdata = col.backend.sync_server_method(method=method, data=data)
except Exception as e:
if method == Method.META:
# if parallel syncing requests come in, block them
print("exception in meta", e)
return flask.make_response("Conflict", 409)
else:
raise
finally:
if full:
after_full_sync()
resp = None
if method == Method.FULL_UPLOAD:
# upload call expects a raw string literal returned
outdata = b"OK"
elif method == Method.FULL_DOWNLOAD:
path = outdata.decode("utf8")
def stream_reply():
with open(path, "rb") as f:
while chunk := f.read(16 * 1024):
yield chunk
os.unlink(path)
resp = Response(stream_reply())
else:
if trace:
print("<--", outdata)
if not resp:
resp = flask.make_response(outdata)
resp.headers["Content-Type"] = "application/binary"
return resp
def after_full_sync():
# the server methods do not reopen the collection after a full sync,
# so we need to
col.reopen(after_full_sync=False)
col.db.rollback()
def get_method(
method_str: str,
) -> Optional[SyncServerMethodIn.MethodValue]: # pylint: disable=no-member
s = method_str
if s == "hostKey":
return Method.HOST_KEY
elif s == "meta":
return Method.META
elif s == "start":
return Method.START
elif s == "applyGraves":
return Method.APPLY_GRAVES
elif s == "applyChanges":
return Method.APPLY_CHANGES
elif s == "chunk":
return Method.CHUNK
elif s == "applyChunk":
return Method.APPLY_CHUNK
elif s == "sanityCheck2":
return Method.SANITY_CHECK
elif s == "finish":
return Method.FINISH
elif s == "abort":
return Method.ABORT
elif s == "upload":
return Method.FULL_UPLOAD
elif s == "download":
return Method.FULL_DOWNLOAD
else:
return None
@app.route("/<path:pathin>", methods=["POST"])
def handle_request(pathin: str):
path = pathin
print(int(time.time()), flask.request.remote_addr, path)
if path.startswith("sync/"):
return handle_sync_request(path.split("/", maxsplit=1)[1])
def folder():
folder = os.getenv("FOLDER", os.path.expanduser("~/.syncserver"))
if not os.path.exists(folder):
print("creating", folder)
os.mkdir(folder)
return folder
def col_path():
return os.path.join(folder(), "collection.server.anki2")
def serve():
global col
col = Collection(col_path(), server=True)
# don't hold an outer transaction open
col.db.rollback()
host = os.getenv("HOST", "0.0.0.0")
port = int(os.getenv("PORT", "8080"))
server = create_server(
app,
host=host,
port=port,
clear_untrusted_proxy_headers=True,
)
effective_port = server.effective_port # type: ignore
print(f"Sync server listening on http://{host}:{effective_port}/sync/")
if host == "0.0.0.0":
ip = socket.gethostbyname(socket.gethostname())
print(f"Replace 0.0.0.0 with your machine's IP address (perhaps {ip})")
print(
"For more info, see https://github.com/ankitects/anki/blob/master/docs/syncserver.md"
)
server.run()

View File

@ -0,0 +1,6 @@
# Copyright: Ankitects Pty Ltd and contributors
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
from anki.syncserver import serve
serve()

View File

@ -316,9 +316,16 @@ def parseArgs(argv):
parser.add_argument("-b", "--base", help="path to base folder", default="") parser.add_argument("-b", "--base", help="path to base folder", default="")
parser.add_argument("-p", "--profile", help="profile name to load", default="") parser.add_argument("-p", "--profile", help="profile name to load", default="")
parser.add_argument("-l", "--lang", help="interface language (en, de, etc)") parser.add_argument("-l", "--lang", help="interface language (en, de, etc)")
parser.add_argument("-v", "--version", help="print the Anki version and exit")
parser.add_argument( parser.add_argument(
"-s", "--safemode", help="disable add-ons and automatic syncing" "-v", "--version", help="print the Anki version and exit", action="store_true"
)
parser.add_argument(
"--safemode", help="disable add-ons and automatic syncing", action="store_true"
)
parser.add_argument(
"--syncserver",
help="skip GUI and start a local sync server",
action="store_true",
) )
return parser.parse_known_args(argv[1:]) return parser.parse_known_args(argv[1:])
@ -433,7 +440,12 @@ def _run(argv=None, exec=True):
opts, args = parseArgs(argv) opts, args = parseArgs(argv)
if opts.version: if opts.version:
print(f"Anki version '{appVersion}'") print(f"Anki {appVersion}")
return
elif opts.syncserver:
from anki.syncserver import serve
serve()
return return
if PROFILE_CODE: if PROFILE_CODE:

View File

@ -196,6 +196,7 @@ service BackendService {
rpc SyncCollection(SyncAuth) returns (SyncCollectionOut); rpc SyncCollection(SyncAuth) returns (SyncCollectionOut);
rpc FullUpload(SyncAuth) returns (Empty); rpc FullUpload(SyncAuth) returns (Empty);
rpc FullDownload(SyncAuth) returns (Empty); rpc FullDownload(SyncAuth) returns (Empty);
rpc SyncServerMethod(SyncServerMethodIn) returns (Json);
// translation/messages // translation/messages
@ -506,6 +507,7 @@ message SyncError {
RESYNC_REQUIRED = 7; RESYNC_REQUIRED = 7;
CLOCK_INCORRECT = 8; CLOCK_INCORRECT = 8;
DATABASE_CHECK_REQUIRED = 9; DATABASE_CHECK_REQUIRED = 9;
SYNC_NOT_STARTED = 10;
} }
SyncErrorKind kind = 1; SyncErrorKind kind = 1;
} }
@ -1013,6 +1015,26 @@ message SyncAuth {
uint32 host_number = 2; uint32 host_number = 2;
} }
message SyncServerMethodIn {
enum Method {
HOST_KEY = 0;
META = 1;
START = 2;
APPLY_GRAVES = 3;
APPLY_CHANGES = 4;
CHUNK = 5;
APPLY_CHUNK = 6;
SANITY_CHECK = 7;
FINISH = 8;
ABORT = 9;
// caller must reopen after these two are called
FULL_UPLOAD = 10;
FULL_DOWNLOAD = 11;
}
Method method = 1;
bytes data = 2;
}
message RemoveNotesIn { message RemoveNotesIn {
repeated int64 note_ids = 1; repeated int64 note_ids = 1;
repeated int64 card_ids = 2; repeated int64 card_ids = 2;

View File

@ -0,0 +1,211 @@
// Copyright: Ankitects Pty Ltd and contributors
// License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
use std::{path::PathBuf, sync::MutexGuard};
use tokio::runtime::Runtime;
use super::{Backend, BackendState};
use crate::{
err::SyncErrorKind,
prelude::*,
sync::{
http::{
ApplyChangesIn, ApplyChunkIn, ApplyGravesIn, HostKeyIn, HostKeyOut, MetaIn,
SanityCheckIn, StartIn, SyncRequest,
},
Chunk, Graves, LocalServer, SanityCheckOut, SanityCheckStatus, SyncMeta, SyncServer,
UnchunkedChanges, SYNC_VERSION_MAX, SYNC_VERSION_MIN,
},
};
impl Backend {
fn with_sync_server<F, T>(&self, func: F) -> Result<T>
where
F: FnOnce(&mut LocalServer) -> Result<T>,
{
let mut state_guard = self.state.lock().unwrap();
let out =
func(
state_guard
.http_sync_server
.as_mut()
.ok_or_else(|| AnkiError::SyncError {
kind: SyncErrorKind::SyncNotStarted,
info: Default::default(),
})?,
);
if out.is_err() {
self.abort_and_restore_collection(Some(state_guard))
}
out
}
/// Gives out a dummy hkey - auth should be implemented at a higher layer.
fn host_key(&self, _input: HostKeyIn) -> Result<HostKeyOut> {
Ok(HostKeyOut {
key: "unimplemented".into(),
})
}
fn meta(&self, input: MetaIn) -> Result<SyncMeta> {
if input.sync_version < SYNC_VERSION_MIN || input.sync_version > SYNC_VERSION_MAX {
return Ok(SyncMeta {
server_message: "Your Anki version is either too old, or too new.".into(),
should_continue: false,
..Default::default()
});
}
let server = self.col_into_server()?;
let mut rt = Runtime::new().unwrap();
let meta = rt.block_on(server.meta())?;
self.server_into_col(server);
Ok(meta)
}
/// Takes the collection from the backend, places it into a server, and returns it.
fn col_into_server(&self) -> Result<LocalServer> {
self.col
.lock()
.unwrap()
.take()
.map(LocalServer::new)
.ok_or(AnkiError::CollectionNotOpen)
}
fn server_into_col(&self, server: LocalServer) {
let col = server.into_col();
let mut col_guard = self.col.lock().unwrap();
assert!(col_guard.replace(col).is_none());
}
fn take_server(&self, state_guard: Option<MutexGuard<BackendState>>) -> Result<LocalServer> {
let mut state_guard = state_guard.unwrap_or_else(|| self.state.lock().unwrap());
state_guard
.http_sync_server
.take()
.ok_or_else(|| AnkiError::SyncError {
kind: SyncErrorKind::SyncNotStarted,
info: String::new(),
})
}
fn start(&self, input: StartIn) -> Result<Graves> {
// place col into new server
let server = self.col_into_server()?;
let mut state_guard = self.state.lock().unwrap();
assert!(state_guard.http_sync_server.replace(server).is_none());
drop(state_guard);
self.with_sync_server(|server| {
let mut rt = Runtime::new().unwrap();
rt.block_on(server.start(input.client_usn, input.local_is_newer))
})
}
fn apply_graves(&self, input: ApplyGravesIn) -> Result<()> {
self.with_sync_server(|server| {
let mut rt = Runtime::new().unwrap();
rt.block_on(server.apply_graves(input.chunk))
})
}
fn apply_changes(&self, input: ApplyChangesIn) -> Result<UnchunkedChanges> {
self.with_sync_server(|server| {
let mut rt = Runtime::new().unwrap();
rt.block_on(server.apply_changes(input.changes))
})
}
fn chunk(&self) -> Result<Chunk> {
self.with_sync_server(|server| {
let mut rt = Runtime::new().unwrap();
rt.block_on(server.chunk())
})
}
fn apply_chunk(&self, input: ApplyChunkIn) -> Result<()> {
self.with_sync_server(|server| {
let mut rt = Runtime::new().unwrap();
rt.block_on(server.apply_chunk(input.chunk))
})
}
fn sanity_check(&self, input: SanityCheckIn) -> Result<SanityCheckOut> {
self.with_sync_server(|server| {
let mut rt = Runtime::new().unwrap();
rt.block_on(server.sanity_check(input.client))
})
.map(|out| {
if out.status != SanityCheckStatus::Ok {
// sanity check failures are an implicit abort
self.abort_and_restore_collection(None);
}
out
})
}
fn finish(&self) -> Result<TimestampMillis> {
let out = self.with_sync_server(|server| {
let mut rt = Runtime::new().unwrap();
rt.block_on(server.finish())
});
self.server_into_col(self.take_server(None)?);
out
}
fn abort(&self) -> Result<()> {
self.abort_and_restore_collection(None);
Ok(())
}
fn abort_and_restore_collection(&self, state_guard: Option<MutexGuard<BackendState>>) {
if let Ok(mut server) = self.take_server(state_guard) {
let mut rt = Runtime::new().unwrap();
// attempt to roll back
if let Err(abort_err) = rt.block_on(server.abort()) {
println!("abort failed: {:?}", abort_err);
}
self.server_into_col(server);
}
}
/// Caller must re-open collection after this request. Provided file will be
/// consumed.
fn upload(&self, input: PathBuf) -> Result<()> {
// spool input into a file
let server = Box::new(self.col_into_server()?);
// then process upload
let mut rt = Runtime::new().unwrap();
rt.block_on(server.full_upload(&input, true))
}
/// Caller must re-open collection after this request, and is responsible
/// for cleaning up the returned file.
fn download(&self) -> Result<Vec<u8>> {
let server = Box::new(self.col_into_server()?);
let mut rt = Runtime::new().unwrap();
let file = rt.block_on(server.full_download())?;
let path = file.into_temp_path().keep()?;
Ok(path.to_str().expect("path was not in utf8").into())
}
pub(crate) fn sync_server_method_inner(&self, req: SyncRequest) -> Result<Vec<u8>> {
use serde_json::to_vec;
match req {
SyncRequest::HostKey(v) => to_vec(&self.host_key(v)?),
SyncRequest::Meta(v) => to_vec(&self.meta(v)?),
SyncRequest::Start(v) => to_vec(&self.start(v)?),
SyncRequest::ApplyGraves(v) => to_vec(&self.apply_graves(v)?),
SyncRequest::ApplyChanges(v) => to_vec(&self.apply_changes(v)?),
SyncRequest::Chunk => to_vec(&self.chunk()?),
SyncRequest::ApplyChunk(v) => to_vec(&self.apply_chunk(v)?),
SyncRequest::SanityCheck(v) => to_vec(&self.sanity_check(v)?),
SyncRequest::Finish => to_vec(&self.finish()?),
SyncRequest::Abort => to_vec(&self.abort()?),
SyncRequest::FullUpload(v) => to_vec(&self.upload(v)?),
SyncRequest::FullDownload => return self.download(),
}
.map_err(Into::into)
}
}

View File

@ -40,8 +40,9 @@ use crate::{
}, },
stats::studied_today, stats::studied_today,
sync::{ sync::{
get_remote_sync_meta, sync_abort, sync_login, FullSyncProgress, NormalSyncProgress, get_remote_sync_meta, http::SyncRequest, sync_abort, sync_login, FullSyncProgress,
SyncActionRequired, SyncAuth, SyncMeta, SyncOutput, SyncStage, LocalServer, NormalSyncProgress, SyncActionRequired, SyncAuth, SyncMeta, SyncOutput,
SyncStage,
}, },
template::RenderedNode, template::RenderedNode,
text::{escape_anki_wildcards, extract_av_tags, strip_av_tags, AVTag}, text::{escape_anki_wildcards, extract_av_tags, strip_av_tags, AVTag},
@ -65,6 +66,7 @@ use std::{
use tokio::runtime::{self, Runtime}; use tokio::runtime::{self, Runtime};
mod dbproxy; mod dbproxy;
mod http_sync_server;
struct ThrottlingProgressHandler { struct ThrottlingProgressHandler {
state: Arc<Mutex<ProgressState>>, state: Arc<Mutex<ProgressState>>,
@ -111,6 +113,7 @@ pub struct Backend {
struct BackendState { struct BackendState {
remote_sync_status: RemoteSyncStatus, remote_sync_status: RemoteSyncStatus,
media_sync_abort: Option<AbortHandle>, media_sync_abort: Option<AbortHandle>,
http_sync_server: Option<LocalServer>,
} }
#[derive(Default, Debug)] #[derive(Default, Debug)]
@ -191,6 +194,7 @@ impl std::convert::From<SyncErrorKind> for i32 {
SyncErrorKind::DatabaseCheckRequired => V::DatabaseCheckRequired, SyncErrorKind::DatabaseCheckRequired => V::DatabaseCheckRequired,
SyncErrorKind::Other => V::Other, SyncErrorKind::Other => V::Other,
SyncErrorKind::ClockIncorrect => V::ClockIncorrect, SyncErrorKind::ClockIncorrect => V::ClockIncorrect,
SyncErrorKind::SyncNotStarted => V::SyncNotStarted,
}) as i32 }) as i32
} }
} }
@ -1288,6 +1292,11 @@ impl BackendService for Backend {
self.with_col(|col| col.before_upload().map(Into::into)) self.with_col(|col| col.before_upload().map(Into::into))
} }
fn sync_server_method(&self, input: pb::SyncServerMethodIn) -> BackendResult<pb::Json> {
let req = SyncRequest::from_method_and_data(input.method(), input.data)?;
self.sync_server_method_inner(req).map(Into::into)
}
// i18n/messages // i18n/messages
//------------------------------------------------------------------- //-------------------------------------------------------------------

View File

@ -5,6 +5,7 @@ use crate::i18n::{tr_args, tr_strs, I18n, TR};
pub use failure::{Error, Fail}; pub use failure::{Error, Fail};
use reqwest::StatusCode; use reqwest::StatusCode;
use std::{io, str::Utf8Error}; use std::{io, str::Utf8Error};
use tempfile::PathPersistError;
pub type Result<T> = std::result::Result<T, AnkiError>; pub type Result<T> = std::result::Result<T, AnkiError>;
@ -94,6 +95,8 @@ impl AnkiError {
SyncErrorKind::ResyncRequired => i18n.tr(TR::SyncResyncRequired), SyncErrorKind::ResyncRequired => i18n.tr(TR::SyncResyncRequired),
SyncErrorKind::ClockIncorrect => i18n.tr(TR::SyncClockOff), SyncErrorKind::ClockIncorrect => i18n.tr(TR::SyncClockOff),
SyncErrorKind::DatabaseCheckRequired => i18n.tr(TR::SyncSanityCheckFailed), SyncErrorKind::DatabaseCheckRequired => i18n.tr(TR::SyncSanityCheckFailed),
// server message
SyncErrorKind::SyncNotStarted => "sync not started".into(),
} }
.into(), .into(),
AnkiError::NetworkError { kind, info } => { AnkiError::NetworkError { kind, info } => {
@ -229,6 +232,7 @@ pub enum SyncErrorKind {
Other, Other,
ResyncRequired, ResyncRequired,
DatabaseCheckRequired, DatabaseCheckRequired,
SyncNotStarted,
} }
fn error_for_status_code(info: String, code: StatusCode) -> AnkiError { fn error_for_status_code(info: String, code: StatusCode) -> AnkiError {
@ -327,3 +331,11 @@ pub enum DBErrorKind {
Utf8, Utf8,
Other, Other,
} }
impl From<PathPersistError> for AnkiError {
fn from(e: PathPersistError) -> Self {
AnkiError::IOError {
info: e.to_string(),
}
}
}

View File

@ -1,4 +1,10 @@
// Copyright: Ankitects Pty Ltd and contributors
// License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
use std::{fs, path::PathBuf};
use super::{Chunk, Graves, SanityCheckCounts, UnchunkedChanges}; use super::{Chunk, Graves, SanityCheckCounts, UnchunkedChanges};
use crate::backend_proto::sync_server_method_in::Method;
use crate::prelude::*; use crate::prelude::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
@ -15,11 +21,15 @@ pub enum SyncRequest {
SanityCheck(SanityCheckIn), SanityCheck(SanityCheckIn),
Finish, Finish,
Abort, Abort,
#[serde(rename = "upload")]
FullUpload(PathBuf),
#[serde(rename = "download")]
FullDownload,
} }
impl SyncRequest { impl SyncRequest {
/// Return method name and payload bytes. /// Return method name and payload bytes.
pub(crate) fn to_method_and_json(&self) -> Result<(&'static str, Vec<u8>)> { pub(crate) fn into_method_and_data(self) -> Result<(&'static str, Vec<u8>)> {
use serde_json::to_vec; use serde_json::to_vec;
Ok(match self { Ok(match self {
SyncRequest::HostKey(v) => ("hostKey", to_vec(&v)?), SyncRequest::HostKey(v) => ("hostKey", to_vec(&v)?),
@ -32,6 +42,32 @@ impl SyncRequest {
SyncRequest::SanityCheck(v) => ("sanityCheck2", to_vec(&v)?), SyncRequest::SanityCheck(v) => ("sanityCheck2", to_vec(&v)?),
SyncRequest::Finish => ("finish", b"{}".to_vec()), SyncRequest::Finish => ("finish", b"{}".to_vec()),
SyncRequest::Abort => ("abort", b"{}".to_vec()), SyncRequest::Abort => ("abort", b"{}".to_vec()),
SyncRequest::FullUpload(v) => {
// fixme: stream in the data instead, in a different call
("upload", fs::read(&v)?)
}
SyncRequest::FullDownload => ("download", b"{}".to_vec()),
})
}
pub(crate) fn from_method_and_data(method: Method, data: Vec<u8>) -> Result<Self> {
use serde_json::from_slice;
Ok(match method {
Method::HostKey => SyncRequest::HostKey(from_slice(&data)?),
Method::Meta => SyncRequest::Meta(from_slice(&data)?),
Method::Start => SyncRequest::Start(from_slice(&data)?),
Method::ApplyGraves => SyncRequest::ApplyGraves(from_slice(&data)?),
Method::ApplyChanges => SyncRequest::ApplyChanges(from_slice(&data)?),
Method::Chunk => SyncRequest::Chunk,
Method::ApplyChunk => SyncRequest::ApplyChunk(from_slice(&data)?),
Method::SanityCheck => SyncRequest::SanityCheck(from_slice(&data)?),
Method::Finish => SyncRequest::Finish,
Method::Abort => SyncRequest::Abort,
Method::FullUpload => {
let path = PathBuf::from(String::from_utf8(data).expect("path was not in utf8"));
SyncRequest::FullUpload(path)
}
Method::FullDownload => SyncRequest::FullDownload,
}) })
} }
} }
@ -82,5 +118,4 @@ pub struct ApplyChunkIn {
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct SanityCheckIn { pub struct SanityCheckIn {
pub client: SanityCheckCounts, pub client: SanityCheckCounts,
pub full: bool,
} }

View File

@ -1,7 +1,7 @@
// Copyright: Ankitects Pty Ltd and contributors // Copyright: Ankitects Pty Ltd and contributors
// License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html // License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
use super::server::SyncServer; use super::{server::SyncServer, SYNC_VERSION_MAX};
use super::{ use super::{
Chunk, FullSyncProgress, Graves, SanityCheckCounts, SanityCheckOut, SyncMeta, UnchunkedChanges, Chunk, FullSyncProgress, Graves, SanityCheckCounts, SanityCheckOut, SyncMeta, UnchunkedChanges,
}; };
@ -28,8 +28,6 @@ use tempfile::NamedTempFile;
// fixme: 100mb limit // fixme: 100mb limit
static SYNC_VERSION: u8 = 10;
pub type FullSyncProgressFn = Box<dyn FnMut(FullSyncProgress, bool) + Send + Sync + 'static>; pub type FullSyncProgressFn = Box<dyn FnMut(FullSyncProgress, bool) + Send + Sync + 'static>;
pub struct HTTPSyncClient { pub struct HTTPSyncClient {
@ -67,10 +65,10 @@ impl Timeouts {
impl SyncServer for HTTPSyncClient { impl SyncServer for HTTPSyncClient {
async fn meta(&self) -> Result<SyncMeta> { async fn meta(&self) -> Result<SyncMeta> {
let input = SyncRequest::Meta(MetaIn { let input = SyncRequest::Meta(MetaIn {
sync_version: SYNC_VERSION, sync_version: SYNC_VERSION_MAX,
client_version: sync_client_version().to_string(), client_version: sync_client_version().to_string(),
}); });
self.json_request(&input).await self.json_request(input).await
} }
async fn start(&mut self, client_usn: Usn, local_is_newer: bool) -> Result<Graves> { async fn start(&mut self, client_usn: Usn, local_is_newer: bool) -> Result<Graves> {
@ -78,42 +76,42 @@ impl SyncServer for HTTPSyncClient {
client_usn, client_usn,
local_is_newer, local_is_newer,
}); });
self.json_request(&input).await self.json_request(input).await
} }
async fn apply_graves(&mut self, chunk: Graves) -> Result<()> { async fn apply_graves(&mut self, chunk: Graves) -> Result<()> {
let input = SyncRequest::ApplyGraves(ApplyGravesIn { chunk }); let input = SyncRequest::ApplyGraves(ApplyGravesIn { chunk });
self.json_request(&input).await self.json_request(input).await
} }
async fn apply_changes(&mut self, changes: UnchunkedChanges) -> Result<UnchunkedChanges> { async fn apply_changes(&mut self, changes: UnchunkedChanges) -> Result<UnchunkedChanges> {
let input = SyncRequest::ApplyChanges(ApplyChangesIn { changes }); let input = SyncRequest::ApplyChanges(ApplyChangesIn { changes });
self.json_request(&input).await self.json_request(input).await
} }
async fn chunk(&mut self) -> Result<Chunk> { async fn chunk(&mut self) -> Result<Chunk> {
let input = SyncRequest::Chunk; let input = SyncRequest::Chunk;
self.json_request(&input).await self.json_request(input).await
} }
async fn apply_chunk(&mut self, chunk: Chunk) -> Result<()> { async fn apply_chunk(&mut self, chunk: Chunk) -> Result<()> {
let input = SyncRequest::ApplyChunk(ApplyChunkIn { chunk }); let input = SyncRequest::ApplyChunk(ApplyChunkIn { chunk });
self.json_request(&input).await self.json_request(input).await
} }
async fn sanity_check(&mut self, client: SanityCheckCounts) -> Result<SanityCheckOut> { async fn sanity_check(&mut self, client: SanityCheckCounts) -> Result<SanityCheckOut> {
let input = SyncRequest::SanityCheck(SanityCheckIn { client, full: true }); let input = SyncRequest::SanityCheck(SanityCheckIn { client });
self.json_request(&input).await self.json_request(input).await
} }
async fn finish(&mut self) -> Result<TimestampMillis> { async fn finish(&mut self) -> Result<TimestampMillis> {
let input = SyncRequest::Finish; let input = SyncRequest::Finish;
self.json_request(&input).await self.json_request(input).await
} }
async fn abort(&mut self) -> Result<()> { async fn abort(&mut self) -> Result<()> {
let input = SyncRequest::Abort; let input = SyncRequest::Abort;
self.json_request(&input).await self.json_request(input).await
} }
async fn full_upload(mut self: Box<Self>, col_path: &Path, _can_consume: bool) -> Result<()> { async fn full_upload(mut self: Box<Self>, col_path: &Path, _can_consume: bool) -> Result<()> {
@ -141,8 +139,8 @@ impl SyncServer for HTTPSyncClient {
/// Download collection into a temporary file, returning it. /// Download collection into a temporary file, returning it.
/// Caller should persist the file in the correct path after checking it. /// Caller should persist the file in the correct path after checking it.
/// Progress func must be set first. /// Progress func must be set first.
async fn full_download(mut self: Box<Self>, folder: &Path) -> Result<NamedTempFile> { async fn full_download(mut self: Box<Self>) -> Result<NamedTempFile> {
let mut temp_file = NamedTempFile::new_in(folder)?; let mut temp_file = NamedTempFile::new()?;
let (size, mut stream) = self.download_inner().await?; let (size, mut stream) = self.download_inner().await?;
let mut progress = FullSyncProgress { let mut progress = FullSyncProgress {
transferred_bytes: 0, transferred_bytes: 0,
@ -187,11 +185,11 @@ impl HTTPSyncClient {
self.full_sync_progress_fn = func; self.full_sync_progress_fn = func;
} }
async fn json_request<T>(&self, req: &SyncRequest) -> Result<T> async fn json_request<T>(&self, req: SyncRequest) -> Result<T>
where where
T: DeserializeOwned, T: DeserializeOwned,
{ {
let (method, req_json) = req.to_method_and_json()?; let (method, req_json) = req.into_method_and_data()?;
self.request_bytes(method, &req_json, false) self.request_bytes(method, &req_json, false)
.await? .await?
.json() .json()
@ -242,7 +240,7 @@ impl HTTPSyncClient {
username: username.into(), username: username.into(),
password: password.into(), password: password.into(),
}); });
let output: HostKeyOut = self.json_request(&input).await?; let output: HostKeyOut = self.json_request(input).await?;
self.hkey = Some(output.key); self.hkey = Some(output.key);
Ok(()) Ok(())
@ -403,13 +401,10 @@ mod test {
// failed sanity check will have cleaned up; can't finish // failed sanity check will have cleaned up; can't finish
// syncer.finish().await?; // syncer.finish().await?;
use tempfile::tempdir;
let dir = tempdir()?;
syncer.set_full_sync_progress_fn(Some(Box::new(|progress, _throttle| { syncer.set_full_sync_progress_fn(Some(Box::new(|progress, _throttle| {
println!("progress: {:?}", progress); println!("progress: {:?}", progress);
}))); })));
let out_path = syncer.full_download(&dir.path()).await?; let out_path = syncer.full_download().await?;
let mut syncer = Box::new(HTTPSyncClient::new(None, 0)); let mut syncer = Box::new(HTTPSyncClient::new(None, 0));
syncer.set_full_sync_progress_fn(Some(Box::new(|progress, _throttle| { syncer.set_full_sync_progress_fn(Some(Box::new(|progress, _throttle| {

View File

@ -26,9 +26,12 @@ use itertools::Itertools;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use serde_tuple::Serialize_tuple; use serde_tuple::Serialize_tuple;
pub(crate) use server::SyncServer; pub(crate) use server::{LocalServer, SyncServer};
use std::collections::HashMap; use std::collections::HashMap;
pub static SYNC_VERSION_MIN: u8 = 7;
pub static SYNC_VERSION_MAX: u8 = 10;
#[derive(Default, Debug, Clone, Copy)] #[derive(Default, Debug, Clone, Copy)]
pub struct NormalSyncProgress { pub struct NormalSyncProgress {
pub stage: SyncStage, pub stage: SyncStage,
@ -51,23 +54,23 @@ impl Default for SyncStage {
} }
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug, Default)]
pub struct SyncMeta { pub struct SyncMeta {
#[serde(rename = "mod")] #[serde(rename = "mod")]
modified: TimestampMillis, pub modified: TimestampMillis,
#[serde(rename = "scm")] #[serde(rename = "scm")]
schema: TimestampMillis, pub schema: TimestampMillis,
usn: Usn, pub usn: Usn,
#[serde(rename = "ts")] #[serde(rename = "ts")]
current_time: TimestampSecs, pub current_time: TimestampSecs,
#[serde(rename = "msg")] #[serde(rename = "msg")]
server_message: String, pub server_message: String,
#[serde(rename = "cont")] #[serde(rename = "cont")]
should_continue: bool, pub should_continue: bool,
#[serde(rename = "hostNum")] #[serde(rename = "hostNum")]
host_number: u32, pub host_number: u32,
#[serde(default)] #[serde(default)]
empty: bool, pub empty: bool,
} }
#[derive(Serialize, Deserialize, Debug, Default)] #[derive(Serialize, Deserialize, Debug, Default)]
@ -158,16 +161,16 @@ pub struct CardEntry {
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct SanityCheckOut { pub struct SanityCheckOut {
status: SanityCheckStatus, pub status: SanityCheckStatus,
#[serde(rename = "c", default, deserialize_with = "default_on_invalid")] #[serde(rename = "c", default, deserialize_with = "default_on_invalid")]
client: Option<SanityCheckCounts>, pub client: Option<SanityCheckCounts>,
#[serde(rename = "s", default, deserialize_with = "default_on_invalid")] #[serde(rename = "s", default, deserialize_with = "default_on_invalid")]
server: Option<SanityCheckCounts>, pub server: Option<SanityCheckCounts>,
} }
#[derive(Serialize, Deserialize, Debug, PartialEq)] #[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(rename_all = "lowercase")] #[serde(rename_all = "lowercase")]
enum SanityCheckStatus { pub enum SanityCheckStatus {
Ok, Ok,
Bad, Bad,
} }
@ -666,9 +669,8 @@ impl Collection {
pub(crate) async fn full_download_inner(self, server: Box<dyn SyncServer>) -> Result<()> { pub(crate) async fn full_download_inner(self, server: Box<dyn SyncServer>) -> Result<()> {
let col_path = self.col_path.clone(); let col_path = self.col_path.clone();
let folder = col_path.parent().unwrap();
self.close(false)?; self.close(false)?;
let out_file = server.full_download(folder).await?; let out_file = server.full_download().await?;
// check file ok // check file ok
let db = open_and_check_sqlite_file(out_file.path())?; let db = open_and_check_sqlite_file(out_file.path())?;
db.execute_batch("update col set ls=mod")?; db.execute_batch("update col set ls=mod")?;

View File

@ -28,7 +28,7 @@ pub trait SyncServer {
/// If `can_consume` is true, the local server will move or remove the file, instead /// If `can_consume` is true, the local server will move or remove the file, instead
/// creating a copy. The remote server ignores this argument. /// creating a copy. The remote server ignores this argument.
async fn full_upload(self: Box<Self>, col_path: &Path, can_consume: bool) -> Result<()>; async fn full_upload(self: Box<Self>, col_path: &Path, can_consume: bool) -> Result<()>;
async fn full_download(self: Box<Self>, folder: &Path) -> Result<NamedTempFile>; async fn full_download(self: Box<Self>) -> Result<NamedTempFile>;
} }
pub struct LocalServer { pub struct LocalServer {
@ -58,6 +58,12 @@ impl LocalServer {
server_chunk_ids: None, server_chunk_ids: None,
} }
} }
/// Consumes self and returns the stored collection. If a sync has begun, caller must ensure they
/// call .finish() or .abort() before calling this.
pub fn into_col(self) -> Collection {
self.col
}
} }
#[async_trait(?Send)] #[async_trait(?Send)]
@ -168,14 +174,14 @@ impl SyncServer for LocalServer {
fs::rename(col_path, &target_col_path).map_err(Into::into) fs::rename(col_path, &target_col_path).map_err(Into::into)
} }
async fn full_download(mut self: Box<Self>, output_folder: &Path) -> Result<NamedTempFile> { async fn full_download(mut self: Box<Self>) -> Result<NamedTempFile> {
// bump usn/mod & close // bump usn/mod & close
self.col.transact(None, |col| col.storage.increment_usn())?; self.col.transact(None, |col| col.storage.increment_usn())?;
let col_path = self.col.col_path.clone(); let col_path = self.col.col_path.clone();
self.col.close(true)?; self.col.close(true)?;
// copy file and return path // copy file and return path
let temp_file = NamedTempFile::new_in(output_folder)?; let temp_file = NamedTempFile::new()?;
fs::copy(&col_path, temp_file.path())?; fs::copy(&col_path, temp_file.path())?;
Ok(temp_file) Ok(temp_file)