anki/qt/aqt/sync.py
2021-02-06 19:01:48 +10:00

336 lines
8.9 KiB
Python

# Copyright: Ankitects Pty Ltd and contributors
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
from __future__ import annotations
import enum
import os
from concurrent.futures import Future
from typing import Callable, Tuple
import aqt
from anki.collection import FullSyncProgress, NormalSyncProgress, ProgressKind
from anki.errors import Interrupted, SyncError
from anki.lang import TR, without_unicode_isolation
from anki.sync import SyncOutput, SyncStatus
from anki.utils import platDesc
from aqt.qt import (
QDialog,
QDialogButtonBox,
QGridLayout,
QLabel,
QLineEdit,
Qt,
QTimer,
QVBoxLayout,
qconnect,
)
from aqt.utils import (
askUser,
askUserDialog,
disable_help_button,
showText,
showWarning,
tr,
)
class FullSyncChoice(enum.Enum):
CANCEL = 0
UPLOAD = 1
DOWNLOAD = 2
def get_sync_status(
mw: aqt.main.AnkiQt, callback: Callable[[SyncStatus], None]
) -> None:
auth = mw.pm.sync_auth()
if not auth:
callback(SyncStatus(required=SyncStatus.NO_CHANGES)) # pylint:disable=no-member
return
def on_future_done(fut: Future) -> None:
try:
out = fut.result()
except Exception as e:
# swallow errors
print("sync status check failed:", str(e))
return
callback(out)
mw.taskman.run_in_background(lambda: mw.col.sync_status(auth), on_future_done)
def handle_sync_error(mw: aqt.main.AnkiQt, err: Exception) -> None:
if isinstance(err, SyncError):
if err.is_auth_error():
mw.pm.clear_sync_auth()
elif isinstance(err, Interrupted):
# no message to show
return
showWarning(str(err))
def on_normal_sync_timer(mw: aqt.main.AnkiQt) -> None:
progress = mw.col.latest_progress()
if progress.kind != ProgressKind.NormalSync:
return
assert isinstance(progress.val, NormalSyncProgress)
mw.progress.update(
label=f"{progress.val.added}\n{progress.val.removed}",
process=False,
)
mw.progress.set_title(progress.val.stage)
if mw.progress.want_cancel():
mw.col.abort_sync()
def sync_collection(mw: aqt.main.AnkiQt, on_done: Callable[[], None]) -> None:
auth = mw.pm.sync_auth()
assert auth
def on_timer() -> None:
on_normal_sync_timer(mw)
timer = QTimer(mw)
qconnect(timer.timeout, on_timer)
timer.start(150)
def on_future_done(fut: Future) -> None:
mw.col.db.begin()
timer.stop()
try:
out: SyncOutput = fut.result()
except Exception as err:
handle_sync_error(mw, err)
return on_done()
mw.pm.set_host_number(out.host_number)
if out.server_message:
showText(out.server_message)
if out.required == out.NO_CHANGES:
# all done
return on_done()
else:
full_sync(mw, out, on_done)
mw.col.save(trx=False)
mw.taskman.with_progress(
lambda: mw.col.sync_collection(auth),
on_future_done,
label=tr(TR.SYNC_CHECKING),
immediate=True,
)
def full_sync(
mw: aqt.main.AnkiQt, out: SyncOutput, on_done: Callable[[], None]
) -> None:
if out.required == out.FULL_DOWNLOAD:
confirm_full_download(mw, on_done)
elif out.required == out.FULL_UPLOAD:
full_upload(mw, on_done)
else:
choice = ask_user_to_decide_direction()
if choice == FullSyncChoice.UPLOAD:
full_upload(mw, on_done)
elif choice == FullSyncChoice.DOWNLOAD:
full_download(mw, on_done)
else:
on_done()
def confirm_full_download(mw: aqt.main.AnkiQt, on_done: Callable[[], None]) -> None:
# confirmation step required, as some users customize their notetypes
# in an empty collection, then want to upload them
if not askUser(tr(TR.SYNC_CONFIRM_EMPTY_DOWNLOAD)):
return on_done()
else:
mw.closeAllWindows(lambda: full_download(mw, on_done))
def on_full_sync_timer(mw: aqt.main.AnkiQt) -> None:
progress = mw.col.latest_progress()
if progress.kind != ProgressKind.FullSync:
return
assert isinstance(progress.val, FullSyncProgress)
if progress.val.transferred == progress.val.total:
label = tr(TR.SYNC_CHECKING)
else:
label = None
mw.progress.update(
value=progress.val.transferred,
max=progress.val.total,
process=False,
label=label,
)
if mw.progress.want_cancel():
mw.col.abort_sync()
def full_download(mw: aqt.main.AnkiQt, on_done: Callable[[], None]) -> None:
def on_timer() -> None:
on_full_sync_timer(mw)
timer = QTimer(mw)
qconnect(timer.timeout, on_timer)
timer.start(150)
def download() -> None:
mw._close_for_full_download()
mw.col.full_download(mw.pm.sync_auth())
def on_future_done(fut: Future) -> None:
timer.stop()
mw.col.reopen(after_full_sync=True)
mw.reset()
try:
fut.result()
except Exception as err:
handle_sync_error(mw, err)
mw.media_syncer.start()
return on_done()
mw.taskman.with_progress(
download,
on_future_done,
label=tr(TR.SYNC_DOWNLOADING_FROM_ANKIWEB),
)
def full_upload(mw: aqt.main.AnkiQt, on_done: Callable[[], None]) -> None:
mw.col.close_for_full_sync()
def on_timer() -> None:
on_full_sync_timer(mw)
timer = QTimer(mw)
qconnect(timer.timeout, on_timer)
timer.start(150)
def on_future_done(fut: Future) -> None:
timer.stop()
mw.col.reopen(after_full_sync=True)
mw.reset()
try:
fut.result()
except Exception as err:
handle_sync_error(mw, err)
return on_done()
mw.media_syncer.start()
return on_done()
mw.taskman.with_progress(
lambda: mw.col.full_upload(mw.pm.sync_auth()),
on_future_done,
label=tr(TR.SYNC_UPLOADING_TO_ANKIWEB),
)
def sync_login(
mw: aqt.main.AnkiQt,
on_success: Callable[[], None],
username: str = "",
password: str = "",
) -> None:
while True:
(username, password) = get_id_and_pass_from_user(mw, username, password)
if not username and not password:
return
if username and password:
break
def on_future_done(fut: Future) -> None:
try:
auth = fut.result()
except SyncError as e:
if e.is_auth_error():
showWarning(str(e))
sync_login(mw, on_success, username, password)
else:
handle_sync_error(mw, e)
return
except Exception as err:
handle_sync_error(mw, err)
return
mw.pm.set_host_number(auth.host_number)
mw.pm.set_sync_key(auth.hkey)
mw.pm.set_sync_username(username)
on_success()
mw.taskman.with_progress(
lambda: mw.col.sync_login(username=username, password=password),
on_future_done,
)
def ask_user_to_decide_direction() -> FullSyncChoice:
button_labels = [
tr(TR.SYNC_UPLOAD_TO_ANKIWEB),
tr(TR.SYNC_DOWNLOAD_FROM_ANKIWEB),
tr(TR.SYNC_CANCEL_BUTTON),
]
diag = askUserDialog(tr(TR.SYNC_CONFLICT_EXPLANATION), button_labels)
diag.setDefault(2)
ret = diag.run()
if ret == button_labels[0]:
return FullSyncChoice.UPLOAD
elif ret == button_labels[1]:
return FullSyncChoice.DOWNLOAD
else:
return FullSyncChoice.CANCEL
def get_id_and_pass_from_user(
mw: aqt.main.AnkiQt, username: str = "", password: str = ""
) -> Tuple[str, str]:
diag = QDialog(mw)
diag.setWindowTitle("Anki")
disable_help_button(diag)
diag.setWindowModality(Qt.WindowModal)
vbox = QVBoxLayout()
info_label = QLabel(
without_unicode_isolation(
tr(TR.SYNC_ACCOUNT_REQUIRED, link="https://ankiweb.net/account/register")
)
)
info_label.setOpenExternalLinks(True)
info_label.setWordWrap(True)
vbox.addWidget(info_label)
vbox.addSpacing(20)
g = QGridLayout()
l1 = QLabel(tr(TR.SYNC_ANKIWEB_ID_LABEL))
g.addWidget(l1, 0, 0)
user = QLineEdit()
user.setText(username)
g.addWidget(user, 0, 1)
l2 = QLabel(tr(TR.SYNC_PASSWORD_LABEL))
g.addWidget(l2, 1, 0)
passwd = QLineEdit()
passwd.setText(password)
passwd.setEchoMode(QLineEdit.Password)
g.addWidget(passwd, 1, 1)
vbox.addLayout(g)
bb = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel) # type: ignore
bb.button(QDialogButtonBox.Ok).setAutoDefault(True)
qconnect(bb.accepted, diag.accept)
qconnect(bb.rejected, diag.reject)
vbox.addWidget(bb)
diag.setLayout(vbox)
diag.show()
accepted = diag.exec_()
if not accepted:
return ("", "")
return (user.text().strip(), passwd.text())
# export platform version to syncing code
os.environ["PLATFORM"] = platDesc()