new media syncing protocol

- media syncing no longer locks the account, so it can be done
  in the background in the future, and multiple clients can safely
  sync media at the same time
- all operations are now idempotent, so they can be repeated safely
  in the event of a connection error
- whether it's a normal incremental sync, an initial sync, or a sync
  after the media database has been deleted, no files will be uploaded
  or downloaded if they already exist on the other side
- file removals are now chunked like additions & updates, preventing
  timeouts due to large requests
- if the server can't process a chunk in time, it will return a count
  of what it did process, so the client can retry the rest (see the
  sketch after this list)
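
To illustrate the last two points, here is a rough sketch of the
client-side upload loop these rules imply (method names as in the diff
below; error handling and logging omitted, so not the verbatim
implementation):

    while True:
        zip, fnames = col.media.mediaChangesZip()
        if not fnames:
            # nothing dirty left to send
            break
        # the server reports how many files it processed and its new usn
        processedCnt, serverLastUsn = server.uploadChanges(zip)
        # only the processed prefix is marked clean, so a dropped
        # connection just means the remainder is resent on the next pass
        col.media.markClean(fnames[0:processedCnt])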

Notes for AnkiDroid:

- when porting this, we recommend picking a different name for the
  .media.db2 file, so users don't accidentally copy the AnkiDroid
  version to the desktop or vice versa
- please make sure filenames are added to the zip in NFC form (see the
  example below)
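
For the NFC point, the relevant normalization as done in
mediaChangesZip() below (a minimal excerpt, not the full method):

    import unicodedata
    # zip entry names are ascii ordinals; the _meta entry maps them to
    # the real filenames, which must be stored NFC-normalized
    normname = unicodedata.normalize("NFC", fname)
    meta.append((normname, str(c)))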
Authored by dae, 2014-06-26 07:35:47 +09:00; committed by Damien Elmes
parent 1933779fa6
commit 2aa7714f87
5 changed files with 320 additions and 269 deletions


@@ -47,8 +47,8 @@ MODEL_CLOZE = 1
# deck schema & syncing vars
SCHEMA_VERSION = 11
SYNC_ZIP_SIZE = int(2.5*1024*1024)
SYNC_ZIP_COUNT = 100
SYNC_URL = os.environ.get("SYNC_URL") or "https://ankiweb.net/sync/"
SYNC_ZIP_COUNT = 25
SYNC_BASE = os.environ.get("SYNC_BASE") or "https://ankiweb.net/"
SYNC_VER = 8
HELP_SITE="http://ankisrs.net/docs/manual.html"
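
The single SYNC_BASE now serves both collection and media syncing; each
syncer derives its endpoint from it via syncURL() (a sketch; the
variable names below are illustrative, the URLs come from the sync.py
changes further down):

    import os
    SYNC_BASE = os.environ.get("SYNC_BASE") or "https://ankiweb.net/"
    sync_url = SYNC_BASE + "sync/"    # RemoteServer, FullSyncer
    msync_url = SYNC_BASE + "msync/"  # RemoteMediaServer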


@@ -258,7 +258,9 @@ class AnkiPackageExporter(AnkiExporter):
media[c] = file
# tidy up intermediate files
os.unlink(colfile)
os.unlink(path.replace(".apkg", ".media.db"))
p = path.replace(".apkg", ".media.db2")
if os.path.exists(p):
os.unlink(p)
shutil.rmtree(path.replace(".apkg", ".media"))
return media


@@ -9,13 +9,11 @@ import sys
import zipfile
from cStringIO import StringIO
import send2trash
from anki.utils import checksum, isWin, isMac, json
from anki.db import DB
from anki.consts import *
from anki.latex import mungeQA
class MediaManager(object):
soundRegexps = ["(?i)(\[sound:(?P<fname>[^]]+)\])"]
@@ -54,12 +52,47 @@ class MediaManager(object):
def connect(self):
if self.col.server:
return
path = self.dir()+".db"
path = self.dir()+".db2"
create = not os.path.exists(path)
os.chdir(self._dir)
self.db = DB(path)
if create:
self._initDB()
self.maybeUpgrade()
def _initDB(self):
self.db.executescript("""
create table media (
fname text not null primary key,
csum text, -- null indicates deleted file
mtime int not null, -- zero if deleted
dirty int not null
);
create index idx_media_dirty on media (dirty);
create table meta (dirMod int, lastUsn int); insert into meta values (0, 0);
""")
def maybeUpgrade(self):
oldpath = self.dir()+".db"
if os.path.exists(oldpath):
self.db.execute('attach "../collection.media.db" as old')
self.db.execute("""
insert into media
select m.fname, csum, mod, ifnull((select 1 from log l2 where l2.fname=m.fname), 0) as dirty
from old.media m
left outer join old.log l using (fname)
union
select fname, null, 0, 1 from old.log where type=1;""")
self.db.execute("delete from meta")
self.db.execute("""
insert into meta select dirMod, usn from old.meta
""")
self.db.execute("detach old")
self.db.commit()
self.db.execute("vacuum analyze")
os.rename("../collection.media.db", "../collection.media.db.old")
def close(self):
if self.col.server:
@@ -268,75 +301,6 @@ class MediaManager(object):
def have(self, fname):
return os.path.exists(os.path.join(self.dir(), fname))
# Media syncing - changes and removal
##########################################################################
def hasChanged(self):
return self.db.scalar("select 1 from log limit 1")
def removed(self):
return self.db.list("select * from log where type = ?", MEDIA_REM)
def syncRemove(self, fnames):
# remove provided deletions
for f in fnames:
if os.path.exists(f):
send2trash.send2trash(f)
self.db.execute("delete from log where fname = ?", f)
self.db.execute("delete from media where fname = ?", f)
# and all locally-logged deletions, as server has acked them
self.db.execute("delete from log where type = ?", MEDIA_REM)
self.db.commit()
# Media syncing - unbundling zip files from server
##########################################################################
def syncAdd(self, zipData):
"Extract zip data; true if finished."
f = StringIO(zipData)
z = zipfile.ZipFile(f, "r")
finished = False
meta = None
media = []
# get meta info first
meta = json.loads(z.read("_meta"))
nextUsn = int(z.read("_usn"))
# then loop through all files
for i in z.infolist():
if i.filename == "_meta" or i.filename == "_usn":
# ignore previously-retrieved meta
continue
elif i.filename == "_finished":
# last zip in set
finished = True
else:
data = z.read(i)
csum = checksum(data)
name = meta[i.filename]
if not isinstance(name, unicode):
name = unicode(name, "utf8")
# normalize name for platform
if isMac:
name = unicodedata.normalize("NFD", name)
else:
name = unicodedata.normalize("NFC", name)
# save file
open(name, "wb").write(data)
# update db
media.append((name, csum, self._mtime(name)))
# remove entries from local log
self.db.execute("delete from log where fname = ?", name)
# update media db and note new starting usn
if media:
self.db.executemany(
"insert or replace into media values (?,?,?)", media)
self.setUsn(nextUsn) # commits
# if we have finished adding, we need to record the new folder mtime
# so that we don't trigger a needless scan
if finished:
self.syncMod()
return finished
# Illegal characters
##########################################################################
@@ -351,57 +315,16 @@ class MediaManager(object):
return True
return not not re.search(self._illegalCharReg, str)
# Media syncing - bundling zip files to send to server
##########################################################################
# Because there's no standard filename encoding for zips, and because not
# all zip clients support retrieving mtime, we store the files as ascii
# and place a json file in the zip with the necessary information.
def zipAdded(self):
"Add files to a zip until over SYNC_ZIP_SIZE/COUNT. Return zip data."
f = StringIO()
z = zipfile.ZipFile(f, "w", compression=zipfile.ZIP_DEFLATED)
sz = 0
cnt = 0
files = {}
cur = self.db.execute(
"select fname from log where type = ?", MEDIA_ADD)
fnames = []
while 1:
fname = cur.fetchone()
if not fname:
# add a flag so the server knows it can clean up
z.writestr("_finished", "")
break
fname = fname[0]
# we add it as a one-element array simply to make
# the later forgetAdded() call easier
fnames.append([fname])
z.write(fname, str(cnt))
files[str(cnt)] = unicodedata.normalize("NFC", fname)
sz += os.path.getsize(fname)
if sz >= SYNC_ZIP_SIZE or cnt >= SYNC_ZIP_COUNT:
break
cnt += 1
z.writestr("_meta", json.dumps(files))
z.close()
return f.getvalue(), fnames
def forgetAdded(self, fnames):
if not fnames:
return
self.db.executemany("delete from log where fname = ?", fnames)
self.db.commit()
# Tracking changes (private)
# Tracking changes
##########################################################################
def _initDB(self):
self.db.executescript("""
create table media (fname text primary key, csum text, mod int);
create table meta (dirMod int, usn int); insert into meta values (0, 0);
create table log (fname text primary key, type int);
""")
def findChanges(self):
"Scan the media folder if it's changed, and note any changes."
if self._changed():
self._logChanges()
def haveDirty(self):
return self.db.scalar("select 1 from media where dirty=1 limit 1")
def _mtime(self, path):
return int(os.stat(path).st_mtime)
@@ -409,17 +332,6 @@ create table log (fname text primary key, type int);
def _checksum(self, path):
return checksum(open(path, "rb").read())
def usn(self):
return self.db.scalar("select usn from meta")
def setUsn(self, usn):
self.db.execute("update meta set usn = ?", usn)
self.db.commit()
def syncMod(self):
self.db.execute("update meta set dirMod = ?", self._mtime(self.dir()))
self.db.commit()
def _changed(self):
"Return dir mtime if it has changed since the last findChanges()"
# doesn't track edits, but user can add or remove a file to update
@@ -429,38 +341,24 @@ create table log (fname text primary key, type int);
return False
return mtime
def findChanges(self):
"Scan the media folder if it's changed, and note any changes."
if self._changed():
self._logChanges()
def _logChanges(self):
(added, removed) = self._changes()
log = []
media = []
mediaRem = []
for f in added:
mt = self._mtime(f)
media.append((f, self._checksum(f), mt))
log.append((f, MEDIA_ADD))
media.append((f, self._checksum(f), mt, 1))
for f in removed:
mediaRem.append((f,))
log.append((f, MEDIA_REM))
media.append((f, None, 0, 1))
# update media db
self.db.executemany("insert or replace into media values (?,?,?)",
self.db.executemany("insert or replace into media values (?,?,?,?)",
media)
if mediaRem:
self.db.executemany("delete from media where fname = ?",
mediaRem)
self.db.execute("update meta set dirMod = ?", self._mtime(self.dir()))
# and logs
self.db.executemany("insert or replace into log values (?,?)", log)
self.db.commit()
def _changes(self):
self.cache = {}
for (name, csum, mod) in self.db.execute(
"select * from media"):
"select fname, csum, mtime from media"):
self.cache[name] = [csum, mod, False]
added = []
removed = []
@@ -495,34 +393,104 @@ create table log (fname text primary key, type int);
removed.append(k)
return added, removed
def sanityCheck(self):
assert not self.db.scalar("select count() from log")
cnt = self.db.scalar("select count() from media")
return cnt
# Syncing-related
##########################################################################
def lastUsn(self):
return self.db.scalar("select lastUsn from meta")
def setLastUsn(self, usn):
self.db.execute("update meta set lastUsn = ?", usn)
self.db.commit()
def syncInfo(self, fname):
ret = self.db.first(
"select csum, dirty from media where fname=?", fname)
return ret or (None, 0)
def markClean(self, fnames):
for fname in fnames:
self.db.execute(
"update media set dirty=0 where fname=?", fname)
def syncDelete(self, fname):
if os.path.exists(fname):
os.unlink(fname)
self.db.execute("delete from media where fname=?", fname)
def mediaCount(self):
return self.db.scalar(
"select count() from media where csum is not null")
def forceResync(self):
self.db.execute("delete from media")
self.db.execute("delete from log")
self.db.execute("update meta set usn = 0, dirMod = 0")
self.db.execute("vacuum analyze")
self.db.commit()
def removeExisting(self, files):
"Remove files from list of files to sync, and return missing files."
need = []
remove = []
for f in files:
if isMac:
name = unicodedata.normalize("NFD", f)
else:
name = f
if self.db.scalar("select 1 from log where fname=?", name):
remove.append((name,))
else:
need.append(f)
self.db.executemany("delete from log where fname=?", remove)
self.db.commit()
# if we need all the server files, it's faster to pass None than
# the full list
if need and len(files) == len(need):
return None
return need
# Media syncing: zips
##########################################################################
def mediaChangesZip(self):
f = StringIO()
z = zipfile.ZipFile(f, "w", compression=zipfile.ZIP_DEFLATED)
fnames = []
# meta is list of (fname, zipname), where zipname of None
# is a deleted file
meta = []
sz = 0
for c, (fname, csum) in enumerate(self.db.execute(
"select fname, csum from media where dirty=1"
" limit %d"%SYNC_ZIP_COUNT)):
fnames.append(fname)
normname = unicodedata.normalize("NFC", fname)
if csum:
z.write(fname, str(c))
meta.append((normname, str(c)))
sz += os.path.getsize(fname)
else:
meta.append((normname, ""))
if sz >= SYNC_ZIP_SIZE:
break
z.writestr("_meta", json.dumps(meta))
z.close()
return f.getvalue(), fnames
def addFilesFromZip(self, zipData):
"Extract zip data; returns the number of files extracted."
f = StringIO(zipData)
z = zipfile.ZipFile(f, "r")
media = []
# get meta info first
meta = json.loads(z.read("_meta"))
# then loop through all files
cnt = 0
for i in z.infolist():
if i.filename == "_meta":
# ignore previously-retrieved meta
continue
else:
data = z.read(i)
csum = checksum(data)
name = meta[i.filename]
if not isinstance(name, unicode):
name = unicode(name, "utf8")
# normalize name for platform
if isMac:
name = unicodedata.normalize("NFD", name)
else:
name = unicodedata.normalize("NFC", name)
# save file
open(name, "wb").write(data)
# update db
media.append((name, csum, self._mtime(name), 0))
cnt += 1
if media:
self.db.executemany(
"insert or replace into media values (?,?,?,?)", media)
return cnt
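
A standalone look at the new database's semantics (a sketch; assumes the
desktop's collection.media.db2 naming and Python 2, matching the code
above):

    import sqlite3
    db = sqlite3.connect("collection.media.db2")
    # csum is null for files deleted locally; dirty=1 marks entries that
    # haven't been reported to the server yet
    for fname, csum, mtime in db.execute(
            "select fname, csum, mtime from media where dirty = 1"):
        print fname, ("delete" if csum is None else "add/update"), mtime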


@ -15,7 +15,6 @@ from anki.consts import *
from hooks import runHook
import anki
# syncing vars
HTTP_TIMEOUT = 90
HTTP_PROXY = None
@@ -539,6 +538,7 @@ class HttpSyncer(object):
self.hkey = hkey
self.skey = checksum(str(random.random()))[:8]
self.con = con or httpCon()
self.postVars = {}
def assertOk(self, resp):
if resp['status'] != '200':
@@ -550,18 +550,13 @@ class HttpSyncer(object):
# costly. We could send it as a raw post, but more HTTP clients seem to
# support file uploading, so this is the more compatible choice.
def req(self, method, fobj=None, comp=6,
badAuthRaises=True, hkey=True):
def req(self, method, fobj=None, comp=6, badAuthRaises=False):
BOUNDARY="Anki-sync-boundary"
bdry = "--"+BOUNDARY
buf = StringIO()
# compression flag and session key as post vars
vars = {}
vars['c'] = 1 if comp else 0
if hkey:
vars['k'] = self.hkey
vars['s'] = self.skey
for (key, value) in vars.items():
# post vars
self.postVars['c'] = 1 if comp else 0
for (key, value) in self.postVars.items():
buf.write(bdry + "\r\n")
buf.write(
'Content-Disposition: form-data; name="%s"\r\n\r\n%s\r\n' %
@@ -595,7 +590,7 @@ Content-Type: application/octet-stream\r\n\r\n""")
body = buf.getvalue()
buf.close()
resp, cont = self.con.request(
SYNC_URL+method, "POST", headers=headers, body=body)
self.syncURL()+method, "POST", headers=headers, body=body)
if not badAuthRaises:
# return false if bad auth instead of raising
if resp['status'] == '403':
@@ -611,11 +606,15 @@ class RemoteServer(HttpSyncer):
def __init__(self, hkey):
HttpSyncer.__init__(self, hkey)
def syncURL(self):
return SYNC_BASE + "sync/"
def hostKey(self, user, pw):
"Returns hkey or none if user/pw incorrect."
self.postVars = dict()
ret = self.req(
"hostKey", StringIO(json.dumps(dict(u=user, p=pw))),
badAuthRaises=False, hkey=False)
badAuthRaises=False)
if not ret:
# invalid auth
return
@@ -623,6 +622,10 @@ class RemoteServer(HttpSyncer):
return self.hkey
def meta(self):
self.postVars = dict(
k=self.hkey,
s=self.skey,
)
ret = self.req(
"meta", StringIO(json.dumps(dict(
v=SYNC_VER, cv="ankidesktop,%s,%s"%(anki.version, platDesc())))),
@@ -661,8 +664,15 @@ class FullSyncer(HttpSyncer):
def __init__(self, col, hkey, con):
HttpSyncer.__init__(self, hkey, con)
self.postVars = dict(
k=self.hkey,
v="ankidesktop,%s,%s"%(anki.version, platDesc()),
)
self.col = col
def syncURL(self):
return SYNC_BASE + "sync/"
def download(self):
runHook("sync", "download")
self.col.close()
@@ -697,117 +707,188 @@
# Media syncing
##########################################################################
#
# About conflicts:
# - to minimize data loss, if both sides are marked for sending and one
# side has been deleted, favour the add
# - if added/changed on both sides, favour the server version on the
# assumption other syncers are in sync with the server
#
class MediaSyncer(object):
def __init__(self, col, server=None):
self.col = col
self.server = server
self.added = None
def sync(self, mediaUsn):
# step 1: check if there have been any changes
runHook("sync", "findMedia")
lusn = self.col.media.usn()
# if first sync or resync, clear list of files we think we've sent
if not lusn:
self.col.media.forceResync()
self.col.media.findChanges()
if lusn == mediaUsn and not self.col.media.hasChanged():
return "noChanges"
# step 1.5: if resyncing, we need to get the list of files the server
# has and remove them from our local list of files to sync
if not lusn:
files = self.server.mediaList()
need = self.col.media.removeExisting(files)
else:
need = None
# step 2: send/recv deletions
runHook("sync", "removeMedia")
lrem = self.removed()
rrem = self.server.remove(fnames=lrem, minUsn=lusn)
self.remove(rrem)
# step 3: stream files from server
runHook("sync", "server")
while 1:
runHook("sync", "streamMedia")
usn = self.col.media.usn()
zip = self.server.files(minUsn=usn, need=need)
if self.addFiles(zip=zip):
break
# step 4: stream files to the server
runHook("sync", "client")
while 1:
runHook("sync", "streamMedia")
zip, fnames = self.files()
if not fnames:
# finished
break
usn = self.server.addFiles(zip=zip)
# after server has replied, safe to remove from log
self.col.media.forgetAdded(fnames)
self.col.media.setUsn(usn)
# step 5: sanity check during beta testing
# NOTE: when removing this, need to move server tidyup
# back from sanity check to addFiles
c = self.mediaSanity()
s = self.server.mediaSanity(client=c)
self.col.log("mediaSanity", c, s)
if c != s:
# if the sanity check failed, force a resync
self.col.media.forceResync()
return "sanityCheckFailed"
return "success"
def sync(self):
# check if there have been any changes
runHook("sync", "findMedia")
self.col.log("findChanges")
self.col.media.findChanges()
# begin session and check if in sync
lastUsn = self.col.media.lastUsn()
ret = self.server.begin()
srvUsn = ret['usn']
if lastUsn == srvUsn and not self.col.media.haveDirty():
return "noChanges"
# loop through and process changes from server
self.col.log("last local usn is %s"%lastUsn)
while True:
data = self.server.mediaChanges(lastUsn=lastUsn)
self.col.log("mediaChanges resp count %d"%len(data))
if not data:
break
need = []
lastUsn = data[-1][1]
for fname, rusn, rsum in data:
lsum, ldirty = self.col.media.syncInfo(fname)
self.col.log(
"check: lsum=%s rsum=%s ldirty=%d rusn=%d fname=%s"%(
(lsum and lsum[0:4]),
(rsum and rsum[0:4]),
ldirty,
rusn,
fname))
if rsum:
# added/changed remotely
if not lsum or lsum != rsum:
self.col.log("will fetch")
need.append(fname)
else:
self.col.log("have same already")
ldirty and self.col.media.markClean([fname])
elif lsum:
# deleted remotely
if not ldirty:
self.col.log("delete local")
self.col.media.syncDelete(fname)
else:
# conflict; local add overrides remote delete
self.col.log("conflict; will send")
else:
# deleted both sides
self.col.log("both sides deleted")
ldirty and self.col.media.markClean([fname])
self._downloadFiles(need)
self.col.log("update last usn to %d"%lastUsn)
self.col.media.setLastUsn(lastUsn) # commits
# at this point we're all up to date with the server's changes,
# and we need to send our own
updateConflict = False
while True:
zip, fnames = self.col.media.mediaChangesZip()
if not fnames:
# finished
break
processedCnt, serverLastUsn = self.server.uploadChanges(zip)
self.col.media.markClean(fnames[0:processedCnt])
self.col.log("processed %d, serverUsn %d, clientUsn %d" % (
processedCnt, serverLastUsn, lastUsn
))
if serverLastUsn - processedCnt == lastUsn:
self.col.log("lastUsn in sync, updating local")
self.col.media.setLastUsn(serverLastUsn)
else:
self.col.log("concurrent update, skipping usn update")
updateConflict = True
if updateConflict:
self.col.log("restart sync due to concurrent update")
return self.sync()
lcnt = self.col.media.mediaCount()
ret = self.server.mediaSanity(local=lcnt)
if ret == "OK":
return "OK"
else:
self.col.media.forceResync()
return ret
def removed(self):
return self.col.media.removed()
def remove(self, fnames, minUsn=None):
self.col.media.syncRemove(fnames)
if minUsn is not None:
# we're the server
return self.col.media.removed()
def _downloadFiles(self, fnames):
self.col.log("%d files to fetch"%len(fnames))
while fnames:
top = fnames[0:SYNC_ZIP_COUNT]
self.col.log("fetch %s"%top)
zipData = self.server.downloadFiles(files=top)
cnt = self.col.media.addFilesFromZip(zipData)
self.col.log("received %d files"%cnt)
fnames = fnames[cnt:]
def files(self):
return self.col.media.zipAdded()
return self.col.media.addFilesToZip()
def addFiles(self, zip):
"True if zip is the last in set. Server returns new usn instead."
return self.col.media.syncAdd(zip)
return self.col.media.addFilesFromZip(zip)
def mediaSanity(self):
return self.col.media.sanityCheck()
# Remote media syncing
##########################################################################
class RemoteMediaServer(HttpSyncer):
def __init__(self, hkey, con):
def __init__(self, col, hkey, con):
self.col = col
HttpSyncer.__init__(self, hkey, con)
def remove(self, **kw):
return json.loads(
self.req("remove", StringIO(json.dumps(kw))))
def syncURL(self):
return SYNC_BASE + "msync/"
def files(self, **kw):
return self.req("files", StringIO(json.dumps(kw)))
def begin(self):
self.postVars = dict(
k=self.hkey,
v="ankidesktop,%s,%s"%(anki.version, platDesc())
)
ret = self._dataOnly(json.loads(self.req(
"begin", StringIO(json.dumps(dict())))))
self.skey = ret['sk']
return ret
def addFiles(self, zip):
# no compression, as we compress the zip file instead
return json.loads(
self.req("addFiles", StringIO(zip), comp=0))
# args: lastUsn
def mediaChanges(self, **kw):
self.postVars = dict(
sk=self.skey,
)
resp = json.loads(
self.req("mediaChanges", StringIO(json.dumps(kw))))
return self._dataOnly(resp)
# args: files
def downloadFiles(self, **kw):
return self.req("downloadFiles", StringIO(json.dumps(kw)))
def uploadChanges(self, zip):
# no compression, as we compress the zip file instead
return self._dataOnly(json.loads(
self.req("uploadChanges", StringIO(zip), comp=0)))
# args: local
def mediaSanity(self, **kw):
return json.loads(
self.req("mediaSanity", StringIO(json.dumps(kw))))
return self._dataOnly(json.loads(
self.req("mediaSanity", StringIO(json.dumps(kw)))))
def mediaList(self):
return json.loads(
self.req("mediaList"))
def _dataOnly(self, resp):
if resp['err']:
self.col.log("error returned:%s"%resp['err'])
raise Exception("SyncError:%s"%resp['err'])
return resp['data']
# only for unit tests
def mediatest(self, n):
return json.loads(
self.req("mediatest", StringIO(
json.dumps(dict(n=n)))))
def mediatest(self, cmd):
self.postVars = dict(
k=self.hkey,
)
return self._dataOnly(json.loads(
self.req("newMediaTest", StringIO(
json.dumps(dict(cmd=cmd))))))
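
For porters, the complete msync conversation in one place (a sketch
assembled from MediaSyncer.sync() and RemoteMediaServer above; every
response arrives in the {"data": ..., "err": ...} envelope that
_dataOnly() unwraps):

    ret = server.begin()                         # dict with server 'usn' and session key 'sk'
    data = server.mediaChanges(lastUsn=lastUsn)  # list of [fname, usn, csum] entries
    zipData = server.downloadFiles(files=top)    # raw zip data for the requested files
    cnt, usn = server.uploadChanges(zipData)     # (processed count, server's lastUsn)
    ret = server.mediaSanity(local=lcnt)         # "OK", or a failure string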


@@ -404,9 +404,9 @@ class SyncThread(QThread):
def _syncMedia(self):
if not self.media:
return
self.server = RemoteMediaServer(self.hkey, self.server.con)
self.server = RemoteMediaServer(self.col, self.hkey, self.server.con)
self.client = MediaSyncer(self.col, self.server)
ret = self.client.sync(self.mediaUsn)
ret = self.client.sync()
if ret == "noChanges":
self.fireEvent("noMediaChanges")
elif ret == "sanityCheckFailed":