new render_all_latex() + gui button; remove some old code

This commit is contained in:
Damien Elmes 2020-02-11 15:09:33 +10:00
parent 7f365faf3f
commit 9df2a08cb0
5 changed files with 71 additions and 298 deletions

View File

@ -15,7 +15,7 @@ from anki.lang import _
from anki.models import NoteType
from anki.rsbackend import ExtractedLatex
from anki.template import TemplateRenderContext, TemplateRenderOutput
from anki.utils import call, checksum, isMac, namedtmp, stripHTML, tmpdir
from anki.utils import call, isMac, namedtmp, tmpdir
pngCommands = [
["latex", "-interaction=nonstopmode", "tmp.tex"],
@ -66,7 +66,14 @@ def render_latex(html: str, model: NoteType, col: anki.storage._Collection,) ->
return html
def _save_latex_image(col: anki.storage._Collection, extracted: ExtractedLatex, header: str, footer: str, svg: bool) -> Optional[str]:
def _save_latex_image(
col: anki.storage._Collection,
extracted: ExtractedLatex,
header: str,
footer: str,
svg: bool,
) -> Optional[str]:
# add header/footer
latex = header + "\n" + extracted.latex_body + "\n" + footer
# it's only really secure if run in a jail, but these are the most common

View File

@ -6,7 +6,6 @@ from __future__ import annotations
import os
import re
import sys
import unicodedata
import urllib.error
import urllib.parse
import urllib.request
@ -14,11 +13,9 @@ from typing import Any, Callable, List, Optional, Tuple, Union
import anki
from anki.consts import *
from anki.db import DB, DBError
from anki.lang import _
from anki.latex import render_latex
from anki.rsbackend import MediaCheckOutput
from anki.utils import checksum, isMac
from anki.utils import intTime
def media_paths_from_col_path(col_path: str) -> Tuple[str, str]:
@ -27,6 +24,9 @@ def media_paths_from_col_path(col_path: str) -> Tuple[str, str]:
return (media_folder, media_db)
# fixme: look into whether we can drop chdir() below
# - need to check aa89d06304fecd3597da4565330a3e55bdbb91fe
# - and audio handling code
class MediaManager:
soundRegexps = [r"(?i)(\[sound:(?P<fname>[^]]+)\])"]
@ -37,7 +37,6 @@ class MediaManager:
r"(?i)(<img[^>]* src=(?!['\"])(?P<fname>[^ >]+)[^>]*?>)",
]
regexps = soundRegexps + imgRegexps
db: Optional[DB]
def __init__(self, col: anki.storage._Collection, server: bool) -> None:
self.col = col
@ -57,40 +56,15 @@ class MediaManager:
os.chdir(self._dir)
except OSError:
raise Exception("invalidTempFolder")
# change database
self.connect()
def connect(self) -> None:
    """Open the media database, creating its schema on first use.

    Does nothing when the collection reports that it is running as a server.
    """
    if self.col.server:
        return
    db_path = media_paths_from_col_path(self.col.path)[1]
    # decide before DB() is constructed whether the schema must be created
    needs_schema = not os.path.exists(db_path)
    os.chdir(self._dir)
    self.db = DB(db_path)
    if needs_schema:
        self._initDB()
def _initDB(self) -> None:
    """Create the media database schema on the freshly opened connection.

    Creates the `media` table with an index on `dirty`, and a single-row
    `meta` table initialised to (0, 0).
    """
    schema = """
create table media (
fname text not null primary key,
csum text, -- null indicates deleted file
mtime int not null, -- zero if deleted
dirty int not null
);
create index idx_media_dirty on media (dirty);
create table meta (dirMod int, lastUsn int); insert into meta values (0, 0);
"""
    self.db.executescript(schema)
def close(self) -> None:
if self.col.server:
return
self.db.close()
self.db = None
# change cwd back to old location
if self._oldcwd:
try:
@ -99,16 +73,10 @@ create table meta (dirMod int, lastUsn int); insert into meta values (0, 0);
# may have been deleted
pass
def _deleteDB(self) -> None:
    """Discard the media database file and open a fresh one."""
    # the path must be read before close(), which sets self.db to None
    db_file = self.db._path
    self.close()
    os.unlink(db_file)
    self.connect()
def dir(self) -> Any:
    """Return the media folder path stored on this manager."""
    media_dir = self._dir
    return media_dir
# Adding media
# File manipulation
##########################################################################
def add_file(self, path: str) -> str:
@ -137,15 +105,8 @@ create table meta (dirMod int, lastUsn int); insert into meta values (0, 0);
fname += type_map[content_type]
return fname
# legacy
addFile = add_file
# legacy
def writeData(self, opath: str, data: bytes, typeHint: Optional[str] = None) -> str:
    """Legacy entry point: store `data` under the base name of `opath`.

    When `typeHint` is given, the name is first passed through
    add_extension_based_on_mime(). Returns whatever write_data() returns.
    """
    base_name = os.path.basename(opath)
    final_name = (
        self.add_extension_based_on_mime(base_name, typeHint)
        if typeHint
        else base_name
    )
    return self.write_data(final_name, data)
def have(self, fname: str) -> bool:
    """Return True if `fname` exists inside the media folder."""
    candidate = os.path.join(self.dir(), fname)
    return os.path.exists(candidate)
# String manipulation
##########################################################################
@ -172,11 +133,13 @@ create table meta (dirMod int, lastUsn int); insert into meta values (0, 0);
return txt
def strip(self, txt: str) -> str:
    """Return text with sound and image tags removed."""
    stripped = txt
    # apply each media pattern in turn, deleting every match
    for pattern in self.regexps:
        stripped = re.compile(pattern).sub("", stripped)
    return stripped
def escapeImages(self, string: str, unescape: bool = False) -> str:
"Apply or remove percent encoding to image filenames."
fn: Callable
if unescape:
fn = urllib.parse.unquote
@ -201,99 +164,30 @@ create table meta (dirMod int, lastUsn int); insert into meta values (0, 0);
"This should be called while the collection is closed."
return self.col.backend.check_media()
def check_old(
    self, local: Optional[List[str]] = None
) -> Tuple[List[str], List[str], List[str]]:
    """Return (missingFiles, unusedFiles, warnings).

    If `local` is given it is used as the list of file names instead of
    listing the media folder, and the on-disk checks (directory detection,
    NFC renaming) are skipped.
    """
    mdir = self.dir()
    # gather all media references in NFC form
    allRefs = set()
    for nid, mid, flds in self.col.db.execute("select id, mid, flds from notes"):
        noteRefs = self.filesInStr(mid, flds)
        # check the refs are in NFC
        for f in noteRefs:
            # if they're not, we'll need to fix them first
            if f != unicodedata.normalize("NFC", f):
                self._normalizeNoteRefs(nid)
                noteRefs = self.filesInStr(mid, flds)
                break
        allRefs.update(noteRefs)
    # loop through media folder
    unused = []
    if local is None:
        files = os.listdir(mdir)
    else:
        files = local
    renamedFiles = False
    dirFound = False
    warnings = []
    for file in files:
        if not local:
            if not os.path.isfile(file):
                # ignore directories
                dirFound = True
                continue
        if file.startswith("_"):
            # leading _ says to ignore file
            continue
        if self.hasIllegal(file):
            name = file.encode(sys.getfilesystemencoding(), errors="replace")
            name = str(name, sys.getfilesystemencoding())
            warnings.append(_("Invalid file name, please rename: %s") % name)
            continue
        nfcFile = unicodedata.normalize("NFC", file)
        # we enforce NFC fs encoding on non-macs
        if not isMac and not local:
            if file != nfcFile:
                # delete if we already have the NFC form, otherwise rename
                if os.path.exists(nfcFile):
                    os.unlink(file)
                    renamedFiles = True
                else:
                    os.rename(file, nfcFile)
                    renamedFiles = True
                file = nfcFile
        # compare
        if nfcFile not in allRefs:
            unused.append(file)
        else:
            allRefs.discard(nfcFile)
    # if we renamed any files to nfc format, we must rerun the check
    # to make sure the renamed files are not marked as unused
    if renamedFiles:
        return self.check_old(local=local)
    nohave = [x for x in allRefs if not x.startswith("_")]
    # make sure the media DB is valid
    try:
        self.findChanges()
    except DBError:
        self._deleteDB()
    if dirFound:
        warnings.append(
            _(
                "Anki does not support files in subfolders of the collection.media folder."
            )
        )
    return (nohave, unused, warnings)


def render_all_latex(
    self, progress_cb: Optional[Callable[[int], bool]] = None
) -> None:
    """Render any LaTeX that is missing.

    Every note whose fields contain a "[" is passed to render_latex().

    If a progress callback is provided it is called with the number of
    notes visited so far, at most about once per second (the clock is only
    consulted on every tenth note). If the callback returns false, the
    operation is aborted.
    """
    last_progress = intTime()
    for c, (nid, mid, flds) in enumerate(
        self.col.db.execute("select id, mid, flds from notes")
    ):
        # cheap pre-filter: skip notes without a "[" in their fields
        if "[" not in flds:
            continue

        model = self.col.models.get(mid)
        render_latex(flds, model, self.col)

        if progress_cb is not None and c % 10 == 0:
            now = intTime()
            # elapsed time is "now - then"; the reversed subtraction was
            # never positive, so the callback could never fire
            if now - last_progress >= 1:
                last_progress = now
                if not progress_cb(c + 1):
                    return
def _normalizeNoteRefs(self, nid) -> None:
    """Rewrite the fields of note `nid` in Unicode NFC form, then flush it."""
    note = self.col.getNote(nid)
    for idx, original in enumerate(note.fields):
        normalized = unicodedata.normalize("NFC", original)
        if normalized == original:
            continue
        note.fields[idx] = normalized
    # flushed unconditionally, even if no field changed
    note.flush()
# Copying on import
##########################################################################
def have(self, fname: str) -> bool:
    """Return True if `fname` exists inside the media folder."""
    candidate = os.path.join(self.dir(), fname)
    return os.path.exists(candidate)
# Illegal characters and paths
# Legacy
##########################################################################
_illegalCharReg = re.compile(r'[][><:"/?*^\\|\0\r\n]')
@ -304,6 +198,7 @@ create table meta (dirMod int, lastUsn int); insert into meta values (0, 0);
return re.sub(self._illegalCharReg, "", str)
def hasIllegal(self, s: str) -> bool:
print("hasIllegal() will go away")
if re.search(self._illegalCharReg, s):
return True
try:
@ -312,101 +207,13 @@ create table meta (dirMod int, lastUsn int); insert into meta values (0, 0);
return True
return False
# Tracking changes
##########################################################################
def findChanges(self) -> None:
    """Scan the media folder if it's changed, and note any changes."""
    if not self._changed():
        return
    self._logChanges()
def haveDirty(self) -> Any:
    """Return a truthy scalar if any row in `media` has dirty=1."""
    query = "select 1 from media where dirty=1 limit 1"
    return self.db.scalar(query)
addFile = add_file
def _mtime(self, path: str) -> int:
    """Return the modification time of `path`, truncated to whole seconds."""
    modified = os.path.getmtime(path)
    return int(modified)
def _checksum(self, path: str) -> str:
    """Return checksum() of the full binary contents of `path`."""
    with open(path, "rb") as fh:
        contents = fh.read()
    return checksum(contents)
def _changed(self) -> int:
    """Return dir mtime if it has changed since the last findChanges().

    Returns False when the stored `dirMod` is non-zero and equals the
    current mtime of the media folder.
    """
    # doesn't track edits, but user can add or remove a file to update
    recorded = self.db.scalar("select dirMod from meta")
    current = self._mtime(self.dir())
    unchanged = bool(recorded) and recorded == current
    return False if unchanged else current
def _logChanges(self) -> None:
    """Record added and removed files in the media DB as dirty rows."""
    new_files, gone_files = self._changes()
    # added files carry their checksum and mtime; every row is written dirty=1
    rows = [(fname, self._checksum(fname), mtime, 1) for fname, mtime in new_files]
    # removed files are stored with a null checksum and zero mtime
    rows.extend((fname, None, 0, 1) for fname in gone_files)
    # update media db
    self.db.executemany("insert or replace into media values (?,?,?,?)", rows)
    self.db.execute("update meta set dirMod = ?", self._mtime(self.dir()))
    self.db.commit()
def _changes(self) -> Tuple[List[Tuple[str, int]], List[str]]:
    """Compare the media folder against the media DB.

    Returns (added, removed): `added` is a list of (name, mtime) for files
    that are new or whose content changed; `removed` lists names recorded
    in the DB that were not found on disk.

    Files are unlinked/renamed by bare name, so this relies on the current
    working directory being the media folder.
    """
    # name -> [checksum, mtime, seen-on-disk]
    self.cache: Dict[str, Any] = {}
    rows = self.db.execute(
        "select fname, csum, mtime from media where csum is not null"
    )
    for fname, csum, recorded_mtime in rows:
        # previous entries may not have been in NFC form
        self.cache[unicodedata.normalize("NFC", fname)] = [
            csum,
            recorded_mtime,
            False,
        ]
    added = []
    # loop through on-disk files
    with os.scandir(self.dir()) as entries:
        for entry in entries:
            # ignore folders, thumbs.db, and files with invalid chars
            if (
                entry.is_dir()
                or entry.name.lower() == "thumbs.db"
                or self.hasIllegal(entry.name)
            ):
                continue
            # empty files are invalid; clean them up and continue
            size = entry.stat().st_size
            if not size:
                os.unlink(entry.name)
                continue
            if size > 100 * 1024 * 1024:
                self.col.log("ignoring file over 100MB", entry.name)
                continue
            # check encoding; on Macs we can access the file using any
            # normalization, so nothing is renamed there
            normname = unicodedata.normalize("NFC", entry.name)
            if not isMac and entry.name != normname:
                # wrong filename encoding which will cause sync errors
                if os.path.exists(normname):
                    os.unlink(entry.name)
                else:
                    os.rename(entry.name, normname)
            mtime = int(entry.stat().st_mtime)
            cached = self.cache.get(normname)
            if cached is None:
                # newly added
                added.append((normname, mtime))
                continue
            # modified since last time, and has a different checksum?
            if mtime != cached[1] and self._checksum(normname) != cached[0]:
                added.append((normname, mtime))
            # mark as used
            cached[2] = True
    # look for any entries in the cache that no longer exist on disk
    removed = [name for name, info in self.cache.items() if not info[2]]
    return added, removed
def writeData(self, opath: str, data: bytes, typeHint: Optional[str] = None) -> str:
    """Legacy entry point: store `data` under the base name of `opath`.

    When `typeHint` is given, the name is first passed through
    add_extension_based_on_mime(). Returns whatever write_data() returns.
    """
    base_name = os.path.basename(opath)
    final_name = (
        self.add_extension_based_on_mime(base_name, typeHint)
        if typeHint
        else base_name
    )
    return self.write_data(final_name, data)

View File

@ -3,15 +3,11 @@
import os
import shutil
from anki.utils import stripHTML
from tests.shared import getEmptyCol
def test_latex():
print("** aborting test_latex for now")
return
d = getEmptyCol() # pylint: disable=unreachable
d = getEmptyCol()
# change latex cmd to simulate broken build
import anki.latex
@ -33,7 +29,7 @@ def test_latex():
# fix path
anki.latex.pngCommands[0][0] = "latex"
# check media db should cause latex to be generated
d.media.check()
d.media.render_all_latex()
assert len(os.listdir(d.media.dir())) == 1
assert ".png" in f.cards()[0].q()
# adding new notes should cause generation on question display
@ -50,13 +46,12 @@ def test_latex():
oldcard = f.cards()[0]
assert ".png" in oldcard.q()
# if we turn off building, then previous cards should work, but cards with
# missing media will show the latex
# missing media will show a broken image
anki.latex.build = False
f = d.newNote()
f["Front"] = "[latex]foo[/latex]"
d.addNote(f)
assert len(os.listdir(d.media.dir())) == 2
assert stripHTML(f.cards()[0].q()) == "[latex]foo[/latex]"
assert ".png" in oldcard.q()
# turn it on again so other test don't suffer
anki.latex.build = True

View File

@ -78,61 +78,3 @@ def test_deckIntegration():
d.reopen()
assert ret.missing == ["fake2.png"]
assert ret.unused == ["foo.jpg"]
def test_changes():
    """Exercise media-DB change tracking: adds, unnoticed edits, deletions."""
    col = getEmptyCol()
    media_db = col.media.db

    def live_rows():
        # rows with a checksum are files currently known to exist
        return list(
            media_db.execute("select fname from media where csum is not null")
        )

    def deleted_rows():
        # rows with a null checksum mark deleted files
        return list(media_db.execute("select fname from media where csum is null"))

    def rewind_clock():
        # push the recorded mtimes back so the next scan sees a difference
        media_db.execute("update media set mtime=mtime-1")
        media_db.execute("update meta set dirMod = dirMod - 1")

    assert not live_rows()
    assert not deleted_rows()
    # add a file
    tmp_dir = tempfile.mkdtemp(prefix="anki")
    path = os.path.join(tmp_dir, "foo.jpg")
    with open(path, "w") as fh:
        fh.write("hello")
    path = col.media.addFile(path)
    # should have been logged
    col.media.findChanges()
    assert live_rows()
    assert not deleted_rows()
    # if we modify it, the cache won't notice
    rewind_clock()
    with open(path, "w") as fh:
        fh.write("world")
    assert len(live_rows()) == 1
    assert not deleted_rows()
    # but if we add another file, it will
    rewind_clock()
    second = path + "2"
    with open(second, "w") as fh:
        fh.write("yo")
    col.media.findChanges()
    assert len(live_rows()) == 2
    assert not deleted_rows()
    # deletions should get noticed too
    rewind_clock()
    os.unlink(second)
    col.media.findChanges()
    assert len(live_rows()) == 1
    assert len(deleted_rows()) == 1
def test_illegal():
    """hasIllegal() must flag exactly the characters that are not plain letters."""
    col = getEmptyCol()
    sample = "a:b|cd\\e/f\0g*h"
    allowed = "abcdefgh"
    for ch in sample:
        flagged = col.media.hasIllegal("somestring" + ch + "morestring")
        # a character is flagged if and only if it is not an allowed one
        assert bool(flagged) == (ch not in allowed)

View File

@ -80,6 +80,13 @@ class MediaChecker:
box.addButton(b, QDialogButtonBox.ActionRole)
b.clicked.connect(lambda c, u=output.unused, d=diag: deleteUnused(self.mw, u, d)) # type: ignore
if output.missing:
if any(map(lambda x: x.startswith("latex-"), output.missing)):
b = QPushButton(_("Render LaTeX"))
b.setAutoDefault(False)
box.addButton(b, QDialogButtonBox.RejectRole)
b.clicked.connect(self._on_render_latex) # type: ignore
box.rejected.connect(diag.reject) # type: ignore
diag.setMinimumHeight(400)
diag.setMinimumWidth(500)
@ -87,6 +94,21 @@ class MediaChecker:
diag.exec_()
saveGeom(diag, "checkmediadb")
def _on_render_latex(self):
    """Render all missing LaTeX while a progress dialog is shown.

    The progress dialog is always closed, even if rendering raises. The
    "LaTeX rendered." tooltip is shown only when rendering ran to completion:
    it is skipped when the progress callback asked for an abort (the user
    cancelled) or when an exception propagates.
    """
    self.progress_dialog = self.mw.progress.start()
    aborted = False

    def on_progress(count: int) -> bool:
        # remember whether the callback requested an abort, since
        # render_all_latex() returns None either way
        nonlocal aborted
        keep_going = self._on_render_latex_progress(count)
        if not keep_going:
            aborted = True
        return keep_going

    try:
        self.mw.col.media.render_all_latex(on_progress)
    finally:
        self.mw.progress.finish()

    if not aborted:
        tooltip(_("LaTeX rendered."))
def _on_render_latex_progress(self, count: int) -> bool:
    """Progress callback for LaTeX rendering.

    Returns False if the progress dialog has `wantCancel` set; otherwise
    updates the progress label with `count` and returns True.
    """
    keep_going = not self.progress_dialog.wantCancel
    if keep_going:
        self.mw.progress.update(_("Checked {}...").format(count))
    return keep_going
def describe_output(output: MediaCheckOutput) -> str:
buf = []