anki/qt/aqt/mediasrv.py
Damien Elmes d33c66e195 change setHtml() to serve content via media server
- fixes https://forums.ankiweb.net/t/deck-list-is-blank/2241/2
- fixes the security warnings on Qt 6, by ensuring our pages and
resources are coming from the same origin
2021-10-12 16:27:03 +10:00

472 lines
14 KiB
Python

# Copyright: Ankitects Pty Ltd and contributors
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
from __future__ import annotations
import logging
import os
import re
import sys
import threading
import time
import traceback
from dataclasses import dataclass
from http import HTTPStatus
import flask
import flask_cors # type: ignore
from flask import Response, request
from waitress.server import create_server
import aqt
from anki import hooks
from anki.collection import GraphPreferences, OpChanges
from anki.decks import UpdateDeckConfigs
from anki.models import NotetypeNames
from anki.scheduler.v3 import NextStates
from anki.utils import devMode, from_json_bytes
from aqt.changenotetype import ChangeNotetypeDialog
from aqt.deckoptions import DeckOptionsDialog
from aqt.operations.deck import update_deck_configs
from aqt.qt import *
from aqt.utils import aqt_data_folder
def _getExportFolder() -> str:
if not (data_folder := os.getenv("ANKI_DATA_FOLDER")):
data_folder = aqt_data_folder()
webInSrcFolder = os.path.abspath(os.path.join(data_folder, "web"))
if os.path.exists(webInSrcFolder):
return webInSrcFolder
elif isMac:
dir = os.path.dirname(os.path.abspath(__file__))
return os.path.abspath(f"{dir}/../../Resources/web")
else:
if os.environ.get("TEST_TARGET"):
# running tests in bazel; we have no data
return "."
else:
raise Exception("couldn't find web folder")
_exportFolder = _getExportFolder()
app = flask.Flask(__name__)
flask_cors.CORS(app)
@dataclass
class LocalFileRequest:
# base folder, eg media folder
root: str
# path to file relative to root folder
path: str
@dataclass
class NotFound:
message: str
DynamicRequest = Callable[[], Response]
class MediaServer(threading.Thread):
_ready = threading.Event()
daemon = True
def __init__(self, mw: aqt.main.AnkiQt) -> None:
super().__init__()
self.is_shutdown = False
# map of webview ids to pages
self._page_html: dict[int, str] = {}
def run(self) -> None:
try:
if devMode:
# idempotent if logging has already been set up
logging.basicConfig()
logging.getLogger("waitress").setLevel(logging.ERROR)
desired_host = os.getenv("ANKI_API_HOST", "127.0.0.1")
desired_port = int(os.getenv("ANKI_API_PORT", "0"))
self.server = create_server(
app,
host=desired_host,
port=desired_port,
clear_untrusted_proxy_headers=True,
)
if devMode:
print(
"Serving on http://%s:%s"
% (self.server.effective_host, self.server.effective_port) # type: ignore
)
self._ready.set()
self.server.run()
except Exception:
if not self.is_shutdown:
raise
def shutdown(self) -> None:
self.is_shutdown = True
sockets = list(self.server._map.values()) # type: ignore
for socket in sockets:
socket.handle_close()
# https://github.com/Pylons/webtest/blob/4b8a3ebf984185ff4fefb31b4d0cf82682e1fcf7/webtest/http.py#L93-L104
self.server.task_dispatcher.shutdown()
def getPort(self) -> int:
self._ready.wait()
return int(self.server.effective_port) # type: ignore
def set_page_html(self, id: int, html: str) -> None:
self._page_html[id] = html
def get_page_html(self, id: int) -> str | None:
return self._page_html.get(id)
def clear_page_html(self, id: int) -> None:
try:
del self._page_html[id]
except KeyError:
pass
def _handle_local_file_request(request: LocalFileRequest) -> Response:
directory = request.root
path = request.path
try:
isdir = os.path.isdir(os.path.join(directory, path))
except ValueError:
return flask.make_response(
f"Path for '{directory} - {path}' is too long!",
HTTPStatus.BAD_REQUEST,
)
directory = os.path.realpath(directory)
path = os.path.normpath(path)
fullpath = os.path.abspath(os.path.join(directory, path))
# protect against directory transversal: https://security.openstack.org/guidelines/dg_using-file-paths.html
if not fullpath.startswith(directory):
return flask.make_response(
f"Path for '{directory} - {path}' is a security leak!",
HTTPStatus.FORBIDDEN,
)
if isdir:
return flask.make_response(
f"Path for '{directory} - {path}' is a directory (not supported)!",
HTTPStatus.FORBIDDEN,
)
try:
if fullpath.endswith(".css"):
# some users may have invalid mime type in the Windows registry
mimetype = "text/css"
elif fullpath.endswith(".js"):
mimetype = "application/javascript"
else:
# autodetect
mimetype = None
if os.path.exists(fullpath):
return flask.send_file(fullpath, mimetype=mimetype, conditional=True)
else:
print(f"Not found: {path}")
return flask.make_response(
f"Invalid path: {path}",
HTTPStatus.NOT_FOUND,
)
except Exception as error:
if devMode:
print(
"Caught HTTP server exception,\n%s"
% "".join(traceback.format_exception(*sys.exc_info())),
)
# swallow it - user likely surfed away from
# review screen before an image had finished
# downloading
return flask.make_response(
str(error),
HTTPStatus.INTERNAL_SERVER_ERROR,
)
@app.route("/<path:pathin>", methods=["GET", "POST"])
def handle_request(pathin: str) -> Response:
request = _extract_request(pathin)
if devMode:
print(f"{time.time():.3f} {flask.request.method} /{pathin}")
if isinstance(request, NotFound):
print(request.message)
return flask.make_response(
f"Invalid path: {pathin}",
HTTPStatus.NOT_FOUND,
)
elif callable(request):
return _handle_dynamic_request(request)
elif isinstance(request, LocalFileRequest):
return _handle_local_file_request(request)
else:
return flask.make_response(
f"unexpected request: {pathin}",
HTTPStatus.FORBIDDEN,
)
def _extract_internal_request(
path: str,
) -> LocalFileRequest | DynamicRequest | NotFound | None:
"Catch /_anki references and rewrite them to web export folder."
prefix = "_anki/"
if not path.startswith(prefix):
return None
dirname = os.path.dirname(path)
filename = os.path.basename(path)
additional_prefix = None
if dirname == "_anki":
if flask.request.method == "POST":
return _extract_collection_post_request(filename)
elif get_handler := _extract_dynamic_get_request(filename):
return get_handler
# remap legacy top-level references
base, ext = os.path.splitext(filename)
if ext == ".css":
additional_prefix = "css/"
elif ext == ".js":
if base in ("browsersel", "jquery-ui", "jquery", "plot"):
additional_prefix = "js/vendor/"
else:
additional_prefix = "js/"
# handle requests for vendored libraries
elif dirname == "_anki/js/vendor":
base, ext = os.path.splitext(filename)
if base == "jquery":
base = "jquery.min"
additional_prefix = "js/vendor/"
elif base == "jquery-ui":
base = "jquery-ui.min"
additional_prefix = "js/vendor/"
elif base == "browsersel":
base = "css_browser_selector.min"
additional_prefix = "js/vendor/"
if additional_prefix:
oldpath = path
path = f"{prefix}{additional_prefix}{base}{ext}"
print(f"legacy {oldpath} remapped to {path}")
return LocalFileRequest(root=_exportFolder, path=path[len(prefix) :])
def _extract_addon_request(path: str) -> LocalFileRequest | NotFound | None:
"Catch /_addons references and rewrite them to addons folder."
prefix = "_addons/"
if not path.startswith(prefix):
return None
addon_path = path[len(prefix) :]
try:
manager = aqt.mw.addonManager
except AttributeError as error:
if devMode:
print(f"_redirectWebExports: {error}")
return None
try:
addon, sub_path = addon_path.split("/", 1)
except ValueError:
return None
if not addon:
return None
pattern = manager.getWebExports(addon)
if not pattern:
return None
if re.fullmatch(pattern, sub_path):
return LocalFileRequest(root=manager.addonsFolder(), path=addon_path)
return NotFound(message=f"couldn't locate item in add-on folder {path}")
def _extract_request(path: str) -> LocalFileRequest | DynamicRequest | NotFound:
if internal := _extract_internal_request(path):
return internal
elif addon := _extract_addon_request(path):
return addon
if not aqt.mw.col:
return NotFound(message=f"collection not open, ignore request for {path}")
path = hooks.media_file_filter(path)
return LocalFileRequest(root=aqt.mw.col.media.dir(), path=path)
def graph_data() -> bytes:
args = from_json_bytes(request.data)
return aqt.mw.col.graph_data(search=args["search"], days=args["days"])
def graph_preferences() -> bytes:
return aqt.mw.col.get_graph_preferences()
def set_graph_preferences() -> None:
prefs = GraphPreferences()
prefs.ParseFromString(request.data)
aqt.mw.col.set_graph_preferences(prefs)
def congrats_info() -> bytes:
if not aqt.mw.col.sched._is_finished():
aqt.mw.taskman.run_on_main(lambda: aqt.mw.moveToState("review"))
return aqt.mw.col.congrats_info()
def i18n_resources() -> bytes:
args = from_json_bytes(request.data)
return aqt.mw.col.i18n_resources(modules=args["modules"])
def deck_configs_for_update() -> bytes:
args = from_json_bytes(request.data)
msg = aqt.mw.col.decks.get_deck_configs_for_update(deck_id=args["deckId"])
msg.have_addons = aqt.mw.addonManager.dirty
return msg.SerializeToString()
def update_deck_configs_request() -> bytes:
# the regular change tracking machinery expects to be started on the main
# thread and uses a callback on success, so we need to run this op on
# main, and return immediately from the web request
input = UpdateDeckConfigs()
input.ParseFromString(request.data)
def on_success(changes: OpChanges) -> None:
if isinstance(window := aqt.mw.app.activeWindow(), DeckOptionsDialog):
window.reject()
def handle_on_main() -> None:
update_deck_configs(parent=aqt.mw, input=input).success(
on_success
).run_in_background()
aqt.mw.taskman.run_on_main(handle_on_main)
return b""
def next_card_states() -> bytes:
if states := aqt.mw.reviewer.get_next_states():
return states.SerializeToString()
else:
return b""
def set_next_card_states() -> bytes:
key = request.headers.get("key", "")
input = NextStates()
input.ParseFromString(request.data)
aqt.mw.reviewer.set_next_states(key, input)
return b""
def notetype_names() -> bytes:
msg = NotetypeNames(entries=aqt.mw.col.models.all_names_and_ids())
return msg.SerializeToString()
def change_notetype_info() -> bytes:
args = from_json_bytes(request.data)
return aqt.mw.col.models.change_notetype_info(
old_notetype_id=args["oldNotetypeId"], new_notetype_id=args["newNotetypeId"]
)
def change_notetype() -> bytes:
data = request.data
def handle_on_main() -> None:
window = aqt.mw.app.activeWindow()
if isinstance(window, ChangeNotetypeDialog):
window.save(data)
aqt.mw.taskman.run_on_main(handle_on_main)
return b""
def complete_tag() -> bytes:
return aqt.mw.col.tags.complete_tag(request.data)
# these require a collection
post_handlers = {
"graphData": graph_data,
"graphPreferences": graph_preferences,
"setGraphPreferences": set_graph_preferences,
"deckConfigsForUpdate": deck_configs_for_update,
"updateDeckConfigs": update_deck_configs_request,
"nextCardStates": next_card_states,
"setNextCardStates": set_next_card_states,
"changeNotetypeInfo": change_notetype_info,
"notetypeNames": notetype_names,
"changeNotetype": change_notetype,
"i18nResources": i18n_resources,
"congratsInfo": congrats_info,
"completeTag": complete_tag,
}
def _extract_collection_post_request(path: str) -> DynamicRequest | NotFound:
if not aqt.mw.col:
return NotFound(message=f"collection not open, ignore request for {path}")
if handler := post_handlers.get(path):
# convert bytes/None into response
def wrapped() -> Response:
if data := handler():
response = flask.make_response(data)
response.headers["Content-Type"] = "application/binary"
else:
response = flask.make_response("", HTTPStatus.NO_CONTENT)
return response
return wrapped
else:
return NotFound(message=f"{path} not found")
def _handle_dynamic_request(request: DynamicRequest) -> Response:
try:
return request()
except Exception as e:
return flask.make_response(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
def legacy_page_data() -> Response:
id = int(request.args["id"])
if html := aqt.mw.mediaServer.get_page_html(id):
return Response(html, mimetype="text/html")
else:
return flask.make_response("page not found", HTTPStatus.NOT_FOUND)
# this currently only handles a single method; in the future, idempotent
# requests like i18nResources should probably be moved here
def _extract_dynamic_get_request(path: str) -> DynamicRequest | None:
if path == "legacyPageData":
return legacy_page_data
else:
return None