# Copyright: Ankitects Pty Ltd and contributors # License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html from __future__ import annotations import enum import logging import mimetypes import os import re import sys import threading import time import traceback from dataclasses import dataclass from http import HTTPStatus from typing import Callable import flask import flask_cors import stringcase from flask import Response, abort, request from waitress.server import create_server import aqt import aqt.main import aqt.operations from anki import hooks from anki.collection import OpChanges, OpChangesOnly, Progress, SearchNode from anki.decks import UpdateDeckConfigs from anki.scheduler.v3 import SchedulingStatesWithContext, SetSchedulingStatesRequest from anki.utils import dev_mode from aqt.changenotetype import ChangeNotetypeDialog from aqt.deckoptions import DeckOptionsDialog from aqt.operations import on_op_finished from aqt.operations.deck import update_deck_configs as update_deck_configs_op from aqt.progress import ProgressUpdate from aqt.qt import * from aqt.utils import aqt_data_path, show_warning, tr app = flask.Flask(__name__, root_path="/fake") flask_cors.CORS(app, resources={r"/*": {"origins": "127.0.0.1"}}) @dataclass class LocalFileRequest: # base folder, eg media folder root: str # path to file relative to root folder path: str @dataclass class BundledFileRequest: # path relative to aqt data folder path: str @dataclass class NotFound: message: str DynamicRequest = Callable[[], Response] class PageContext(enum.Enum): UNKNOWN = 0 EDITOR = 1 REVIEWER = 2 # something in /_anki/pages/ NON_LEGACY_PAGE = 3 # Do not use this if you present user content (e.g. content from cards), as it's a # security issue. ADDON_PAGE = 4 @dataclass class LegacyPage: html: str context: PageContext class MediaServer(threading.Thread): _ready = threading.Event() daemon = True def __init__(self, mw: aqt.main.AnkiQt) -> None: super().__init__() self.is_shutdown = False # map of webview ids to pages self._legacy_pages: dict[int, LegacyPage] = {} def run(self) -> None: try: if dev_mode: # idempotent if logging has already been set up logging.basicConfig() logging.getLogger("waitress").setLevel(logging.ERROR) desired_host = os.getenv("ANKI_API_HOST", "127.0.0.1") desired_port = int(os.getenv("ANKI_API_PORT") or 0) self.server = create_server( app, host=desired_host, port=desired_port, clear_untrusted_proxy_headers=True, ) if dev_mode: print( "Serving on http://%s:%s" % (self.server.effective_host, self.server.effective_port) # type: ignore ) self._ready.set() self.server.run() except Exception: if not self.is_shutdown: raise def shutdown(self) -> None: self.is_shutdown = True sockets = list(self.server._map.values()) # type: ignore for socket in sockets: socket.handle_close() # https://github.com/Pylons/webtest/blob/4b8a3ebf984185ff4fefb31b4d0cf82682e1fcf7/webtest/http.py#L93-L104 self.server.task_dispatcher.shutdown() def getPort(self) -> int: self._ready.wait() return int(self.server.effective_port) # type: ignore def set_page_html( self, id: int, html: str, context: PageContext = PageContext.UNKNOWN ) -> None: self._legacy_pages[id] = LegacyPage(html, context) def get_page_html(self, id: int) -> str | None: if page := self._legacy_pages.get(id): return page.html else: return None def get_page_context(self, id: int) -> PageContext | None: if page := self._legacy_pages.get(id): return page.context else: return None def clear_page_html(self, id: int) -> None: try: del self._legacy_pages[id] except KeyError: pass @app.route("/favicon.ico") def favicon() -> Response: request = BundledFileRequest(os.path.join("imgs", "favicon.ico")) return _handle_builtin_file_request(request) def _mime_for_path(path: str) -> str: "Mime type for provided path/filename." if path.endswith(".css"): # some users may have invalid mime type in the Windows registry return "text/css" elif path.endswith(".js"): return "application/javascript" else: # autodetect mime, _encoding = mimetypes.guess_type(path) return mime or "application/octet-stream" def _handle_local_file_request(request: LocalFileRequest) -> Response: directory = request.root path = request.path try: isdir = os.path.isdir(os.path.join(directory, path)) except ValueError: return flask.make_response( f"Path for '{directory} - {path}' is too long!", HTTPStatus.BAD_REQUEST, ) directory = os.path.realpath(directory) path = os.path.normpath(path) fullpath = os.path.abspath(os.path.join(directory, path)) # protect against directory transversal: https://security.openstack.org/guidelines/dg_using-file-paths.html if not fullpath.startswith(directory): return flask.make_response( f"Path for '{directory} - {path}' is a security leak!", HTTPStatus.FORBIDDEN, ) if isdir: return flask.make_response( f"Path for '{directory} - {path}' is a directory (not supported)!", HTTPStatus.FORBIDDEN, ) try: mimetype = _mime_for_path(fullpath) if os.path.exists(fullpath): if fullpath.endswith(".css"): # caching css files prevents flicker in the webview, but we want # a short cache max_age = 10 elif fullpath.endswith(".js"): # don't cache js files max_age = 0 else: max_age = 60 * 60 return flask.send_file( fullpath, mimetype=mimetype, conditional=True, max_age=max_age, download_name="foo" # type: ignore[call-arg] ) else: print(f"Not found: {path}") return flask.make_response( f"Invalid path: {path}", HTTPStatus.NOT_FOUND, ) except Exception as error: if dev_mode: print( "Caught HTTP server exception,\n%s" % "".join(traceback.format_exception(*sys.exc_info())), ) # swallow it - user likely surfed away from # review screen before an image had finished # downloading return flask.make_response( str(error), HTTPStatus.INTERNAL_SERVER_ERROR, ) def _builtin_data(path: str) -> bytes: """Return data from file in aqt/data folder. Path must use forward slash separators.""" # packaged build? if getattr(sys, "frozen", False): reader = aqt.__loader__.get_resource_reader("_aqt") # type: ignore with reader.open_resource(path) as f: return f.read() else: full_path = aqt_data_path() / ".." / path return full_path.read_bytes() def _handle_builtin_file_request(request: BundledFileRequest) -> Response: path = request.path mimetype = _mime_for_path(path) data_path = f"data/web/{path}" try: data = _builtin_data(data_path) return Response(data, mimetype=mimetype) except FileNotFoundError: if dev_mode: print(f"404: {data_path}") return flask.make_response( f"Invalid path: {path}", HTTPStatus.NOT_FOUND, ) except Exception as error: if dev_mode: print( "Caught HTTP server exception,\n%s" % "".join(traceback.format_exception(*sys.exc_info())), ) # swallow it - user likely surfed away from # review screen before an image had finished # downloading return flask.make_response( str(error), HTTPStatus.INTERNAL_SERVER_ERROR, ) @app.route("/", methods=["GET", "POST"]) def handle_request(pathin: str) -> Response: host = request.headers.get("Host", "").lower() allowed_prefixes = ("127.0.0.1:", "localhost:", "[::1]:") if not any(host.startswith(prefix) for prefix in allowed_prefixes): # while we only bind to localhost, this request may have come from a local browser # via a DNS rebinding attack; deny it unless we're doing non-local testing if os.environ.get("ANKI_API_HOST") != "0.0.0.0": print("deny non-local host", host) abort(403) req = _extract_request(pathin) if dev_mode: print(f"{time.time():.3f} {flask.request.method} /{pathin}") if isinstance(req, NotFound): print(req.message) return flask.make_response( f"Invalid path: {pathin}", HTTPStatus.NOT_FOUND, ) elif callable(req): return _handle_dynamic_request(req) elif isinstance(req, BundledFileRequest): return _handle_builtin_file_request(req) elif isinstance(req, LocalFileRequest): return _handle_local_file_request(req) else: return flask.make_response( f"unexpected request: {pathin}", HTTPStatus.FORBIDDEN, ) def _extract_internal_request( path: str, ) -> BundledFileRequest | DynamicRequest | NotFound | None: "Catch /_anki references and rewrite them to web export folder." prefix = "_anki/" if not path.startswith(prefix): return None dirname = os.path.dirname(path) filename = os.path.basename(path) additional_prefix = None if dirname == "_anki": if flask.request.method == "POST": return _extract_collection_post_request(filename) elif get_handler := _extract_dynamic_get_request(filename): return get_handler # remap legacy top-level references base, ext = os.path.splitext(filename) if ext == ".css": additional_prefix = "css/" elif ext == ".js": if base in ("jquery-ui", "jquery", "plot"): additional_prefix = "js/vendor/" else: additional_prefix = "js/" # handle requests for vendored libraries elif dirname == "_anki/js/vendor": base, ext = os.path.splitext(filename) if base == "jquery": base = "jquery.min" additional_prefix = "js/vendor/" elif base == "jquery-ui": base = "jquery-ui.min" additional_prefix = "js/vendor/" if additional_prefix: oldpath = path path = f"{prefix}{additional_prefix}{base}{ext}" print(f"legacy {oldpath} remapped to {path}") return BundledFileRequest(path=path[len(prefix) :]) def _extract_addon_request(path: str) -> LocalFileRequest | NotFound | None: "Catch /_addons references and rewrite them to addons folder." prefix = "_addons/" if not path.startswith(prefix): return None addon_path = path[len(prefix) :] try: manager = aqt.mw.addonManager except AttributeError as error: if dev_mode: print(f"_redirectWebExports: {error}") return None try: addon, sub_path = addon_path.split("/", 1) except ValueError: return None if not addon: return None pattern = manager.getWebExports(addon) if not pattern: return None if re.fullmatch(pattern, sub_path): return LocalFileRequest(root=manager.addonsFolder(), path=addon_path) return NotFound(message=f"couldn't locate item in add-on folder {path}") def _extract_request( path: str, ) -> LocalFileRequest | BundledFileRequest | DynamicRequest | NotFound: if internal := _extract_internal_request(path): return internal elif addon := _extract_addon_request(path): return addon if not aqt.mw.col: return NotFound(message=f"collection not open, ignore request for {path}") path = hooks.media_file_filter(path) return LocalFileRequest(root=aqt.mw.col.media.dir(), path=path) def congrats_info() -> bytes: if not aqt.mw.col.sched._is_finished(): aqt.mw.taskman.run_on_main(lambda: aqt.mw.moveToState("review")) return raw_backend_request("congrats_info")() def get_deck_configs_for_update() -> bytes: return aqt.mw.col._backend.get_deck_configs_for_update_raw(request.data) def update_deck_configs() -> bytes: # the regular change tracking machinery expects to be started on the main # thread and uses a callback on success, so we need to run this op on # main, and return immediately from the web request input = UpdateDeckConfigs() input.ParseFromString(request.data) def on_progress(progress: Progress, update: ProgressUpdate) -> None: if progress.HasField("compute_memory"): val = progress.compute_memory update.max = val.total_cards update.value = val.current_cards update.label = val.label elif progress.HasField("compute_weights"): val2 = progress.compute_weights update.max = val2.total update.value = val2.current pct = str(int(val2.current / val2.total * 100) if val2.total > 0 else 0) label = tr.deck_config_optimizing_preset( current_count=val2.current_preset, total_count=val2.total_presets ) update.label = ( label + "\n" + tr.deck_config_percent_of_reviews(pct=pct, reviews=val2.fsrs_items) ) else: return if update.user_wants_abort: update.abort = True def on_success(changes: OpChanges) -> None: if isinstance(window := aqt.mw.app.activeWindow(), DeckOptionsDialog): window.reject() def handle_on_main() -> None: update_deck_configs_op(parent=aqt.mw, input=input).success( on_success ).with_backend_progress(on_progress).run_in_background() aqt.mw.taskman.run_on_main(handle_on_main) return b"" def get_scheduling_states_with_context() -> bytes: return SchedulingStatesWithContext( states=aqt.mw.reviewer.get_scheduling_states(), context=aqt.mw.reviewer.get_scheduling_context(), ).SerializeToString() def set_scheduling_states() -> bytes: states = SetSchedulingStatesRequest() states.ParseFromString(request.data) aqt.mw.reviewer.set_scheduling_states(states) return b"" def import_done() -> bytes: def update_window_modality() -> None: if window := aqt.mw.app.activeWindow(): from aqt.import_export.import_dialog import ImportDialog if isinstance(window, ImportDialog): window.hide() window.setWindowModality(Qt.WindowModality.NonModal) window.show() aqt.mw.taskman.run_on_main(update_window_modality) return b"" def import_request(endpoint: str) -> bytes: output = raw_backend_request(endpoint)() response = OpChangesOnly() response.ParseFromString(output) def handle_on_main() -> None: window = aqt.mw.app.activeWindow() on_op_finished(aqt.mw, response, window) aqt.mw.taskman.run_on_main(handle_on_main) return output def import_csv() -> bytes: return import_request("import_csv") def import_anki_package() -> bytes: return import_request("import_anki_package") def import_json_file() -> bytes: return import_request("import_json_file") def import_json_string() -> bytes: return import_request("import_json_string") def search_in_browser() -> bytes: node = SearchNode() node.ParseFromString(request.data) def handle_on_main() -> None: aqt.dialogs.open("Browser", aqt.mw, search=(node,)) aqt.mw.taskman.run_on_main(handle_on_main) return b"" def change_notetype() -> bytes: data = request.data def handle_on_main() -> None: window = aqt.mw.app.activeWindow() if isinstance(window, ChangeNotetypeDialog): window.save(data) aqt.mw.taskman.run_on_main(handle_on_main) return b"" post_handler_list = [ congrats_info, get_deck_configs_for_update, update_deck_configs, get_scheduling_states_with_context, set_scheduling_states, change_notetype, import_done, import_csv, import_anki_package, import_json_file, import_json_string, search_in_browser, ] exposed_backend_list = [ # CollectionService "latest_progress", # DeckService "get_deck_names", # I18nService "i18n_resources", # ImportExportService "get_csv_metadata", "get_import_anki_package_presets", # NotesService "get_field_names", "get_note", # NotetypesService "get_notetype_names", "get_change_notetype_info", # StatsService "card_stats", "graphs", "get_graph_preferences", "set_graph_preferences", # TagsService "complete_tag", # ImageOcclusionService "get_image_for_occlusion", "add_image_occlusion_note", "get_image_occlusion_note", "update_image_occlusion_note", "get_image_occlusion_fields", # SchedulerService "compute_fsrs_weights", "compute_optimal_retention", "set_wants_abort", "evaluate_weights", "get_optimal_retention_parameters", ] def raw_backend_request(endpoint: str) -> Callable[[], bytes]: # check for key at startup from anki._backend import RustBackend assert hasattr(RustBackend, f"{endpoint}_raw") return lambda: getattr(aqt.mw.col._backend, f"{endpoint}_raw")(request.data) # all methods in here require a collection post_handlers = { stringcase.camelcase(handler.__name__): handler for handler in post_handler_list } | { stringcase.camelcase(handler): raw_backend_request(handler) for handler in exposed_backend_list } def _extract_collection_post_request(path: str) -> DynamicRequest | NotFound: if not aqt.mw.col: return NotFound(message=f"collection not open, ignore request for {path}") if handler := post_handlers.get(path): # convert bytes/None into response def wrapped() -> Response: try: if data := handler(): response = flask.make_response(data) response.headers["Content-Type"] = "application/binary" else: response = flask.make_response("", HTTPStatus.NO_CONTENT) except Exception as exc: print(traceback.format_exc()) response = flask.make_response( str(exc), HTTPStatus.INTERNAL_SERVER_ERROR ) return response return wrapped else: return NotFound(message=f"{path} not found") def _check_dynamic_request_permissions(): if request.method == "GET": return context = _extract_page_context() def warn() -> None: show_warning( "Unexpected API access. Please report this message on the Anki forums." ) # check content type header to ensure this isn't an opaque request from another origin if request.headers["Content-type"] != "application/binary": aqt.mw.taskman.run_on_main(warn) abort(403) if ( context == PageContext.NON_LEGACY_PAGE or context == PageContext.EDITOR or context == PageContext.ADDON_PAGE ): pass elif context == PageContext.REVIEWER and request.path in ( "/_anki/getSchedulingStatesWithContext", "/_anki/setSchedulingStates", ): # reviewer is only allowed to access custom study methods pass else: # other legacy pages may contain third-party JS, so we do not # allow them to access our API aqt.mw.taskman.run_on_main(warn) abort(403) def _handle_dynamic_request(req: DynamicRequest) -> Response: _check_dynamic_request_permissions() try: return req() except Exception as e: return flask.make_response(str(e), HTTPStatus.INTERNAL_SERVER_ERROR) def legacy_page_data() -> Response: id = int(request.args["id"]) if html := aqt.mw.mediaServer.get_page_html(id): return Response(html, mimetype="text/html") else: return flask.make_response("page not found", HTTPStatus.NOT_FOUND) def _extract_page_context() -> PageContext: "Get context based on referer header." from urllib.parse import parse_qs, urlparse referer = urlparse(request.headers.get("Referer", "")) if referer.path.startswith("/_anki/pages/"): return PageContext.NON_LEGACY_PAGE elif referer.path == "/_anki/legacyPageData": query_params = parse_qs(referer.query) id = int(query_params.get("id", [None])[0]) return aqt.mw.mediaServer.get_page_context(id) else: return PageContext.UNKNOWN # this currently only handles a single method; in the future, idempotent # requests like i18nResources should probably be moved here def _extract_dynamic_get_request(path: str) -> DynamicRequest | None: if path == "legacyPageData": return legacy_page_data else: return None