# Copyright: Ankitects Pty Ltd and contributors
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html

from __future__ import annotations

import logging
import os
import re
import sys
import threading
import time
import traceback
from dataclasses import dataclass
from http import HTTPStatus

import flask
import flask_cors  # type: ignore
from flask import Response, request
from waitress.server import create_server

import aqt
from anki import hooks
from anki.collection import GraphPreferences, OpChanges
from anki.decks import UpdateDeckConfigs
from anki.models import NotetypeNames
from anki.scheduler.v3 import NextStates
from anki.utils import devMode, from_json_bytes
from aqt.changenotetype import ChangeNotetypeDialog
from aqt.deckoptions import DeckOptionsDialog
from aqt.operations.deck import update_deck_configs
from aqt.qt import *
from aqt.utils import aqt_data_folder


def _getExportFolder() -> str:
    """Return the folder that bundled web assets are served from.

    The data folder is taken from the ANKI_DATA_FOLDER environment variable
    when set, otherwise from aqt_data_folder(). Its "web" subfolder is used
    if it exists; on macOS the fallback is a Resources/web folder resolved
    relative to this file; when TEST_TARGET is set "." is returned.

    Raises:
        Exception: if none of the above locations applies.
    """
    data_folder = os.getenv("ANKI_DATA_FOLDER")
    if not data_folder:
        data_folder = aqt_data_folder()

    webInSrcFolder = os.path.abspath(os.path.join(data_folder, "web"))
    if os.path.exists(webInSrcFolder):
        return webInSrcFolder

    if isMac:
        here = os.path.dirname(os.path.abspath(__file__))
        return os.path.abspath(f"{here}/../../Resources/web")

    if os.environ.get("TEST_TARGET"):
        # running tests in bazel; we have no data
        return "."

    raise Exception("couldn't find web folder")


_exportFolder = _getExportFolder()
app = flask.Flask(__name__)
flask_cors.CORS(app)


@dataclass
class LocalFileRequest:
    """A request that is answered by sending a file from disk."""

    # base folder, eg media folder
    root: str
    # path to file relative to root folder
    path: str


@dataclass
class NotFound:
    """A request that could not be mapped to a file or handler."""

    message: str


# A zero-argument callable that produces the response for a dynamic request.
DynamicRequest = Callable[[], Response]


class MediaServer(threading.Thread):
    """Daemon thread that runs the waitress server hosting the Flask app."""

    # NOTE: declared on the class, so the event is shared by all instances.
    _ready = threading.Event()
    daemon = True

    def __init__(self, mw: aqt.main.AnkiQt) -> None:
        super().__init__()
        self.is_shutdown = False
        # map of webview ids to pages
        self._page_html: dict[int, str] = {}

    def run(self) -> None:
        """Create the server, signal readiness, and serve until stopped.

        Host and port come from ANKI_API_HOST / ANKI_API_PORT, defaulting to
        127.0.0.1 and port 0. Exceptions are re-raised unless shutdown() has
        already been called.
        """
        try:
            if devMode:
                # idempotent if logging has already been set up
                logging.basicConfig()
            logging.getLogger("waitress").setLevel(logging.ERROR)

            desired_host = os.getenv("ANKI_API_HOST", "127.0.0.1")
            desired_port = int(os.getenv("ANKI_API_PORT", "0"))
            self.server = create_server(
                app,
                host=desired_host,
                port=desired_port,
                clear_untrusted_proxy_headers=True,
            )
            if devMode:
                print(
                    "Serving on http://%s:%s"
                    % (self.server.effective_host, self.server.effective_port)  # type: ignore
                )

            # getPort() blocks on this event, so set it only once
            # self.server exists.
            self._ready.set()
            self.server.run()

        except Exception:
            if not self.is_shutdown:
                raise

    def shutdown(self) -> None:
        """Close the server's open sockets and stop its task dispatcher."""
        self.is_shutdown = True
        # copy the values first, so the map is not iterated directly while
        # handle_close() is being called on its entries
        sockets = list(self.server._map.values())  # type: ignore
        for socket in sockets:
            socket.handle_close()
        # https://github.com/Pylons/webtest/blob/4b8a3ebf984185ff4fefb31b4d0cf82682e1fcf7/webtest/http.py#L93-L104
        self.server.task_dispatcher.shutdown()

    def getPort(self) -> int:
        """Wait until the server has been created, then return its port."""
        self._ready.wait()
        return int(self.server.effective_port)  # type: ignore

    def set_page_html(self, id: int, html: str) -> None:
        """Store the HTML for the given webview id."""
        self._page_html[id] = html

    def get_page_html(self, id: int) -> str | None:
        """Return the HTML stored for the given webview id, or None."""
        return self._page_html.get(id)

    def clear_page_html(self, id: int) -> None:
        """Discard the HTML stored for the given webview id, if any."""
        self._page_html.pop(id, None)


def _handle_local_file_request(request: LocalFileRequest) -> Response:
    """Send the file described by `request`, or an error response.

    Returns:
        400 if the path is rejected by os.path.isdir() with ValueError,
        403 if the resolved path escapes the root folder or is a directory,
        404 if the file does not exist, 500 if sending it raises, and the
        file itself otherwise.
    """
    directory = request.root
    path = request.path
    try:
        isdir = os.path.isdir(os.path.join(directory, path))
    except ValueError:
        return flask.make_response(
            f"Path for '{directory} - {path}' is too long!",
            HTTPStatus.BAD_REQUEST,
        )

    directory = os.path.realpath(directory)
    path = os.path.normpath(path)
    fullpath = os.path.abspath(os.path.join(directory, path))

    # protect against directory transversal: https://security.openstack.org/guidelines/dg_using-file-paths.html
    # Compare against the root with a trailing separator: a bare string
    # prefix test would also accept a sibling folder such as "<root>_other".
    root_prefix = os.path.join(directory, "")
    if fullpath != directory and not fullpath.startswith(root_prefix):
        return flask.make_response(
            f"Path for '{directory} - {path}' is a security leak!",
            HTTPStatus.FORBIDDEN,
        )

    if isdir:
        return flask.make_response(
            f"Path for '{directory} - {path}' is a directory (not supported)!",
            HTTPStatus.FORBIDDEN,
        )

    try:
        if fullpath.endswith(".css"):
            # some users may have invalid mime type in the Windows registry
            mimetype = "text/css"
        elif fullpath.endswith(".js"):
            mimetype = "application/javascript"
        else:
            # autodetect
            mimetype = None
        if os.path.exists(fullpath):
            return flask.send_file(fullpath, mimetype=mimetype, conditional=True)
        else:
            print(f"Not found: {path}")
            return flask.make_response(
                f"Invalid path: {path}",
                HTTPStatus.NOT_FOUND,
            )

    except Exception as error:
        if devMode:
            print(
                "Caught HTTP server exception,\n%s"
                % "".join(traceback.format_exception(*sys.exc_info())),
            )

        # swallow it - user likely surfed away from
        # review screen before an image had finished
        # downloading
        return flask.make_response(
            str(error),
            HTTPStatus.INTERNAL_SERVER_ERROR,
        )


# The <path:...> converter is required: the view takes `pathin`, and request
# paths may contain slashes (e.g. _anki/js/vendor/jquery.js).
@app.route("/<path:pathin>", methods=["GET", "POST"])
def handle_request(pathin: str) -> Response:
    """Classify the requested path and dispatch to the matching handler."""
    req = _extract_request(pathin)
    if devMode:
        print(f"{time.time():.3f} {flask.request.method} /{pathin}")

    if isinstance(req, NotFound):
        print(req.message)
        return flask.make_response(
            f"Invalid path: {pathin}",
            HTTPStatus.NOT_FOUND,
        )
    elif callable(req):
        return _handle_dynamic_request(req)
    elif isinstance(req, LocalFileRequest):
        return _handle_local_file_request(req)
    else:
        return flask.make_response(
            f"unexpected request: {pathin}",
            HTTPStatus.FORBIDDEN,
        )


def _extract_internal_request(
    path: str,
) -> LocalFileRequest | DynamicRequest | NotFound | None:
    """Catch /_anki references and rewrite them to web export folder.

    Returns None when `path` is not under "_anki/". Top-level "_anki/<name>"
    requests are first offered to the POST / dynamic GET handlers; anything
    else is served as a file, with legacy locations remapped.
    """
    prefix = "_anki/"
    if not path.startswith(prefix):
        return None

    dirname = os.path.dirname(path)
    filename = os.path.basename(path)
    additional_prefix = None

    if dirname == "_anki":
        if flask.request.method == "POST":
            return _extract_collection_post_request(filename)
        elif get_handler := _extract_dynamic_get_request(filename):
            return get_handler

        # remap legacy top-level references
        base, ext = os.path.splitext(filename)
        if ext == ".css":
            additional_prefix = "css/"
        elif ext == ".js":
            if base in ("browsersel", "jquery-ui", "jquery", "plot"):
                additional_prefix = "js/vendor/"
            else:
                additional_prefix = "js/"
    # handle requests for vendored libraries
    elif dirname == "_anki/js/vendor":
        base, ext = os.path.splitext(filename)
        vendor_renames = {
            "jquery": "jquery.min",
            "jquery-ui": "jquery-ui.min",
            "browsersel": "css_browser_selector.min",
        }
        if base in vendor_renames:
            base = vendor_renames[base]
            additional_prefix = "js/vendor/"

    if additional_prefix:
        oldpath = path
        path = f"{prefix}{additional_prefix}{base}{ext}"
        print(f"legacy {oldpath} remapped to {path}")

    return LocalFileRequest(root=_exportFolder, path=path[len(prefix) :])


def _extract_addon_request(path: str) -> LocalFileRequest | NotFound | None:
    """Catch /_addons references and rewrite them to addons folder.

    Returns None when the path is not an add-on request, or when the add-on
    has no web exports pattern; NotFound when the add-on has a pattern but
    the requested sub-path does not fully match it.
    """
    prefix = "_addons/"
    if not path.startswith(prefix):
        return None

    addon_path = path[len(prefix) :]

    try:
        manager = aqt.mw.addonManager
    except AttributeError as error:
        if devMode:
            print(f"_redirectWebExports: {error}")
        return None

    try:
        addon, sub_path = addon_path.split("/", 1)
    except ValueError:
        # no "/" after the add-on name
        return None
    if not addon:
        return None

    pattern = manager.getWebExports(addon)
    if not pattern:
        return None

    if re.fullmatch(pattern, sub_path):
        return LocalFileRequest(root=manager.addonsFolder(), path=addon_path)

    return NotFound(message=f"couldn't locate item in add-on folder {path}")


def _extract_request(path: str) -> LocalFileRequest | DynamicRequest | NotFound:
    """Map a request path to an internal, add-on, or media-folder request."""
    if internal := _extract_internal_request(path):
        return internal
    elif addon := _extract_addon_request(path):
        return addon

    if not aqt.mw.col:
        return NotFound(message=f"collection not open, ignore request for {path}")

    path = hooks.media_file_filter(path)
    return LocalFileRequest(root=aqt.mw.col.media.dir(), path=path)


def graph_data() -> bytes:
    """Return graph data for the "search" and "days" given in the JSON body."""
    args = from_json_bytes(request.data)
    return aqt.mw.col.graph_data(search=args["search"], days=args["days"])


def graph_preferences() -> bytes:
    """Return the collection's graph preferences."""
    return aqt.mw.col.get_graph_preferences()


def set_graph_preferences() -> None:
    """Parse GraphPreferences from the request body and store them."""
    prefs = GraphPreferences()
    prefs.ParseFromString(request.data)
    aqt.mw.col.set_graph_preferences(prefs)


def congrats_info() -> bytes:
    """Return congrats info, asking the main thread to move to the review
    state first if the scheduler does not report being finished."""
    if not aqt.mw.col.sched._is_finished():
        aqt.mw.taskman.run_on_main(lambda: aqt.mw.moveToState("review"))
    return aqt.mw.col.congrats_info()


def i18n_resources() -> bytes:
    """Return translations for the "modules" listed in the JSON body."""
    args = from_json_bytes(request.data)
    return aqt.mw.col.i18n_resources(modules=args["modules"])


def deck_configs_for_update() -> bytes:
    """Return the serialized deck configs for the "deckId" in the JSON body."""
    args = from_json_bytes(request.data)
    msg = aqt.mw.col.decks.get_deck_configs_for_update(deck_id=args["deckId"])
    msg.have_addons = aqt.mw.addonManager.dirty
    return msg.SerializeToString()


def update_deck_configs_request() -> bytes:
    """Schedule a deck config update on the main thread; returns immediately."""
    # the regular change tracking machinery expects to be started on the main
    # thread and uses a callback on success, so we need to run this op on
    # main, and return immediately from the web request

    configs = UpdateDeckConfigs()
    configs.ParseFromString(request.data)

    def on_success(changes: OpChanges) -> None:
        if isinstance(window := aqt.mw.app.activeWindow(), DeckOptionsDialog):
            window.reject()

    def handle_on_main() -> None:
        update_deck_configs(parent=aqt.mw, input=configs).success(
            on_success
        ).run_in_background()

    aqt.mw.taskman.run_on_main(handle_on_main)
    return b""


def next_card_states() -> bytes:
    """Return the reviewer's serialized next states, or b"" if it has none."""
    if states := aqt.mw.reviewer.get_next_states():
        return states.SerializeToString()
    else:
        return b""


def set_next_card_states() -> bytes:
    """Pass the NextStates in the body, and the "key" header, to the reviewer."""
    key = request.headers.get("key", "")
    states = NextStates()
    states.ParseFromString(request.data)
    aqt.mw.reviewer.set_next_states(key, states)
    return b""


def notetype_names() -> bytes:
    """Return the serialized names and ids of all notetypes."""
    msg = NotetypeNames(entries=aqt.mw.col.models.all_names_and_ids())
    return msg.SerializeToString()


def change_notetype_info() -> bytes:
    """Return change-notetype info for the two notetype ids in the JSON body."""
    args = from_json_bytes(request.data)
    return aqt.mw.col.models.change_notetype_info(
        old_notetype_id=args["oldNotetypeId"], new_notetype_id=args["newNotetypeId"]
    )


def change_notetype() -> bytes:
    """Hand the request body to the active ChangeNotetypeDialog, if any."""
    # read the body now; handle_on_main() runs later via run_on_main()
    data = request.data

    def handle_on_main() -> None:
        window = aqt.mw.app.activeWindow()
        if isinstance(window, ChangeNotetypeDialog):
            window.save(data)

    aqt.mw.taskman.run_on_main(handle_on_main)
    return b""


def complete_tag() -> bytes:
    """Return tag completions for the request body."""
    return aqt.mw.col.tags.complete_tag(request.data)


# these require a collection
post_handlers = {
    "graphData": graph_data,
    "graphPreferences": graph_preferences,
    "setGraphPreferences": set_graph_preferences,
    "deckConfigsForUpdate": deck_configs_for_update,
    "updateDeckConfigs": update_deck_configs_request,
    "nextCardStates": next_card_states,
    "setNextCardStates": set_next_card_states,
    "changeNotetypeInfo": change_notetype_info,
    "notetypeNames": notetype_names,
    "changeNotetype": change_notetype,
    "i18nResources": i18n_resources,
    "congratsInfo": congrats_info,
    "completeTag": complete_tag,
}


def _extract_collection_post_request(path: str) -> DynamicRequest | NotFound:
    """Look up the POST handler named `path` and wrap it as a DynamicRequest.

    Returns NotFound if no collection is open or no handler has that name.
    """
    if not aqt.mw.col:
        return NotFound(message=f"collection not open, ignore request for {path}")

    if handler := post_handlers.get(path):
        # convert bytes/None into response
        def wrapped() -> Response:
            if data := handler():
                response = flask.make_response(data)
                response.headers["Content-Type"] = "application/binary"
            else:
                # None and empty bytes both become an empty 204 response
                response = flask.make_response("", HTTPStatus.NO_CONTENT)
            return response

        return wrapped
    else:
        return NotFound(message=f"{path} not found")


def _handle_dynamic_request(request: DynamicRequest) -> Response:
    """Run a dynamic request, turning any exception into a 500 response."""
    try:
        return request()
    except Exception as e:
        if devMode:
            # same diagnostics as _handle_local_file_request
            print("Caught HTTP server exception,\n%s" % traceback.format_exc())
        return flask.make_response(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)


def legacy_page_data() -> Response:
    """Return the HTML stored on the media server for the "id" query arg."""
    id = int(request.args["id"])
    if html := aqt.mw.mediaServer.get_page_html(id):
        return Response(html, mimetype="text/html")
    else:
        return flask.make_response("page not found", HTTPStatus.NOT_FOUND)


# this currently only handles a single method; in the future, idempotent
# requests like i18nResources should probably be moved here
def _extract_dynamic_get_request(path: str) -> DynamicRequest | None:
    """Return the GET handler for `path`, or None if there is none."""
    if path == "legacyPageData":
        return legacy_page_data
    else:
        return None