# Copyright: Ankitects Pty Ltd and contributors # License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html from __future__ import annotations import os from concurrent.futures import Future from typing import Callable import aqt import aqt.main from anki.errors import Interrupted, SyncError, SyncErrorKind from anki.lang import without_unicode_isolation from anki.sync import SyncOutput, SyncStatus from anki.sync_pb2 import SyncAuth from anki.utils import plat_desc from aqt import gui_hooks from aqt.qt import ( QDialog, QDialogButtonBox, QGridLayout, QLabel, QLineEdit, Qt, QTimer, QVBoxLayout, qconnect, ) from aqt.utils import ( ask_user_dialog, askUser, disable_help_button, showText, showWarning, tooltip, tr, ) def get_sync_status( mw: aqt.main.AnkiQt, callback: Callable[[SyncStatus], None] ) -> None: auth = mw.pm.sync_auth() if not auth: callback(SyncStatus(required=SyncStatus.NO_CHANGES)) # pylint:disable=no-member return def on_future_done(fut: Future[SyncStatus]) -> None: try: out = fut.result() except Exception as e: # swallow errors print("sync status check failed:", str(e)) return if out.new_endpoint: mw.pm.set_current_sync_url(out.new_endpoint) callback(out) mw.taskman.run_in_background(lambda: mw.col.sync_status(auth), on_future_done) def handle_sync_error(mw: aqt.main.AnkiQt, err: Exception) -> None: if isinstance(err, SyncError): if err.kind is SyncErrorKind.AUTH: mw.pm.clear_sync_auth() elif isinstance(err, Interrupted): # no message to show return showWarning(str(err)) def on_normal_sync_timer(mw: aqt.main.AnkiQt) -> None: progress = mw.col.latest_progress() if not progress.HasField("normal_sync"): return sync_progress = progress.normal_sync mw.progress.update( label=f"{sync_progress.added}\n{sync_progress.removed}", process=False, ) mw.progress.set_title(sync_progress.stage) if mw.progress.want_cancel(): mw.col.abort_sync() def sync_collection(mw: aqt.main.AnkiQt, on_done: Callable[[], None]) -> None: auth = mw.pm.sync_auth() if not 
auth: raise Exception("expected auth") def on_timer() -> None: on_normal_sync_timer(mw) timer = QTimer(mw) qconnect(timer.timeout, on_timer) timer.start(150) def on_future_done(fut: Future[SyncOutput]) -> None: mw.col.db.begin() # scheduler version may have changed mw.col._load_scheduler() timer.stop() try: out = fut.result() except Exception as err: handle_sync_error(mw, err) return on_done() mw.pm.set_host_number(out.host_number) if out.new_endpoint: mw.pm.set_current_sync_url(out.new_endpoint) if out.server_message: showText(out.server_message) if out.required == out.NO_CHANGES: tooltip(parent=mw, msg=tr.sync_collection_complete()) # all done; track media progress mw.media_syncer.start_monitoring() return on_done() else: full_sync(mw, out, on_done) mw.col.save(trx=False) mw.taskman.with_progress( lambda: mw.col.sync_collection(auth, mw.pm.media_syncing_enabled()), on_future_done, label=tr.sync_checking(), immediate=True, ) def full_sync( mw: aqt.main.AnkiQt, out: SyncOutput, on_done: Callable[[], None] ) -> None: server_usn = out.server_media_usn if mw.pm.media_syncing_enabled() else None if out.required == out.FULL_DOWNLOAD: confirm_full_download(mw, server_usn, on_done) elif out.required == out.FULL_UPLOAD: full_upload(mw, server_usn, on_done) else: button_labels: list[str] = [ tr.sync_upload_to_ankiweb(), tr.sync_download_from_ankiweb(), tr.sync_cancel_button(), ] def callback(choice: int) -> None: if choice == 0: full_upload(mw, server_usn, on_done) elif choice == 1: full_download(mw, server_usn, on_done) else: on_done() ask_user_dialog( tr.sync_conflict_explanation(), callback=callback, buttons=button_labels, default_button=2, ) def confirm_full_download( mw: aqt.main.AnkiQt, server_usn: int, on_done: Callable[[], None] ) -> None: # confirmation step required, as some users customize their notetypes # in an empty collection, then want to upload them if not askUser(tr.sync_confirm_empty_download()): return on_done() else: mw.closeAllWindows(lambda: 
full_download(mw, server_usn, on_done)) def on_full_sync_timer(mw: aqt.main.AnkiQt, label: str) -> None: progress = mw.col.latest_progress() if not progress.HasField("full_sync"): return sync_progress = progress.full_sync if sync_progress.transferred == sync_progress.total: label = tr.sync_checking() mw.progress.update( value=sync_progress.transferred, max=sync_progress.total, process=False, label=label, ) if mw.progress.want_cancel(): mw.col.abort_sync() def full_download( mw: aqt.main.AnkiQt, server_usn: int, on_done: Callable[[], None] ) -> None: label = tr.sync_downloading_from_ankiweb() def on_timer() -> None: on_full_sync_timer(mw, label) timer = QTimer(mw) qconnect(timer.timeout, on_timer) timer.start(150) # hook needs to be called early, on the main thread gui_hooks.collection_will_temporarily_close(mw.col) def download() -> None: mw.create_backup_now() mw.col.close_for_full_sync() mw.col.full_upload_or_download( auth=mw.pm.sync_auth(), server_usn=server_usn, upload=False ) def on_future_done(fut: Future) -> None: timer.stop() mw.reopen(after_full_sync=True) mw.reset() try: fut.result() except Exception as err: handle_sync_error(mw, err) mw.media_syncer.start_monitoring() return on_done() mw.taskman.with_progress( download, on_future_done, ) def full_upload( mw: aqt.main.AnkiQt, server_usn: int | None, on_done: Callable[[], None] ) -> None: gui_hooks.collection_will_temporarily_close(mw.col) mw.col.close_for_full_sync() label = tr.sync_uploading_to_ankiweb() def on_timer() -> None: on_full_sync_timer(mw, label) timer = QTimer(mw) qconnect(timer.timeout, on_timer) timer.start(150) def on_future_done(fut: Future) -> None: timer.stop() mw.reopen(after_full_sync=True) mw.reset() try: fut.result() except Exception as err: handle_sync_error(mw, err) return on_done() mw.media_syncer.start_monitoring() return on_done() mw.taskman.with_progress( lambda: mw.col.full_upload_or_download( auth=mw.pm.sync_auth(), server_usn=server_usn, upload=True ), on_future_done, ) 
def sync_login(
    mw: aqt.main.AnkiQt,
    on_success: Callable[[], None],
    username: str = "",
    password: str = "",
) -> None:
    """Prompt for credentials, log in, store the key and call `on_success`.

    The prompt is repeated until both fields are filled in; it is abandoned
    (without calling `on_success`) when both come back empty. If the login is
    rejected with an AUTH error, the warning is shown and the whole flow is
    restarted with the previously entered values pre-filled.
    """
    while True:
        username, password = get_id_and_pass_from_user(mw, username, password)
        if not (username or password):
            # nothing entered, or the dialog was dismissed
            return
        if username and password:
            break

    def request_login() -> SyncAuth:
        return mw.col.sync_login(
            username=username, password=password, endpoint=mw.pm.sync_endpoint()
        )

    def on_login_finished(fut: Future[SyncAuth]) -> None:
        try:
            auth = fut.result()
        except SyncError as sync_err:
            if sync_err.kind is not SyncErrorKind.AUTH:
                handle_sync_error(mw, sync_err)
                return
            # wrong credentials: tell the user and let them try again
            showWarning(str(sync_err))
            sync_login(mw, on_success, username, password)
            return
        except Exception as other_err:
            handle_sync_error(mw, other_err)
            return

        mw.pm.set_sync_key(auth.hkey)
        mw.pm.set_sync_username(username)
        on_success()

    mw.taskman.with_progress(request_login, on_login_finished)


def get_id_and_pass_from_user(
    mw: aqt.main.AnkiQt, username: str = "", password: str = ""
) -> tuple[str, str]:
    """Show a modal login dialog and return the entered (username, password).

    The fields start out filled with `username` and `password`. The returned
    username has surrounding whitespace stripped; the password is returned as
    typed. Returns ("", "") when the dialog is not accepted.
    """
    dialog = QDialog(mw)
    dialog.setWindowTitle("Anki")
    disable_help_button(dialog)
    dialog.setWindowModality(Qt.WindowModality.WindowModal)
    layout = QVBoxLayout()

    # explanatory text with a clickable sign-up link
    intro_text = without_unicode_isolation(
        tr.sync_account_required(link="https://ankiweb.net/account/register")
    )
    intro = QLabel(intro_text)
    intro.setOpenExternalLinks(True)
    intro.setWordWrap(True)
    layout.addWidget(intro)
    layout.addSpacing(20)

    # two-row form: label in column 0, input in column 1
    form = QGridLayout()
    id_label = QLabel(tr.sync_ankiweb_id_label())
    form.addWidget(id_label, 0, 0)
    id_input = QLineEdit()
    id_input.setText(username)
    form.addWidget(id_input, 0, 1)
    password_label = QLabel(tr.sync_password_label())
    form.addWidget(password_label, 1, 0)
    password_input = QLineEdit()
    password_input.setText(password)
    password_input.setEchoMode(QLineEdit.EchoMode.Password)
    form.addWidget(password_input, 1, 1)
    layout.addLayout(form)

    StdButton = QDialogButtonBox.StandardButton
    buttons = QDialogButtonBox(StdButton.Ok | StdButton.Cancel)  # type: ignore
    buttons.button(StdButton.Ok).setAutoDefault(True)
    qconnect(buttons.accepted, dialog.accept)
    qconnect(buttons.rejected, dialog.reject)
    layout.addWidget(buttons)
    dialog.setLayout(layout)
    dialog.show()

    if dialog.exec():
        return (id_input.text().strip(), password_input.text())
    return ("", "")


# export platform version to syncing code
os.environ["PLATFORM"] = plat_desc()