anki/qt/aqt/addons.py
Damien Elmes 6134ae9ec6 refactor add-on downloading/installing/updating
- web requests done on a background thread
- easier to use outside of the addon dialog
- gets max point version info from AnkiWeb, which we can use in
the future
2020-01-19 10:37:15 +10:00

1151 lines
34 KiB
Python

# Copyright: Ankitects Pty Ltd and contributors
# -*- coding: utf-8 -*-
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
from __future__ import annotations
import io
import json
import os
import re
import zipfile
from collections import defaultdict
from concurrent.futures import Future
from dataclasses import dataclass
from typing import IO, Any, Callable, Dict, Iterable, List, Optional, Tuple, Union
from zipfile import ZipFile
import jsonschema
import markdown
from jsonschema.exceptions import ValidationError
from send2trash import send2trash
import aqt
import aqt.forms
from anki.httpclient import AnkiRequestsClient
from anki.lang import _, ngettext
from anki.utils import intTime
from aqt.qt import *
from aqt.utils import (
askUser,
getFile,
isWin,
openFolder,
openLink,
restoreGeom,
restoreSplitter,
saveGeom,
saveSplitter,
showInfo,
showWarning,
tooltip,
)
@dataclass
class InstallOk:
name: str
conflicts: List[str]
@dataclass
class InstallError:
errmsg: str
@dataclass
class DownloadOk:
data: bytes
filename: str
@dataclass
class DownloadError:
# set if result was not 200
status_code: Optional[int] = None
# set if an exception occurred
exception: Optional[Exception] = None
@dataclass
class UpdateInfo:
id: int
last_updated: int
max_point_version: Optional[int]
# first arg is add-on id
DownloadLogEntry = Tuple[int, Union[DownloadError, InstallError, InstallOk]]
# fixme: this class should not have any GUI code in it
class AddonManager:
ext: str = ".ankiaddon"
_manifest_schema: dict = {
"type": "object",
"properties": {
"package": {"type": "string", "meta": False},
"name": {"type": "string", "meta": True},
"mod": {"type": "number", "meta": True},
"conflicts": {"type": "array", "items": {"type": "string"}, "meta": True},
},
"required": ["package", "name"],
}
def __init__(self, mw: aqt.main.AnkiQt):
self.mw = mw
self.dirty = False
f = self.mw.form
f.actionAdd_ons.triggered.connect(self.onAddonsDialog)
sys.path.insert(0, self.addonsFolder())
def allAddons(self):
l = []
for d in os.listdir(self.addonsFolder()):
path = self.addonsFolder(d)
if not os.path.exists(os.path.join(path, "__init__.py")):
continue
l.append(d)
l.sort()
if os.getenv("ANKIREVADDONS", ""):
l = reversed(l)
return l
def managedAddons(self):
return [d for d in self.allAddons() if re.match(r"^\d+$", d)]
def addonsFolder(self, dir=None):
root = self.mw.pm.addonFolder()
if not dir:
return root
return os.path.join(root, dir)
def loadAddons(self):
for dir in self.allAddons():
meta = self.addonMeta(dir)
if meta.get("disabled"):
continue
self.dirty = True
try:
__import__(dir)
except:
showWarning(
_(
"""\
An add-on you installed failed to load. If problems persist, please \
go to the Tools>Add-ons menu, and disable or delete the add-on.
When loading '%(name)s':
%(traceback)s
"""
)
% dict(name=meta.get("name", dir), traceback=traceback.format_exc())
)
def onAddonsDialog(self):
AddonsDialog(self)
# Metadata
######################################################################
def _addonMetaPath(self, dir):
return os.path.join(self.addonsFolder(dir), "meta.json")
def addonMeta(self, dir: str) -> Dict[str, Any]:
path = self._addonMetaPath(dir)
try:
with open(path, encoding="utf8") as f:
return json.load(f)
except:
return dict()
def writeAddonMeta(self, dir, meta):
path = self._addonMetaPath(dir)
with open(path, "w", encoding="utf8") as f:
json.dump(meta, f)
def isEnabled(self, dir):
meta = self.addonMeta(dir)
return not meta.get("disabled")
def toggleEnabled(self, dir, enable=None):
meta = self.addonMeta(dir)
enabled = enable if enable is not None else meta.get("disabled")
if enabled is True:
conflicting = self._disableConflicting(dir)
if conflicting:
addons = ", ".join(self.addonName(f) for f in conflicting)
showInfo(
_(
"The following add-ons are incompatible with %(name)s \
and have been disabled: %(found)s"
)
% dict(name=self.addonName(dir), found=addons),
textFormat="plain",
)
meta["disabled"] = not enabled
self.writeAddonMeta(dir, meta)
def addonName(self, dir):
return self.addonMeta(dir).get("name", dir)
def annotatedName(self, dir):
buf = self.addonName(dir)
if not self.isEnabled(dir):
buf += _(" (disabled)")
return buf
def enabled_addon_ids(self) -> List[int]:
ids = []
for dir in self.managedAddons():
meta = self.addonMeta(dir)
if not meta.get("disabled"):
ids.append(int(dir))
return ids
# Conflict resolution
######################################################################
def addonConflicts(self, dir):
return self.addonMeta(dir).get("conflicts", [])
def allAddonConflicts(self):
all_conflicts = defaultdict(list)
for dir in self.allAddons():
if not self.isEnabled(dir):
continue
conflicts = self.addonConflicts(dir)
for other_dir in conflicts:
all_conflicts[other_dir].append(dir)
return all_conflicts
def _disableConflicting(self, dir, conflicts=None):
conflicts = conflicts or self.addonConflicts(dir)
installed = self.allAddons()
found = [d for d in conflicts if d in installed and self.isEnabled(d)]
found.extend(self.allAddonConflicts().get(dir, []))
if not found:
return []
for package in found:
self.toggleEnabled(package, enable=False)
return found
# Installing and deleting add-ons
######################################################################
def readManifestFile(self, zfile):
try:
with zfile.open("manifest.json") as f:
data = json.loads(f.read())
jsonschema.validate(data, self._manifest_schema)
# build new manifest from recognized keys
schema = self._manifest_schema["properties"]
manifest = {key: data[key] for key in data.keys() & schema.keys()}
except (KeyError, json.decoder.JSONDecodeError, ValidationError):
# raised for missing manifest, invalid json, missing/invalid keys
return {}
return manifest
def install(
self, file: Union[IO, str], manifest: dict = None
) -> Union[InstallOk, InstallError]:
"""Install add-on from path or file-like object. Metadata is read
from the manifest file, with keys overriden by supplying a 'manifest'
dictionary"""
try:
zfile = ZipFile(file)
except zipfile.BadZipfile:
return InstallError(errmsg="zip")
with zfile:
file_manifest = self.readManifestFile(zfile)
if manifest:
file_manifest.update(manifest)
manifest = file_manifest
if not manifest:
return InstallError(errmsg="manifest")
package = manifest["package"]
conflicts = manifest.get("conflicts", [])
found_conflicts = self._disableConflicting(package, conflicts)
meta = self.addonMeta(package)
self._install(package, zfile)
schema = self._manifest_schema["properties"]
manifest_meta = {
k: v for k, v in manifest.items() if k in schema and schema[k]["meta"]
}
meta.update(manifest_meta)
self.writeAddonMeta(package, meta)
return InstallOk(name=meta["name"], conflicts=found_conflicts)
def _install(self, dir, zfile):
# previously installed?
base = self.addonsFolder(dir)
if os.path.exists(base):
self.backupUserFiles(dir)
if not self.deleteAddon(dir):
self.restoreUserFiles(dir)
return
os.mkdir(base)
self.restoreUserFiles(dir)
# extract
for n in zfile.namelist():
if n.endswith("/"):
# folder; ignore
continue
path = os.path.join(base, n)
# skip existing user files
if os.path.exists(path) and n.startswith("user_files/"):
continue
zfile.extract(n, base)
# true on success
def deleteAddon(self, dir):
try:
send2trash(self.addonsFolder(dir))
return True
except OSError as e:
showWarning(
_(
"Unable to update or delete add-on. Please start Anki while holding down the shift key to temporarily disable add-ons, then try again.\n\nDebug info: %s"
)
% e,
textFormat="plain",
)
return False
# Processing local add-on files
######################################################################
def processPackages(
self, paths: List[str], parent: QWidget = None
) -> Tuple[List[str], List[str]]:
log = []
errs = []
self.mw.progress.start(immediate=True, parent=parent)
try:
for path in paths:
base = os.path.basename(path)
result = self.install(path)
if isinstance(result, InstallError):
errs.extend(
self._installationErrorReport(result, base, mode="local")
)
else:
log.extend(
self._installationSuccessReport(result, base, mode="local")
)
finally:
self.mw.progress.finish()
return log, errs
# Installation messaging
######################################################################
def _installationErrorReport(
self, result: InstallError, base: str, mode: str = "download"
) -> List[str]:
messages = {
"zip": _("Corrupt add-on file."),
"manifest": _("Invalid add-on manifest."),
}
msg = messages.get(result.errmsg, _("Unknown error: {}".format(result.errmsg)))
if mode == "download": # preserve old format strings for i18n
template = _("Error downloading <i>%(id)s</i>: %(error)s")
else:
template = _("Error installing <i>%(base)s</i>: %(error)s")
name = base
return [template % dict(base=name, id=name, error=msg)]
def _installationSuccessReport(
self, result: InstallOk, base: str, mode: str = "download"
) -> List[str]:
if mode == "download": # preserve old format strings for i18n
template = _("Downloaded %(fname)s")
else:
template = _("Installed %(name)s")
name = result.name or base
strings = [template % dict(name=name, fname=name)]
if result.conflicts:
strings.append(
_("The following conflicting add-ons were disabled:")
+ " "
+ ", ".join(self.addonName(f) for f in result.conflicts)
)
return strings
# Updating
######################################################################
def update_max_supported_versions(self, items: List[UpdateInfo]) -> None:
# todo
pass
def updates_required(self, items: List[UpdateInfo]) -> List[int]:
"""Return ids of add-ons requiring an update."""
need_update = []
for item in items:
if not self.addon_is_latest(item.id, item.last_updated):
need_update.append(item.id)
return need_update
def addon_is_latest(self, id: int, server_update: int) -> bool:
return self.addonMeta(str(id)).get("mod", 0) >= (server_update or 0)
# Add-on Config
######################################################################
_configButtonActions: Dict[str, Callable[[], Optional[bool]]] = {}
_configUpdatedActions: Dict[str, Callable[[Any], None]] = {}
def addonConfigDefaults(self, dir):
path = os.path.join(self.addonsFolder(dir), "config.json")
try:
with open(path, encoding="utf8") as f:
return json.load(f)
except:
return None
def addonConfigHelp(self, dir):
path = os.path.join(self.addonsFolder(dir), "config.md")
if os.path.exists(path):
with open(path, encoding="utf-8") as f:
return markdown.markdown(f.read())
else:
return ""
def addonFromModule(self, module):
return module.split(".")[0]
def configAction(self, addon):
return self._configButtonActions.get(addon)
def configUpdatedAction(self, addon):
return self._configUpdatedActions.get(addon)
# Add-on Config API
######################################################################
def getConfig(self, module: str) -> Optional[dict]:
addon = self.addonFromModule(module)
# get default config
config = self.addonConfigDefaults(addon)
if config is None:
return None
# merge in user's keys
meta = self.addonMeta(addon)
userConf = meta.get("config", {})
config.update(userConf)
return config
def setConfigAction(self, module: str, fn: Callable[[], Optional[bool]]):
addon = self.addonFromModule(module)
self._configButtonActions[addon] = fn
def setConfigUpdatedAction(self, module: str, fn: Callable[[Any], None]):
addon = self.addonFromModule(module)
self._configUpdatedActions[addon] = fn
def writeConfig(self, module: str, conf: dict):
addon = self.addonFromModule(module)
meta = self.addonMeta(addon)
meta["config"] = conf
self.writeAddonMeta(addon, meta)
# user_files
######################################################################
def _userFilesPath(self, sid):
return os.path.join(self.addonsFolder(sid), "user_files")
def _userFilesBackupPath(self):
return os.path.join(self.addonsFolder(), "files_backup")
def backupUserFiles(self, sid):
p = self._userFilesPath(sid)
if os.path.exists(p):
os.rename(p, self._userFilesBackupPath())
def restoreUserFiles(self, sid):
p = self._userFilesPath(sid)
bp = self._userFilesBackupPath()
# did we back up userFiles?
if not os.path.exists(bp):
return
os.rename(bp, p)
# Web Exports
######################################################################
_webExports: Dict[str, str] = {}
def setWebExports(self, module: str, pattern: str):
addon = self.addonFromModule(module)
self._webExports[addon] = pattern
def getWebExports(self, addon):
return self._webExports.get(addon)
# Add-ons Dialog
######################################################################
class AddonsDialog(QDialog):
def __init__(self, addonsManager: AddonManager):
self.mgr = addonsManager
self.mw = addonsManager.mw
super().__init__(self.mw)
f = self.form = aqt.forms.addons.Ui_Dialog()
f.setupUi(self)
f.getAddons.clicked.connect(self.onGetAddons)
f.installFromFile.clicked.connect(self.onInstallFiles)
f.checkForUpdates.clicked.connect(self.check_for_updates)
f.toggleEnabled.clicked.connect(self.onToggleEnabled)
f.viewPage.clicked.connect(self.onViewPage)
f.viewFiles.clicked.connect(self.onViewFiles)
f.delete_2.clicked.connect(self.onDelete)
f.config.clicked.connect(self.onConfig)
self.form.addonList.itemDoubleClicked.connect(self.onConfig)
self.form.addonList.currentRowChanged.connect(self._onAddonItemSelected)
self.setAcceptDrops(True)
self.redrawAddons()
restoreGeom(self, "addons")
self.show()
def dragEnterEvent(self, event):
mime = event.mimeData()
if not mime.hasUrls():
return None
urls = mime.urls()
ext = self.mgr.ext
if all(url.toLocalFile().endswith(ext) for url in urls):
event.acceptProposedAction()
def dropEvent(self, event):
mime = event.mimeData()
paths = []
for url in mime.urls():
path = url.toLocalFile()
if os.path.exists(path):
paths.append(path)
self.onInstallFiles(paths)
def reject(self):
saveGeom(self, "addons")
return QDialog.reject(self)
def redrawAddons(self):
addonList = self.form.addonList
mgr = self.mgr
self.addons = [(mgr.annotatedName(d), d) for d in mgr.allAddons()]
self.addons.sort()
selected = set(self.selectedAddons())
addonList.clear()
for name, dir in self.addons:
item = QListWidgetItem(name, addonList)
if not mgr.isEnabled(dir):
item.setForeground(Qt.gray)
if dir in selected:
item.setSelected(True)
addonList.reset()
def _onAddonItemSelected(self, row_int):
try:
addon = self.addons[row_int][1]
except IndexError:
addon = ""
self.form.viewPage.setEnabled(bool(re.match(r"^\d+$", addon)))
self.form.config.setEnabled(
bool(self.mgr.getConfig(addon) or self.mgr.configAction(addon))
)
def selectedAddons(self):
idxs = [x.row() for x in self.form.addonList.selectedIndexes()]
return [self.addons[idx][1] for idx in idxs]
def onlyOneSelected(self):
dirs = self.selectedAddons()
if len(dirs) != 1:
showInfo(_("Please select a single add-on first."))
return
return dirs[0]
def onToggleEnabled(self):
for dir in self.selectedAddons():
self.mgr.toggleEnabled(dir)
self.redrawAddons()
def onViewPage(self):
addon = self.onlyOneSelected()
if not addon:
return
if re.match(r"^\d+$", addon):
openLink(aqt.appShared + "info/{}".format(addon))
else:
showWarning(_("Add-on was not downloaded from AnkiWeb."))
def onViewFiles(self):
# if nothing selected, open top level folder
selected = self.selectedAddons()
if not selected:
openFolder(self.mgr.addonsFolder())
return
# otherwise require a single selection
addon = self.onlyOneSelected()
if not addon:
return
path = self.mgr.addonsFolder(addon)
openFolder(path)
def onDelete(self):
selected = self.selectedAddons()
if not selected:
return
if not askUser(
ngettext(
"Delete the %(num)d selected add-on?",
"Delete the %(num)d selected add-ons?",
len(selected),
)
% dict(num=len(selected))
):
return
for dir in selected:
if not self.mgr.deleteAddon(dir):
break
self.form.addonList.clearSelection()
self.redrawAddons()
def onGetAddons(self):
obj = GetAddons(self)
if obj.ids:
download_addons(self, self.mgr, obj.ids, self.after_downloading)
def after_downloading(self, log: List[DownloadLogEntry]):
if log:
self.redrawAddons()
show_log_to_user(self, log)
else:
tooltip(_("No updates available."))
def onInstallFiles(self, paths: Optional[List[str]] = None):
if not paths:
key = _("Packaged Anki Add-on") + " (*{})".format(self.mgr.ext)
paths = getFile(
self, _("Install Add-on(s)"), None, key, key="addons", multi=True
)
if not paths:
return False
installAddonPackages(self.mgr, paths, parent=self)
self.redrawAddons()
def check_for_updates(self):
tooltip(_("Checking..."))
check_and_prompt_for_updates(self, self.mgr, self.after_downloading)
def onConfig(self):
addon = self.onlyOneSelected()
if not addon:
return
# does add-on manage its own config?
act = self.mgr.configAction(addon)
if act:
ret = act()
if ret is not False:
return
conf = self.mgr.getConfig(addon)
if conf is None:
showInfo(_("Add-on has no configuration."))
return
ConfigEditor(self, addon, conf)
# Fetching Add-ons
######################################################################
class GetAddons(QDialog):
def __init__(self, dlg):
QDialog.__init__(self, dlg)
self.addonsDlg = dlg
self.mgr = dlg.mgr
self.mw = self.mgr.mw
self.ids: List[int] = []
self.form = aqt.forms.getaddons.Ui_Dialog()
self.form.setupUi(self)
b = self.form.buttonBox.addButton(
_("Browse Add-ons"), QDialogButtonBox.ActionRole
)
b.clicked.connect(self.onBrowse)
restoreGeom(self, "getaddons", adjustSize=True)
self.exec_()
saveGeom(self, "getaddons")
def onBrowse(self):
openLink(aqt.appShared + "addons/2.1")
def accept(self):
# get codes
try:
ids = [int(n) for n in self.form.code.text().split()]
except ValueError:
showWarning(_("Invalid code."))
return
self.ids = ids
QDialog.accept(self)
# Downloading
######################################################################
def download_addon(
client: AnkiRequestsClient, id: int
) -> Union[DownloadOk, DownloadError]:
"Fetch a single add-on from AnkiWeb."
try:
resp = client.get(aqt.appShared + f"download/{id}?v=2.1")
if resp.status_code != 200:
return DownloadError(status_code=resp.status_code)
data = client.streamContent(resp)
fname = re.match(
"attachment; filename=(.+)", resp.headers["content-disposition"]
).group(1)
return DownloadOk(data=data, filename=fname)
except Exception as e:
return DownloadError(exception=e)
def download_log_to_html(log: List[DownloadLogEntry]) -> str:
return "\n".join(map(describe_log_entry, log))
def describe_log_entry(id_and_entry: DownloadLogEntry) -> str:
(id, entry) = id_and_entry
buf = f"{id}: "
if isinstance(entry, DownloadError):
if entry.status_code is not None:
if entry.status_code in (403, 404):
buf += _(
"Invalid code, or add-on not available for your version of Anki."
)
else:
buf += _("Unexpected response code: %s") % entry.status_code
else:
buf += (
_("Please check your internet connection.")
+ "\n\n"
+ str(entry.exception)
)
elif isinstance(entry, InstallError):
buf += entry.errmsg
else:
buf += _("Installed successfully.")
return buf
def download_encountered_problem(log: List[DownloadLogEntry]) -> bool:
return any(not isinstance(e[1], InstallOk) for e in log)
def download_and_install_addon(
mgr: AddonManager, client: AnkiRequestsClient, id: int
) -> DownloadLogEntry:
"Download and install a single add-on."
result = download_addon(client, id)
if isinstance(result, DownloadError):
return (id, result)
fname = result.filename.replace("_", " ")
name = os.path.splitext(fname)[0]
result2 = mgr.install(
io.BytesIO(result.data),
manifest={"package": str(id), "name": name, "mod": intTime()},
)
return (id, result2)
class DownloaderInstaller(QObject):
progressSignal = pyqtSignal(int, int)
def __init__(
self, parent: QWidget, mgr: AddonManager, client: AnkiRequestsClient
) -> None:
QObject.__init__(self, parent)
self.mgr = mgr
self.client = client
self.progressSignal.connect(self._progress_callback) # type: ignore
def bg_thread_progress(up, down):
self.progressSignal.emit(up, down) # type: ignore
self.client.progress_hook = bg_thread_progress
def download(
self, ids: List[int], on_done: Callable[[List[DownloadLogEntry]], None]
) -> None:
self.ids = ids
self.log: List[DownloadLogEntry] = []
self.dl_bytes = 0
self.last_tooltip = 0
self.on_done = on_done
self.mgr.mw.progress.start(immediate=True, parent=self.parent())
self.mgr.mw.taskman.run(self._download_all, self._download_done)
def _progress_callback(self, up: int, down: int) -> None:
self.dl_bytes += down
self.mgr.mw.progress.update(
label=_("Downloading %(a)d/%(b)d (%(kb)0.2fKB)...")
% dict(a=len(self.log) + 1, b=len(self.ids), kb=self.dl_bytes / 1024)
)
def _download_all(self):
for id in self.ids:
self.log.append(download_and_install_addon(self.mgr, self.client, id))
def _download_done(self, future):
self.mgr.mw.progress.finish()
# qt gets confused if on_done() opens new windows while the progress
# modal is still cleaning up
self.mgr.mw.progress.timer(50, lambda: self.on_done(self.log), False)
def show_log_to_user(parent: QWidget, log: List[DownloadLogEntry]) -> None:
have_problem = download_encountered_problem(log)
if have_problem:
text = _("One or more errors occurred:")
else:
text = _("Download complete. Please restart Anki to apply changes.")
text += "<br><br>" + download_log_to_html(log)
if have_problem:
showWarning(text, textFormat="rich", parent=parent)
else:
showInfo(text, parent=parent)
def download_addons(
parent: QWidget,
mgr: AddonManager,
ids: List[int],
on_done: Callable[[List[DownloadLogEntry]], None],
client: Optional[AnkiRequestsClient] = None,
) -> None:
if client is None:
client = AnkiRequestsClient()
downloader = DownloaderInstaller(parent, mgr, client)
downloader.download(ids, on_done=on_done)
# Update checking
######################################################################
def fetch_update_info(client: AnkiRequestsClient, ids: List[int]) -> List[UpdateInfo]:
"""Fetch update info from AnkiWeb in one or more batches."""
all_info: List[UpdateInfo] = []
while ids:
# get another chunk
chunk = ids[:25]
del ids[:25]
batch_results = _fetch_update_info_batch(client, map(str, chunk))
all_info.extend(batch_results)
return all_info
def _fetch_update_info_batch(
client: AnkiRequestsClient, chunk: Iterable[str]
) -> Iterable[UpdateInfo]:
"""Get update info from AnkiWeb.
Chunk must not contain more than 25 ids."""
resp = client.get(aqt.appShared + "updates/" + ",".join(chunk) + "?v=2")
if resp.status_code == 200:
return json_update_info_to_native(resp.json())
else:
raise Exception(
"Unexpected response code from AnkiWeb: {}".format(resp.status_code)
)
def json_update_info_to_native(json_obj: List[Dict]) -> Iterable[UpdateInfo]:
def from_json(d: Dict[str, Any]) -> UpdateInfo:
return UpdateInfo(
id=d["id"], last_updated=d["updated"], max_point_version=d["maxver"]
)
return map(from_json, json_obj)
def check_and_prompt_for_updates(
parent: QWidget,
mgr: AddonManager,
on_done: Callable[[List[DownloadLogEntry]], None],
):
def on_updates_received(client: AnkiRequestsClient, items: List[UpdateInfo]):
handle_update_info(parent, mgr, client, items, on_done)
check_for_updates(mgr, on_updates_received)
def check_for_updates(
mgr: AddonManager, on_done: Callable[[AnkiRequestsClient, List[UpdateInfo]], None]
):
client = AnkiRequestsClient()
def check():
return fetch_update_info(client, mgr.enabled_addon_ids())
def update_info_received(future: Future):
# if syncing/in profile screen, defer message delivery
if not mgr.mw.col:
mgr.mw.progress.timer(
1000,
lambda: update_info_received(future),
False,
requiresCollection=False,
)
return
if future.exception():
# swallow network errors
print(str(future.exception()))
result = []
else:
result = future.result()
on_done(client, result)
mgr.mw.taskman.run(check, update_info_received)
def handle_update_info(
parent: QWidget,
mgr: AddonManager,
client: AnkiRequestsClient,
items: List[UpdateInfo],
on_done: Callable[[List[DownloadLogEntry]], None],
) -> None:
# record maximum supported versions
mgr.update_max_supported_versions(items)
updated_ids = mgr.updates_required(items)
if not updated_ids:
on_done([])
return
# tooltip(_("No updates available."))
prompt_to_update(parent, mgr, client, updated_ids, on_done)
def prompt_to_update(
parent: QWidget,
mgr: AddonManager,
client: AnkiRequestsClient,
ids: List[int],
on_done: Callable[[List[DownloadLogEntry]], None],
) -> None:
names = map(lambda x: mgr.addonName(str(x)), ids)
if not askUser(
_("The following add-ons have updates available. Install them now?")
+ "\n\n"
+ "\n".join(names)
):
# on_done is not called if the user cancels
return
download_addons(parent, mgr, ids, on_done, client)
# Editing config
######################################################################
class ConfigEditor(QDialog):
def __init__(self, dlg, addon, conf):
super().__init__(dlg)
self.addon = addon
self.conf = conf
self.mgr = dlg.mgr
self.form = aqt.forms.addonconf.Ui_Dialog()
self.form.setupUi(self)
restore = self.form.buttonBox.button(QDialogButtonBox.RestoreDefaults)
restore.clicked.connect(self.onRestoreDefaults)
self.setupFonts()
self.updateHelp()
self.updateText(self.conf)
restoreGeom(self, "addonconf")
restoreSplitter(self.form.splitter, "addonconf")
self.show()
def onRestoreDefaults(self):
default_conf = self.mgr.addonConfigDefaults(self.addon)
self.updateText(default_conf)
tooltip(_("Restored defaults"), parent=self)
def setupFonts(self):
font_mono = QFontDatabase.systemFont(QFontDatabase.FixedFont)
font_mono.setPointSize(font_mono.pointSize() + 1)
self.form.editor.setFont(font_mono)
def updateHelp(self):
txt = self.mgr.addonConfigHelp(self.addon)
if txt:
self.form.label.setText(txt)
else:
self.form.scrollArea.setVisible(False)
def updateText(self, conf):
self.form.editor.setPlainText(
json.dumps(
conf,
ensure_ascii=False,
sort_keys=True,
indent=4,
separators=(",", ": "),
)
)
def onClose(self):
saveGeom(self, "addonconf")
saveSplitter(self.form.splitter, "addonconf")
def reject(self):
self.onClose()
super().reject()
def accept(self):
txt = self.form.editor.toPlainText()
try:
new_conf = json.loads(txt)
except Exception as e:
showInfo(_("Invalid configuration: ") + repr(e))
return
if not isinstance(new_conf, dict):
showInfo(_("Invalid configuration: top level object must be a map"))
return
if new_conf != self.conf:
self.mgr.writeConfig(self.addon, new_conf)
# does the add-on define an action to be fired?
act = self.mgr.configUpdatedAction(self.addon)
if act:
act(new_conf)
self.onClose()
super().accept()
# .ankiaddon installation wizard
######################################################################
def installAddonPackages(
addonsManager: AddonManager,
paths: List[str],
parent: Optional[QWidget] = None,
warn: bool = False,
strictly_modal: bool = False,
advise_restart: bool = False,
) -> bool:
if warn:
names = ",<br>".join(f"<b>{os.path.basename(p)}</b>" for p in paths)
q = _(
"<b>Important</b>: As add-ons are programs downloaded from the internet, "
"they are potentially malicious."
"<b>You should only install add-ons you trust.</b><br><br>"
"Are you sure you want to proceed with the installation of the "
"following Anki add-on(s)?<br><br>%(names)s"
) % dict(names=names)
if (
not showInfo(
q,
parent=parent,
title=_("Install Anki add-on"),
type="warning",
customBtns=[QMessageBox.No, QMessageBox.Yes],
)
== QMessageBox.Yes
):
return False
log, errs = addonsManager.processPackages(paths, parent=parent)
if log:
log_html = "<br>".join(log)
if advise_restart:
log_html += "<br><br>" + _(
"<b>Please restart Anki to complete the installation.</b>"
)
if len(log) == 1 and not strictly_modal:
tooltip(log_html, parent=parent)
else:
showInfo(
log_html,
parent=parent,
textFormat="rich",
title=_("Installation complete"),
)
if errs:
msg = _("Please report this to the respective add-on author(s).")
showWarning(
"<br><br>".join(errs + [msg]),
parent=parent,
textFormat="rich",
title=_("Add-on installation error"),
)
return not errs