automatically fix 1.2 errors when upgrading

This commit is contained in:
Damien Elmes 2013-01-08 09:04:35 +09:00
parent 0447cb4f41
commit 289f0a6452
36 changed files with 9839 additions and 39 deletions

2
README
View File

@ -7,7 +7,7 @@ Prerequisites
To install the prerequisites on Ubuntu/Debian, please use the following To install the prerequisites on Ubuntu/Debian, please use the following
command: command:
sudo apt-get install python-qt4 mplayer lame libportaudio2 sudo apt-get install python-qt4 mplayer lame libportaudio2 python-sqlalchemy
If you're on another distribution the packages may be named differently, so If you're on another distribution the packages may be named differently, so
please consult your package manager. please consult your package manager.

View File

@ -12,14 +12,17 @@ class Anki1Importer(Anki2Importer):
def run(self): def run(self):
u = Upgrader() u = Upgrader()
# check # check
if not u.check(self.file): res = u.check(self.file)
if res == "invalid":
self.log.append(_( self.log.append(_(
"File is old or damaged; please run Tools>Advanced>Check DB " "File is invalid. Please restore from backup."))
"in Anki 1.2 first."))
raise Exception("invalidFile") raise Exception("invalidFile")
# upgrade # upgrade
if res != "ok":
self.log.append(
"Problems fixed during upgrade:\n***\n%s\n***\n" % res)
try: try:
deck = u.upgrade(self.file) deck = u.upgrade()
except: except:
traceback.print_exc() traceback.print_exc()
self.log.append(traceback.format_exc()) self.log.append(traceback.format_exc())

View File

@ -18,34 +18,35 @@ from anki.storage import _addSchema, _getColVars, _addColVars, \
class Upgrader(object): class Upgrader(object):
def __init__(self): def __init__(self):
pass self.tmppath = None
# Upgrading # Integrity checking & initial setup
######################################################################
def upgrade(self, path):
self.path = path
self._openDB(path)
self._upgradeSchema()
self._openCol()
self._upgradeRest()
return self.col
# Integrity checking
###################################################################### ######################################################################
def check(self, path): def check(self, path):
"True if deck looks ok." "Returns 'ok', 'invalid', or log of fixes applied."
with DB(path) as db: # copy into a temp file before we open
return self._check(db) self.tmppath = tmpfile(suffix=".anki2")
shutil.copy(path, self.tmppath)
# run initial check
with DB(self.tmppath) as db:
res = self._check(db)
# needs fixing?
if res not in ("ok", "invalid"):
res = self._fix(self.tmppath)
# don't allow .upgrade() if invalid
if res == "invalid":
os.unlink(self.tmppath)
self.tmppath = None
return res
def _check(self, db): def _check(self, db):
# corrupt? # corrupt?
try: try:
if db.scalar("pragma integrity_check") != "ok": if db.scalar("pragma integrity_check") != "ok":
return return "invalid"
except: except:
return return "invalid"
# old version? # old version?
if db.scalar("select version from decks") < 65: if db.scalar("select version from decks") < 65:
return return
@ -98,18 +99,35 @@ f.id = cards.factId)"""):
select id from cards where type != (case select id from cards where type != (case
when type >= 0 then relativeDelay else relativeDelay - 3 end)"""): when type >= 0 then relativeDelay else relativeDelay - 3 end)"""):
return return
return True return "ok"
# DB/Deck opening def _fix(self, path):
from oldanki import DeckStorage
try:
deck = DeckStorage.Deck(path, backup=False)
except:
# if we can't open the file, it's invalid
return "invalid"
# run a db check
res = deck.fixIntegrity()
if "Database file is damaged" in res:
# we can't recover from a corrupt db
return "invalid"
# other errors are non-fatal
deck.close()
return res
# Upgrading
###################################################################### ######################################################################
def _openDB(self, path): def upgrade(self):
self.tmppath = tmpfile(suffix=".anki2") assert self.tmppath
shutil.copy(path, self.tmppath)
self.db = DB(self.tmppath) self.db = DB(self.tmppath)
self._upgradeSchema()
def _openCol(self):
self.col = _Collection(self.db) self.col = _Collection(self.db)
self._upgradeRest()
self.tmppath = None
return self.col
# Schema upgrade # Schema upgrade
###################################################################### ######################################################################

View File

@ -297,9 +297,7 @@ backup, please see the 'Backups' section of the user manual."""))
except Exception, e: except Exception, e:
if "invalidFile" in unicode(e): if "invalidFile" in unicode(e):
msg = _("""\ msg = _("""\
Invalid file. Please run a DB check in Anki 1.2 and try again.""") Invalid file. Please restore from backup.""")
msg += _(""" \
Even if the DB check reports 'no problems found', a subsequent import should work.""")
showWarning(msg) showWarning(msg)
elif "readonly" in unicode(e): elif "readonly" in unicode(e):
showWarning(_("""\ showWarning(_("""\

2
oldanki/README Normal file
View File

@ -0,0 +1,2 @@
This is libanki 1.2.11, for the purposes of fixing problems when upgrading
1.2.x decks.

58
oldanki/__init__.py Normal file
View File

@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
# Copyright: Damien Elmes <oldanki@ichi2.net>
# License: GNU GPL, version 3 or later; http://www.gnu.org/copyleft/gpl.html
"""\
Anki (libanki)
====================
Open a deck:
deck = oldanki.DeckStorage.Deck(path)
Get a card:
card = deck.getCard()
if not card:
# deck is finished
Show the card:
print card.question, card.answer
Answer the card:
deck.answerCard(card, ease)
Edit the card:
fields = card.fact.model.fieldModels
for field in fields:
card.fact[field.name] = "newvalue"
card.fact.setModified(textChanged=True, deck=deck)
deck.setModified()
Get all cards via ORM (slow):
from oldanki.cards import Card
cards = deck.s.query(Card).all()
Get all q/a/ids via SQL (fast):
cards = deck.s.all("select id, question, answer from cards")
Save & close:
deck.save()
deck.close()
"""
__docformat__ = 'restructuredtext'
try:
__import__('pkg_resources').declare_namespace(__name__)
except ImportError:
pass
version = "1.2.11"
from oldanki.deck import DeckStorage

311
oldanki/cards.py Normal file
View File

@ -0,0 +1,311 @@
# -*- coding: utf-8 -*-
# Copyright: Damien Elmes <oldanki@ichi2.net>
# License: GNU GPL, version 3 or later; http://www.gnu.org/copyleft/gpl.html
"""\
Cards
====================
"""
__docformat__ = 'restructuredtext'
import time, sys, math, random
from oldanki.db import *
from oldanki.models import CardModel, Model, FieldModel, formatQA
from oldanki.facts import Fact, factsTable, Field
from oldanki.utils import parseTags, findTag, stripHTML, genID, hexifyID
from oldanki.media import updateMediaCount, mediaFiles
MAX_TIMER = 60
# Cards
##########################################################################
cardsTable = Table(
'cards', metadata,
Column('id', Integer, primary_key=True),
Column('factId', Integer, ForeignKey("facts.id"), nullable=False),
Column('cardModelId', Integer, ForeignKey("cardModels.id"), nullable=False),
Column('created', Float, nullable=False, default=time.time),
Column('modified', Float, nullable=False, default=time.time),
Column('tags', UnicodeText, nullable=False, default=u""),
Column('ordinal', Integer, nullable=False),
# cached - changed on fact update
Column('question', UnicodeText, nullable=False, default=u""),
Column('answer', UnicodeText, nullable=False, default=u""),
# default to 'normal' priority;
# this is indexed in deck.py as we need to create a reverse index
Column('priority', Integer, nullable=False, default=2),
Column('interval', Float, nullable=False, default=0),
Column('lastInterval', Float, nullable=False, default=0),
Column('due', Float, nullable=False, default=time.time),
Column('lastDue', Float, nullable=False, default=0),
Column('factor', Float, nullable=False, default=2.5),
Column('lastFactor', Float, nullable=False, default=2.5),
Column('firstAnswered', Float, nullable=False, default=0),
# stats
Column('reps', Integer, nullable=False, default=0),
Column('successive', Integer, nullable=False, default=0),
Column('averageTime', Float, nullable=False, default=0),
Column('reviewTime', Float, nullable=False, default=0),
Column('youngEase0', Integer, nullable=False, default=0),
Column('youngEase1', Integer, nullable=False, default=0),
Column('youngEase2', Integer, nullable=False, default=0),
Column('youngEase3', Integer, nullable=False, default=0),
Column('youngEase4', Integer, nullable=False, default=0),
Column('matureEase0', Integer, nullable=False, default=0),
Column('matureEase1', Integer, nullable=False, default=0),
Column('matureEase2', Integer, nullable=False, default=0),
Column('matureEase3', Integer, nullable=False, default=0),
Column('matureEase4', Integer, nullable=False, default=0),
# this duplicates the above data, because there's no way to map imported
# data to the above
Column('yesCount', Integer, nullable=False, default=0),
Column('noCount', Integer, nullable=False, default=0),
# obsolete
Column('spaceUntil', Float, nullable=False, default=0),
# relativeDelay is reused as type without scheduling (ie, it remains 0-2
# even if card is suspended, etc)
Column('relativeDelay', Float, nullable=False, default=0),
Column('isDue', Boolean, nullable=False, default=0), # obsolete
Column('type', Integer, nullable=False, default=2),
Column('combinedDue', Integer, nullable=False, default=0))
class Card(object):
"A card."
def __init__(self, fact=None, cardModel=None, created=None):
self.tags = u""
self.id = genID()
# new cards start as new & due
self.type = 2
self.relativeDelay = self.type
self.timerStarted = False
self.timerStopped = False
self.modified = time.time()
if created:
self.created = created
self.due = created
else:
self.due = self.modified
self.combinedDue = self.due
if fact:
self.fact = fact
if cardModel:
self.cardModel = cardModel
# for non-orm use
self.cardModelId = cardModel.id
self.ordinal = cardModel.ordinal
def rebuildQA(self, deck, media=True):
# format qa
d = {}
for f in self.fact.model.fieldModels:
d[f.name] = (f.id, self.fact[f.name])
qa = formatQA(None, self.fact.modelId, d, self.splitTags(),
self.cardModel, deck)
# find old media references
files = {}
for type in ("question", "answer"):
for f in mediaFiles(getattr(self, type) or ""):
if f in files:
files[f] -= 1
else:
files[f] = -1
# update q/a
self.question = qa['question']
self.answer = qa['answer']
# determine media delta
for type in ("question", "answer"):
for f in mediaFiles(getattr(self, type)):
if f in files:
files[f] += 1
else:
files[f] = 1
# update media counts if we're attached to deck
if media:
for (f, cnt) in files.items():
updateMediaCount(deck, f, cnt)
self.setModified()
def setModified(self):
self.modified = time.time()
def startTimer(self):
self.timerStarted = time.time()
def stopTimer(self):
self.timerStopped = time.time()
def thinkingTime(self):
return (self.timerStopped or time.time()) - self.timerStarted
def totalTime(self):
return time.time() - self.timerStarted
def genFuzz(self):
"Generate a random offset to spread intervals."
self.fuzz = random.uniform(0.95, 1.05)
def htmlQuestion(self, type="question", align=True):
div = '''<div class="card%s" id="cm%s%s">%s</div>''' % (
type[0], type[0], hexifyID(self.cardModelId),
getattr(self, type))
# add outer div & alignment (with tables due to qt's html handling)
if not align:
return div
attr = type + 'Align'
if getattr(self.cardModel, attr) == 0:
align = "center"
elif getattr(self.cardModel, attr) == 1:
align = "left"
else:
align = "right"
return (("<center><table width=95%%><tr><td align=%s>" % align) +
div + "</td></tr></table></center>")
def htmlAnswer(self, align=True):
return self.htmlQuestion(type="answer", align=align)
def updateStats(self, ease, state):
self.reps += 1
if ease > 1:
self.successive += 1
else:
self.successive = 0
delay = min(self.totalTime(), MAX_TIMER)
self.reviewTime += delay
if self.averageTime:
self.averageTime = (self.averageTime + delay) / 2.0
else:
self.averageTime = delay
# we don't track first answer for cards
if state == "new":
state = "young"
# update ease and yes/no count
attr = state + "Ease%d" % ease
setattr(self, attr, getattr(self, attr) + 1)
if ease < 2:
self.noCount += 1
else:
self.yesCount += 1
if not self.firstAnswered:
self.firstAnswered = time.time()
self.setModified()
def splitTags(self):
return (self.fact.tags, self.fact.model.tags, self.cardModel.name)
def allTags(self):
"Non-canonified string of all tags."
return (self.fact.tags + "," +
self.fact.model.tags)
def hasTag(self, tag):
return findTag(tag, parseTags(self.allTags()))
def fromDB(self, s, id):
r = s.first("""select
id, factId, cardModelId, created, modified, tags, ordinal, question, answer,
priority, interval, lastInterval, due, lastDue, factor,
lastFactor, firstAnswered, reps, successive, averageTime, reviewTime,
youngEase0, youngEase1, youngEase2, youngEase3, youngEase4,
matureEase0, matureEase1, matureEase2, matureEase3, matureEase4,
yesCount, noCount, spaceUntil, isDue, type, combinedDue
from cards where id = :id""", id=id)
if not r:
return
(self.id,
self.factId,
self.cardModelId,
self.created,
self.modified,
self.tags,
self.ordinal,
self.question,
self.answer,
self.priority,
self.interval,
self.lastInterval,
self.due,
self.lastDue,
self.factor,
self.lastFactor,
self.firstAnswered,
self.reps,
self.successive,
self.averageTime,
self.reviewTime,
self.youngEase0,
self.youngEase1,
self.youngEase2,
self.youngEase3,
self.youngEase4,
self.matureEase0,
self.matureEase1,
self.matureEase2,
self.matureEase3,
self.matureEase4,
self.yesCount,
self.noCount,
self.spaceUntil,
self.isDue,
self.type,
self.combinedDue) = r
return True
def toDB(self, s):
"Write card to DB."
s.execute("""update cards set
modified=:modified,
tags=:tags,
interval=:interval,
lastInterval=:lastInterval,
due=:due,
lastDue=:lastDue,
factor=:factor,
lastFactor=:lastFactor,
firstAnswered=:firstAnswered,
reps=:reps,
successive=:successive,
averageTime=:averageTime,
reviewTime=:reviewTime,
youngEase0=:youngEase0,
youngEase1=:youngEase1,
youngEase2=:youngEase2,
youngEase3=:youngEase3,
youngEase4=:youngEase4,
matureEase0=:matureEase0,
matureEase1=:matureEase1,
matureEase2=:matureEase2,
matureEase3=:matureEase3,
matureEase4=:matureEase4,
yesCount=:yesCount,
noCount=:noCount,
spaceUntil = :spaceUntil,
isDue = 0,
type = :type,
combinedDue = :combinedDue,
relativeDelay = :relativeDelay,
priority = :priority
where id=:id""", self.__dict__)
mapper(Card, cardsTable, properties={
'cardModel': relation(CardModel),
'fact': relation(Fact, backref="cards", primaryjoin=
cardsTable.c.factId == factsTable.c.id),
})
mapper(Fact, factsTable, properties={
'model': relation(Model),
'fields': relation(Field, backref="fact", order_by=Field.ordinal),
})
# Card deletions
##########################################################################
cardsDeletedTable = Table(
'cardsDeleted', metadata,
Column('cardId', Integer, ForeignKey("cards.id"),
nullable=False),
Column('deletedTime', Float, nullable=False))

149
oldanki/db.py Normal file
View File

@ -0,0 +1,149 @@
# -*- coding: utf-8 -*-
# Copyright: Damien Elmes <oldanki@ichi2.net>
# License: GNU GPL, version 3 or later; http://www.gnu.org/copyleft/gpl.html
"""\
DB tools
====================
SessionHelper is a wrapper for the standard sqlalchemy session, which provides
some convenience routines, and manages transactions itself.
object_session() is a replacement for the standard object_session(), which
provides the features of SessionHelper, and avoids taking out another
transaction.
"""
__docformat__ = 'restructuredtext'
try:
from pysqlite2 import dbapi2 as sqlite
except ImportError:
try:
from sqlite3 import dbapi2 as sqlite
except:
raise Exception("Please install pysqlite2 or python2.5")
from sqlalchemy import (Table, Integer, Float, Column, MetaData,
ForeignKey, Boolean, String, Date,
UniqueConstraint, Index, PrimaryKeyConstraint)
from sqlalchemy import create_engine
from sqlalchemy.orm import mapper, sessionmaker as _sessionmaker, relation, backref, \
object_session as _object_session, class_mapper
from sqlalchemy.sql import select, text, and_
from sqlalchemy.exc import DBAPIError, OperationalError
from sqlalchemy.pool import NullPool
import sqlalchemy
# some users are still on 0.4.x..
import warnings
warnings.filterwarnings('ignore', 'Use session.add()')
warnings.filterwarnings('ignore', 'Use session.expunge_all()')
# sqlalchemy didn't handle the move to unicodetext nicely
try:
from sqlalchemy import UnicodeText
except ImportError:
from sqlalchemy import Unicode
UnicodeText = Unicode
from oldanki.hooks import runHook
# shared metadata
metadata = MetaData()
# this class assumes the provided session is called with transactional=False
class SessionHelper(object):
"Add some convenience routines to a session."
def __init__(self, session, lock=False, transaction=True):
self._session = session
self._lock = lock
self._transaction = transaction
if self._transaction:
self._session.begin()
if self._lock:
self._lockDB()
self._seen = True
def save(self, obj):
# compat
if sqlalchemy.__version__.startswith("0.4."):
self._session.save(obj)
else:
self._session.add(obj)
def clear(self):
# compat
if sqlalchemy.__version__.startswith("0.4."):
self._session.clear()
else:
self._session.expunge_all()
def update(self, obj):
# compat
if sqlalchemy.__version__.startswith("0.4."):
self._session.update(obj)
else:
self._session.add(obj)
def execute(self, *a, **ka):
x = self._session.execute(*a, **ka)
runHook("dbFinished")
return x
def __getattr__(self, k):
return getattr(self.__dict__['_session'], k)
def scalar(self, sql, **args):
return self.execute(text(sql), args).scalar()
def all(self, sql, **args):
return self.execute(text(sql), args).fetchall()
def first(self, sql, **args):
c = self.execute(text(sql), args)
r = c.fetchone()
c.close()
return r
def column0(self, sql, **args):
return [x[0] for x in self.execute(text(sql), args).fetchall()]
def statement(self, sql, **kwargs):
"Execute a statement without returning any results. Flush first."
return self.execute(text(sql), kwargs)
def statements(self, sql, data):
"Execute a statement across data. Flush first."
return self.execute(text(sql), data)
def __repr__(self):
return repr(self._session)
def commit(self):
self._session.commit()
if self._transaction:
self._session.begin()
if self._lock:
self._lockDB()
def _lockDB(self):
"Take out a write lock."
self._session.execute(text("update decks set modified=modified"))
def object_session(*args):
s = _object_session(*args)
if s:
return SessionHelper(s, transaction=False)
return None
def sessionmaker(*args, **kwargs):
if sqlalchemy.__version__ < "0.5":
if 'autocommit' in kwargs:
kwargs['transactional'] = not kwargs['autocommit']
del kwargs['autocommit']
else:
if 'transactional' in kwargs:
kwargs['autocommit'] = not kwargs['transactional']
del kwargs['transactional']
return _sessionmaker(*args, **kwargs)

4522
oldanki/deck.py Normal file

File diff suppressed because it is too large Load Diff

49
oldanki/errors.py Normal file
View File

@ -0,0 +1,49 @@
# -*- coding: utf-8 -*-
# Copyright: Damien Elmes <oldanki@ichi2.net>
# License: GNU GPL, version 3 or later; http://www.gnu.org/copyleft/gpl.html
"""\
Errors
==============================
"""
__docformat__ = 'restructuredtext'
class Error(Exception):
def __init__(self, message="", **data):
self.data = data
self._message = message
def __str__(self):
m = self._message
if self.data:
m += ": %s" % repr(self.data)
return m
class DeckAccessError(Error):
pass
class ImportFileError(Error):
"Unable to load file to import from."
pass
class ImportFormatError(Error):
"Unable to determine pattern in text file."
pass
class ImportEncodingError(Error):
"The file was not in utf-8."
pass
class ExportFileError(Error):
"Unable to save file."
pass
class SyncError(Error):
"A problem occurred during syncing."
pass
# facts, models
class FactInvalidError(Error):
"""A fact was invalid/not unique according to the model.
'field' defines the problem field.
'type' defines the type of error ('fieldEmpty', 'fieldNotUnique')"""
pass

274
oldanki/exporting.py Normal file
View File

@ -0,0 +1,274 @@
# -*- coding: utf-8 -*-
# Copyright: Damien Elmes <oldanki@ichi2.net>
# License: GNU GPL, version 3 or later; http://www.gnu.org/copyleft/gpl.html
"""\
Exporting support
==============================
"""
__docformat__ = 'restructuredtext'
import itertools, time, re, os, HTMLParser
from operator import itemgetter
from oldanki import DeckStorage
from oldanki.cards import Card
from oldanki.sync import SyncClient, SyncServer, copyLocalMedia
from oldanki.lang import _
from oldanki.utils import findTag, parseTags, stripHTML, ids2str
from oldanki.tags import tagIds
from oldanki.db import *
class Exporter(object):
def __init__(self, deck):
self.deck = deck
self.limitTags = []
self.limitCardIds = []
def exportInto(self, path):
self._escapeCount = 0
file = open(path, "wb")
self.doExport(file)
file.close()
def escapeText(self, text, removeFields=False):
"Escape newlines and tabs, and strip Anki HTML."
from BeautifulSoup import BeautifulSoup as BS
text = text.replace("\n", "<br>")
text = text.replace("\t", " " * 8)
if removeFields:
# beautifulsoup is slow
self._escapeCount += 1
if self._escapeCount % 100 == 0:
self.deck.updateProgress()
try:
s = BS(text)
all = s('span', {'class': re.compile("fm.*")})
for e in all:
e.replaceWith("".join([unicode(x) for x in e.contents]))
text = unicode(s)
except HTMLParser.HTMLParseError:
pass
return text
def cardIds(self):
"Return all cards, limited by tags or provided ids."
if self.limitCardIds:
return self.limitCardIds
if not self.limitTags:
cards = self.deck.s.column0("select id from cards")
else:
d = tagIds(self.deck.s, self.limitTags, create=False)
cards = self.deck.s.column0(
"select cardId from cardTags where tagid in %s" %
ids2str(d.values()))
self.count = len(cards)
return cards
class AnkiExporter(Exporter):
key = _("Anki Deck (*.oldanki)")
ext = ".oldanki"
def __init__(self, deck):
Exporter.__init__(self, deck)
self.includeSchedulingInfo = False
self.includeMedia = True
def exportInto(self, path):
n = 3
if not self.includeSchedulingInfo:
n += 1
self.deck.startProgress(n)
self.deck.updateProgress(_("Exporting..."))
try:
os.unlink(path)
except (IOError, OSError):
pass
self.newDeck = DeckStorage.Deck(path)
client = SyncClient(self.deck)
server = SyncServer(self.newDeck)
client.setServer(server)
client.localTime = self.deck.modified
client.remoteTime = 0
self.deck.s.flush()
# set up a custom change list and sync
lsum = self.localSummary()
rsum = server.summary(0)
self.deck.updateProgress()
payload = client.genPayload((lsum, rsum))
self.deck.updateProgress()
res = server.applyPayload(payload)
if not self.includeSchedulingInfo:
self.deck.updateProgress()
self.newDeck.s.statement("""
delete from reviewHistory""")
self.newDeck.s.statement("""
update cards set
interval = 0,
lastInterval = 0,
due = created,
lastDue = 0,
factor = 2.5,
firstAnswered = 0,
reps = 0,
successive = 0,
averageTime = 0,
reviewTime = 0,
youngEase0 = 0,
youngEase1 = 0,
youngEase2 = 0,
youngEase3 = 0,
youngEase4 = 0,
matureEase0 = 0,
matureEase1 = 0,
matureEase2 = 0,
matureEase3 = 0,
matureEase4 = 0,
yesCount = 0,
noCount = 0,
spaceUntil = 0,
type = 2,
relativeDelay = 2,
combinedDue = created,
modified = :now
""", now=time.time())
self.newDeck.s.statement("""
delete from stats""")
# media
if self.includeMedia:
server.deck.mediaPrefix = ""
copyLocalMedia(client.deck, server.deck)
# need to save manually
self.newDeck.rebuildCounts()
self.newDeck.updateAllPriorities()
self.exportedCards = self.newDeck.cardCount
self.newDeck.utcOffset = -1
self.newDeck.s.commit()
self.newDeck.close()
self.deck.finishProgress()
def localSummary(self):
cardIds = self.cardIds()
cStrIds = ids2str(cardIds)
cards = self.deck.s.all("""
select id, modified from cards
where id in %s""" % cStrIds)
facts = self.deck.s.all("""
select facts.id, facts.modified from cards, facts where
facts.id = cards.factId and
cards.id in %s""" % cStrIds)
models = self.deck.s.all("""
select models.id, models.modified from models, facts where
facts.modelId = models.id and
facts.id in %s""" % ids2str([f[0] for f in facts]))
media = self.deck.s.all("""
select id, created from media""")
return {
# cards
"cards": cards,
"delcards": [],
# facts
"facts": facts,
"delfacts": [],
# models
"models": models,
"delmodels": [],
# media
"media": media,
"delmedia": [],
}
class TextCardExporter(Exporter):
key = _("Text files (*.txt)")
ext = ".txt"
def __init__(self, deck):
Exporter.__init__(self, deck)
self.includeTags = False
def doExport(self, file):
ids = self.cardIds()
strids = ids2str(ids)
self.deck.startProgress((len(ids) + 1) / 50)
self.deck.updateProgress(_("Exporting..."))
cards = self.deck.s.all("""
select cards.question, cards.answer, cards.id from cards
where cards.id in %s
order by cards.created""" % strids)
self.deck.updateProgress()
if self.includeTags:
self.cardTags = dict(self.deck.s.all("""
select cards.id, facts.tags from cards, facts
where cards.factId = facts.id
and cards.id in %s
order by cards.created""" % strids))
out = u"\n".join(["%s\t%s%s" % (
self.escapeText(c[0], removeFields=True),
self.escapeText(c[1], removeFields=True),
self.tags(c[2]))
for c in cards])
if out:
out += "\n"
file.write(out.encode("utf-8"))
self.deck.finishProgress()
def tags(self, id):
if self.includeTags:
return "\t" + ", ".join(parseTags(self.cardTags[id]))
return ""
class TextFactExporter(Exporter):
key = _("Text files (*.txt)")
ext = ".txt"
def __init__(self, deck):
Exporter.__init__(self, deck)
self.includeTags = False
def doExport(self, file):
cardIds = self.cardIds()
self.deck.startProgress()
self.deck.updateProgress(_("Exporting..."))
facts = self.deck.s.all("""
select factId, value, facts.created from facts, fields
where
facts.id in
(select distinct factId from cards
where cards.id in %s)
and facts.id = fields.factId
order by factId, ordinal""" % ids2str(cardIds))
txt = ""
self.deck.updateProgress()
if self.includeTags:
self.factTags = dict(self.deck.s.all(
"select id, tags from facts where id in %s" %
ids2str([fact[0] for fact in facts])))
groups = itertools.groupby(facts, itemgetter(0))
groups = [[x for x in y[1]] for y in groups]
groups = [(group[0][2],
"\t".join([self.escapeText(x[1]) for x in group]) +
self.tags(group[0][0]))
for group in groups]
self.deck.updateProgress()
groups.sort(key=itemgetter(0))
out = [ret[1] for ret in groups]
self.count = len(out)
out = "\n".join(out)
file.write(out.encode("utf-8"))
self.deck.finishProgress()
def tags(self, id):
if self.includeTags:
return "\t" + self.factTags[id]
return ""
# Export modules
##########################################################################
def exporters():
return (
(_("Anki Deck (*.oldanki)"), AnkiExporter),
(_("Cards in tab-separated text file (*.txt)"), TextCardExporter),
(_("Facts in tab-separated text file (*.txt)"), TextFactExporter))

157
oldanki/facts.py Normal file
View File

@ -0,0 +1,157 @@
# -*- coding: utf-8 -*-
# Copyright: Damien Elmes <oldanki@ichi2.net>
# License: GNU GPL, version 3 or later; http://www.gnu.org/copyleft/gpl.html
"""\
Facts
========
"""
__docformat__ = 'restructuredtext'
import time
from oldanki.db import *
from oldanki.errors import *
from oldanki.models import Model, FieldModel, fieldModelsTable
from oldanki.utils import genID, stripHTMLMedia
from oldanki.hooks import runHook
# Fields in a fact
##########################################################################
fieldsTable = Table(
'fields', metadata,
Column('id', Integer, primary_key=True),
Column('factId', Integer, ForeignKey("facts.id"), nullable=False),
Column('fieldModelId', Integer, ForeignKey("fieldModels.id"),
nullable=False),
Column('ordinal', Integer, nullable=False),
Column('value', UnicodeText, nullable=False))
class Field(object):
"A field in a fact."
def __init__(self, fieldModel=None):
if fieldModel:
self.fieldModel = fieldModel
self.ordinal = fieldModel.ordinal
self.value = u""
self.id = genID()
def getName(self):
return self.fieldModel.name
name = property(getName)
mapper(Field, fieldsTable, properties={
'fieldModel': relation(FieldModel)
})
# Facts: a set of fields and a model
##########################################################################
# mapped in cards.py
factsTable = Table(
'facts', metadata,
Column('id', Integer, primary_key=True),
Column('modelId', Integer, ForeignKey("models.id"), nullable=False),
Column('created', Float, nullable=False, default=time.time),
Column('modified', Float, nullable=False, default=time.time),
Column('tags', UnicodeText, nullable=False, default=u""),
# spaceUntil is reused as a html-stripped cache of the fields
Column('spaceUntil', UnicodeText, nullable=False, default=u""),
# obsolete
Column('lastCardId', Integer, ForeignKey(
"cards.id", use_alter=True, name="lastCardIdfk")))
class Fact(object):
"A single fact. Fields exposed as dict interface."
def __init__(self, model=None):
self.model = model
self.id = genID()
if model:
for fm in model.fieldModels:
self.fields.append(Field(fm))
self.new = True
def isNew(self):
return getattr(self, 'new', False)
def keys(self):
return [field.name for field in self.fields]
def values(self):
return [field.value for field in self.fields]
def __getitem__(self, key):
try:
return [f.value for f in self.fields if f.name == key][0]
except IndexError:
raise KeyError(key)
def __setitem__(self, key, value):
try:
[f for f in self.fields if f.name == key][0].value = value
except IndexError:
raise KeyError
def get(self, key, default):
try:
return self[key]
except (IndexError, KeyError):
return default
def assertValid(self):
"Raise an error if required fields are empty."
for field in self.fields:
if not self.fieldValid(field):
raise FactInvalidError(type="fieldEmpty",
field=field.name)
def fieldValid(self, field):
return not (field.fieldModel.required and not field.value.strip())
def assertUnique(self, s):
"Raise an error if duplicate fields are found."
for field in self.fields:
if not self.fieldUnique(field, s):
raise FactInvalidError(type="fieldNotUnique",
field=field.name)
def fieldUnique(self, field, s):
if not field.fieldModel.unique:
return True
req = ("select value from fields "
"where fieldModelId = :fmid and value = :val")
if field.id:
req += " and id != %s" % field.id
return not s.scalar(req, val=field.value, fmid=field.fieldModel.id)
def focusLost(self, field):
runHook('fact.focusLost', self, field)
def setModified(self, textChanged=False, deck=None, media=True):
"Mark modified and update cards."
self.modified = time.time()
if textChanged:
if not deck:
# FIXME: compat code
import ankiqt
if not getattr(ankiqt, 'setModWarningShown', None):
import sys; sys.stderr.write(
"plugin needs to pass deck to fact.setModified()")
ankiqt.setModWarningShown = True
deck = ankiqt.mw.deck
assert deck
self.spaceUntil = stripHTMLMedia(u" ".join(
self.values()))
for card in self.cards:
card.rebuildQA(deck)
# Fact deletions
##########################################################################
factsDeletedTable = Table(
'factsDeleted', metadata,
Column('factId', Integer, ForeignKey("facts.id"),
nullable=False),
Column('deletedTime', Float, nullable=False))

55
oldanki/fonts.py Normal file
View File

@ -0,0 +1,55 @@
# -*- coding: utf-8 -*-
# Copyright: Damien Elmes <oldanki@ichi2.net>
# License: GNU GPL, version 3 or later; http://www.gnu.org/copyleft/gpl.html
"""\
Fonts - mapping to/from platform-specific fonts
==============================================================
"""
import sys
# set this to 'all', to get all fonts in a list
policy="platform"
mapping = [
[u"Mincho", u"MS Mincho", "win32"],
[u"Mincho", u" 明朝", "win32"],
[u"Mincho", u"ヒラギノ明朝 Pro W3", "mac"],
[u"Mincho", u"Kochi Mincho", "linux"],
[u"Mincho", u"東風明朝", "linux"],
]
def platform():
if sys.platform == "win32":
return "win32"
elif sys.platform.startswith("darwin"):
return "mac"
else:
return "linux"
def toCanonicalFont(family):
"Turn a platform-specific family into a canonical one."
for (s, p, type) in mapping:
if family == p:
return s
return family
def toPlatformFont(family):
"Turn a canonical font into a platform-specific one."
if policy == "all":
return allFonts(family)
ltype = platform()
for (s, p, type) in mapping:
if family == s and type == ltype:
return p
return family
def substitutions():
"Return a tuple mapping canonical fonts to platform ones."
type = platform()
return [(s, p) for (s, p, t) in mapping if t == type]
def allFonts(family):
ret = ", ".join([p for (s, p, t) in mapping if s == family])
return ret or family

406
oldanki/graphs.py Normal file
View File

@ -0,0 +1,406 @@
# -*- coding: utf-8 -*-
# Copyright: Damien Elmes <oldanki@ichi2.net>
# License: GNU GPL, version 3 or later; http://www.gnu.org/copyleft/gpl.html
"""\
Graphs of deck statistics
==============================
"""
__docformat__ = 'restructuredtext'
import os, sys, time
import oldanki.stats
from oldanki.lang import _
import datetime
#colours for graphs
dueYoungC = "#ffb380"
dueMatureC = "#ff5555"
dueCumulC = "#ff8080"
reviewNewC = "#80ccff"
reviewYoungC = "#3377ff"
reviewMatureC = "#0000ff"
reviewTimeC = "#0fcaff"
easesNewC = "#80b3ff"
easesYoungC = "#5555ff"
easesMatureC = "#0f5aff"
addedC = "#b3ff80"
firstC = "#b380ff"
intervC = "#80e5ff"
# support frozen distribs
if sys.platform.startswith("darwin"):
try:
del os.environ['MATPLOTLIBDATA']
except:
pass
try:
from matplotlib.figure import Figure
except UnicodeEncodeError:
# haven't tracked down the cause of this yet, but reloading fixes it
try:
from matplotlib.figure import Figure
except ImportError:
pass
except ImportError:
pass
def graphsAvailable():
return 'matplotlib' in sys.modules
class DeckGraphs(object):
def __init__(self, deck, width=8, height=3, dpi=75, selective=True):
self.deck = deck
self.stats = None
self.width = width
self.height = height
self.dpi = dpi
self.selective = selective
def calcStats (self):
if not self.stats:
days = {}
daysYoung = {}
daysMature = {}
months = {}
next = {}
lowestInDay = 0
self.endOfDay = self.deck.failedCutoff
t = time.time()
young = """
select interval, combinedDue from cards c
where relativeDelay between 0 and 1 and type >= 0 and interval <= 21"""
mature = """
select interval, combinedDue
from cards c where relativeDelay = 1 and type >= 0 and interval > 21"""
if self.selective:
young = self.deck._cardLimit("revActive", "revInactive",
young)
mature = self.deck._cardLimit("revActive", "revInactive",
mature)
young = self.deck.s.all(young)
mature = self.deck.s.all(mature)
for (src, dest) in [(young, daysYoung),
(mature, daysMature)]:
for (interval, due) in src:
day=int(round(interval))
days[day] = days.get(day, 0) + 1
indays = int(((due - self.endOfDay) / 86400.0) + 1)
next[indays] = next.get(indays, 0) + 1 # type-agnostic stats
dest[indays] = dest.get(indays, 0) + 1 # type-specific stats
if indays < lowestInDay:
lowestInDay = indays
self.stats = {}
self.stats['next'] = next
self.stats['days'] = days
self.stats['daysByType'] = {'young': daysYoung,
'mature': daysMature}
self.stats['months'] = months
self.stats['lowestInDay'] = lowestInDay
dayReps = self.deck.s.all("""
select day,
matureEase0+matureEase1+matureEase2+matureEase3+matureEase4 as matureReps,
reps-(newEase0+newEase1+newEase2+newEase3+newEase4) as combinedYoungReps,
reps as combinedNewReps
from stats
where type = 1""")
dayTimes = self.deck.s.all("""
select day, reviewTime as reviewTime
from stats
where type = 1""")
todaydt = self.deck._dailyStats.day
for dest, source in [("dayRepsNew", "combinedNewReps"),
("dayRepsYoung", "combinedYoungReps"),
("dayRepsMature", "matureReps")]:
self.stats[dest] = dict(
map(lambda dr: (-(todaydt -datetime.date(
*(int(x)for x in dr["day"].split("-")))).days, dr[source]), dayReps))
self.stats['dayTimes'] = dict(
map(lambda dr: (-(todaydt -datetime.date(
*(int(x)for x in dr["day"].split("-")))).days, dr["reviewTime"]/60.0), dayTimes))
def nextDue(self, days=30):
self.calcStats()
fig = Figure(figsize=(self.width, self.height), dpi=self.dpi)
graph = fig.add_subplot(111)
dayslists = [self.stats['next'], self.stats['daysByType']['mature']]
for dayslist in dayslists:
self.addMissing(dayslist, self.stats['lowestInDay'], days)
argl = []
for dayslist in dayslists:
dl = [x for x in dayslist.items() if x[0] <= days]
argl.extend(list(self.unzip(dl)))
self.varGraph(graph, days, [dueYoungC, dueMatureC], *argl)
cheat = fig.add_subplot(111)
b1 = cheat.bar(0, 0, color = dueYoungC)
b2 = cheat.bar(1, 0, color = dueMatureC)
cheat.legend([b1, b2], [
"Young",
"Mature"], loc='upper right')
graph.set_xlim(xmin=self.stats['lowestInDay'], xmax=days+1)
graph.set_xlabel("Day (0 = today)")
graph.set_ylabel("Cards Due")
return fig
def workDone(self, days=30):
self.calcStats()
for type in ["dayRepsNew", "dayRepsYoung", "dayRepsMature"]:
self.addMissing(self.stats[type], -days, 0)
fig = Figure(figsize=(self.width, self.height), dpi=self.dpi)
graph = fig.add_subplot(111)
args = sum((self.unzip(self.stats[type].items(), limit=days, reverseLimit=True) for type in ["dayRepsMature", "dayRepsYoung", "dayRepsNew"][::-1]), [])
self.varGraph(graph, days, [reviewNewC, reviewYoungC, reviewMatureC], *args)
cheat = fig.add_subplot(111)
b1 = cheat.bar(-3, 0, color = reviewNewC)
b2 = cheat.bar(-4, 0, color = reviewYoungC)
b3 = cheat.bar(-5, 0, color = reviewMatureC)
cheat.legend([b1, b2, b3], [
"New",
"Young",
"Mature"], loc='upper left')
graph.set_xlim(xmin=-days+1, xmax=1)
graph.set_ylim(ymax=max(max(a for a in args[1::2])) + 10)
graph.set_xlabel("Day (0 = today)")
graph.set_ylabel("Cards Answered")
return fig
def timeSpent(self, days=30):
self.calcStats()
fig = Figure(figsize=(self.width, self.height), dpi=self.dpi)
times = self.stats['dayTimes']
self.addMissing(times, -days+1, 0)
times = self.unzip([(day,y) for (day,y) in times.items()
if day + days >= 0])
graph = fig.add_subplot(111)
self.varGraph(graph, days, reviewTimeC, *times)
graph.set_xlim(xmin=-days+1, xmax=1)
graph.set_ylim(ymax=max(a for a in times[1]) + 0.1)
graph.set_xlabel("Day (0 = today)")
graph.set_ylabel("Minutes")
return fig
def cumulativeDue(self, days=30):
self.calcStats()
fig = Figure(figsize=(self.width, self.height), dpi=self.dpi)
graph = fig.add_subplot(111)
self.addMissing(self.stats['next'], 0, days-1)
dl = [x for x in self.stats['next'].items() if x[0] <= days]
(x, y) = self.unzip(dl)
count=0
y = list(y)
for i in range(len(x)):
count = count + y[i]
if i == 0:
continue
y[i] = count
if x[i] > days:
break
self._filledGraph(graph, days, dueCumulC, 1, x, y)
graph.set_xlim(xmin=self.stats['lowestInDay'], xmax=days-1)
graph.set_ylim(ymax=graph.get_ylim()[1]+10)
graph.set_xlabel("Day (0 = today)")
graph.set_ylabel("Cards Due")
return fig
def intervalPeriod(self, days=30):
self.calcStats()
fig = Figure(figsize=(self.width, self.height), dpi=self.dpi)
ints = self.stats['days']
self.addMissing(ints, 0, days)
intervals = self.unzip(ints.items(), limit=days)
graph = fig.add_subplot(111)
self.varGraph(graph, days, intervC, *intervals)
graph.set_xlim(xmin=0, xmax=days+1)
graph.set_xlabel("Card Interval")
graph.set_ylabel("Number of Cards")
return fig
def addedRecently(self, numdays=30, attr='created'):
self.calcStats()
days = {}
fig = Figure(figsize=(self.width, self.height), dpi=self.dpi)
limit = self.endOfDay - (numdays) * 86400
res = self.deck.s.column0("select %s from cards where %s >= %f" %
(attr, attr, limit))
for r in res:
d = int((r - self.endOfDay) / 86400.0)
days[d] = days.get(d, 0) + 1
self.addMissing(days, -numdays+1, 0)
graph = fig.add_subplot(111)
intervals = self.unzip(days.items())
if attr == 'created':
colour = addedC
else:
colour = firstC
self.varGraph(graph, numdays, colour, *intervals)
graph.set_xlim(xmin=-numdays+1, xmax=1)
graph.set_xlabel("Day (0 = today)")
if attr == 'created':
graph.set_ylabel("Cards Added")
else:
graph.set_ylabel("Cards First Answered")
return fig
def addMissing(self, dic, min, max):
for i in range(min, max+1):
if not i in dic:
dic[i] = 0
def unzip(self, tuples, fillFix=True, limit=None, reverseLimit=False):
tuples.sort(cmp=lambda x,y: cmp(x[0], y[0]))
if limit:
if reverseLimit:
tuples = tuples[-limit:]
else:
tuples = tuples[:limit+1]
new = zip(*tuples)
return new
def varGraph(self, graph, days, colours=["b"], *args):
if len(args[0]) < 120:
return self.barGraph(graph, days, colours, *args)
else:
return self.filledGraph(graph, days, colours, *args)
def filledGraph(self, graph, days, colours=["b"], *args):
self._filledGraph(graph, days, colours, 0, *args)
def _filledGraph(self, graph, days, colours, lw, *args):
if isinstance(colours, str):
colours = [colours]
for triplet in [(args[n], args[n + 1], colours[n / 2]) for n in range(0, len(args), 2)]:
x = list(triplet[0])
y = list(triplet[1])
c = triplet[2]
lowest = 99999
highest = -lowest
for i in range(len(x)):
if x[i] < lowest:
lowest = x[i]
if x[i] > highest:
highest = x[i]
# ensure the filled area reaches the bottom
x.insert(0, lowest - 1)
y.insert(0, 0)
x.append(highest + 1)
y.append(0)
# plot
graph.fill(x, y, c, lw=lw)
graph.grid(True)
graph.set_ylim(ymin=0, ymax=max(2, graph.get_ylim()[1]))
def barGraph(self, graph, days, colours, *args):
if isinstance(colours, str):
colours = [colours]
lim = None
for triplet in [(args[n], args[n + 1], colours[n / 2]) for n in range(0, len(args), 2)]:
x = list(triplet[0])
y = list(triplet[1])
c = triplet[2]
lw = 0
if lim is None:
lim = (x[0], x[-1])
length = (lim[1] - lim[0])
if len(args) > 4:
if length <= 30:
lw = 1
else:
if length <= 90:
lw = 1
lowest = 99999
highest = -lowest
for i in range(len(x)):
if x[i] < lowest:
lowest = x[i]
if x[i] > highest:
highest = x[i]
graph.bar(x, y, color=c, width=1, linewidth=lw)
graph.grid(True)
graph.set_ylim(ymin=0, ymax=max(2, graph.get_ylim()[1]))
import numpy as np
if length > 10:
step = length / 10.0
# python's range() won't accept float step args, so we do it manually
if lim[0] < 0:
ticks = [int(lim[1] - step * x) for x in range(10)]
else:
ticks = [int(lim[0] + step * x) for x in range(10)]
else:
ticks = list(xrange(lim[0], lim[1]+1))
graph.set_xticks(np.array(ticks) + 0.5)
graph.set_xticklabels([str(int(x)) for x in ticks])
for tick in graph.xaxis.get_major_ticks():
tick.tick1On = False
tick.tick2On = False
def easeBars(self):
fig = Figure(figsize=(3, 3), dpi=self.dpi)
graph = fig.add_subplot(111)
types = ("new", "young", "mature")
enum = 5
offset = 0
arrsize = 16
arr = [0] * arrsize
n = 0
colours = [easesNewC, easesYoungC, easesMatureC]
bars = []
gs = oldanki.stats.globalStats(self.deck)
for type in types:
total = (getattr(gs, type + "Ease0") +
getattr(gs, type + "Ease1") +
getattr(gs, type + "Ease2") +
getattr(gs, type + "Ease3") +
getattr(gs, type + "Ease4"))
setattr(gs, type + "Ease1", getattr(gs, type + "Ease0") +
getattr(gs, type + "Ease1"))
setattr(gs, type + "Ease0", -1)
for e in range(1, enum):
try:
arr[e+offset] = (getattr(gs, type + "Ease%d" % e)
/ float(total)) * 100 + 1
except ZeroDivisionError:
arr[e+offset] = 0
bars.append(graph.bar(range(arrsize), arr, width=1.0,
color=colours[n], align='center'))
arr = [0] * arrsize
offset += 5
n += 1
x = ([""] + [str(n) for n in range(1, enum)]) * 3
graph.legend([p[0] for p in bars], ("New",
"Young",
"Mature"),
'upper left')
graph.set_ylim(ymax=100)
graph.set_xlim(xmax=15)
graph.set_xticks(range(arrsize))
graph.set_xticklabels(x)
graph.set_ylabel("% of Answers")
graph.set_xlabel("Answer Buttons")
graph.grid(True)
return fig

75
oldanki/history.py Normal file
View File

@ -0,0 +1,75 @@
# -*- coding: utf-8 -*-
# Copyright: Damien Elmes <oldanki@ichi2.net>
# License: GNU GPL, version 3 or later; http://www.gnu.org/copyleft/gpl.html
"""\
History - keeping a record of all reviews
==========================================
If users run 'check db', duplicate records will be inserted into the DB - I
really should have used the time stamp as the key. You can remove them by
keeping the lowest id for any given timestamp.
"""
__docformat__ = 'restructuredtext'
import time
from oldanki.db import *
reviewHistoryTable = Table(
'reviewHistory', metadata,
Column('cardId', Integer, nullable=False),
Column('time', Float, nullable=False, default=time.time),
Column('lastInterval', Float, nullable=False),
Column('nextInterval', Float, nullable=False),
Column('ease', Integer, nullable=False),
Column('delay', Float, nullable=False),
Column('lastFactor', Float, nullable=False),
Column('nextFactor', Float, nullable=False),
Column('reps', Float, nullable=False),
Column('thinkingTime', Float, nullable=False),
Column('yesCount', Float, nullable=False),
Column('noCount', Float, nullable=False),
PrimaryKeyConstraint("cardId", "time"))
class CardHistoryEntry(object):
"Create after rescheduling card."
def __init__(self, card=None, ease=None, delay=None):
if not card:
return
self.cardId = card.id
self.lastInterval = card.lastInterval
self.nextInterval = card.interval
self.lastFactor = card.lastFactor
self.nextFactor = card.factor
self.reps = card.reps
self.yesCount = card.yesCount
self.noCount = card.noCount
self.ease = ease
self.delay = delay
self.thinkingTime = card.thinkingTime()
def writeSQL(self, s):
s.statement("""
insert into reviewHistory
(cardId, lastInterval, nextInterval, ease, delay, lastFactor,
nextFactor, reps, thinkingTime, yesCount, noCount, time)
values (
:cardId, :lastInterval, :nextInterval, :ease, :delay,
:lastFactor, :nextFactor, :reps, :thinkingTime, :yesCount, :noCount,
:time)""",
cardId=self.cardId,
lastInterval=self.lastInterval,
nextInterval=self.nextInterval,
ease=self.ease,
delay=self.delay,
lastFactor=self.lastFactor,
nextFactor=self.nextFactor,
reps=self.reps,
thinkingTime=self.thinkingTime,
yesCount=self.yesCount,
noCount=self.noCount,
time=time.time())
mapper(CardHistoryEntry, reviewHistoryTable)

65
oldanki/hooks.py Normal file
View File

@ -0,0 +1,65 @@
# -*- coding: utf-8 -*-
# Copyright: Damien Elmes <oldanki@ichi2.net>
# License: GNU GPL, version 3 or later; http://www.gnu.org/copyleft/gpl.html
"""\
Hooks - hook management and tools for extending Anki
==============================================================================
To find available hooks, grep for runHook in the source code.
Instrumenting allows you to modify functions that don't have hooks available.
If you call wrap() with pos='around', the original function will not be called
automatically but can be called with _old().
"""
# Hooks
##############################################################################
_hooks = {}
def runHook(hook, *args):
"Run all functions on hook."
hook = _hooks.get(hook, None)
if hook:
for func in hook:
func(*args)
def runFilter(hook, arg, *args):
hook = _hooks.get(hook, None)
if hook:
for func in hook:
arg = func(arg, *args)
return arg
def addHook(hook, func):
"Add a function to hook. Ignore if already on hook."
if not _hooks.get(hook, None):
_hooks[hook] = []
if func not in _hooks[hook]:
_hooks[hook].append(func)
def removeHook(hook, func):
"Remove a function if is on hook."
hook = _hooks.get(hook, [])
if func in hook:
hook.remove(func)
def hookEmpty(hook):
return not _hooks.get(hook)
# Instrumenting
##############################################################################
def wrap(old, new, pos="after"):
"Override an existing function."
def repl(*args, **kwargs):
if pos == "after":
old(*args, **kwargs)
return new(*args, **kwargs)
elif pos == "before":
new(*args, **kwargs)
return old(*args, **kwargs)
else:
return new(_old=old, *args, **kwargs)
return repl

59
oldanki/lang.py Normal file
View File

@ -0,0 +1,59 @@
# -*- coding: utf-8 -*-
# Copyright: Damien Elmes <oldanki@ichi2.net>
# License: GNU GPL, version 3 or later; http://www.gnu.org/copyleft/gpl.html
"""\
Internationalisation
=====================
"""
__docformat__ = 'restructuredtext'
import os, sys
import gettext
import threading
threadLocal = threading.local()
# global defaults
currentLang = None
currentTranslation = None
def localTranslation():
"Return the translation local to this thread, or the default."
if getattr(threadLocal, 'currentTranslation', None):
return threadLocal.currentTranslation
else:
return currentTranslation
def _(str):
return localTranslation().ugettext(str)
def ngettext(single, plural, n):
return localTranslation().ungettext(single, plural, n)
def setLang(lang, local=True):
base = os.path.dirname(os.path.abspath(__file__))
localeDir = os.path.join(base, "locale")
if not os.path.exists(localeDir):
localeDir = os.path.join(
os.path.dirname(sys.argv[0]), "locale")
trans = gettext.translation('libanki', localeDir,
languages=[lang],
fallback=True)
if local:
threadLocal.currentLang = lang
threadLocal.currentTranslation = trans
else:
global currentLang, currentTranslation
currentLang = lang
currentTranslation = trans
def getLang():
"Return the language local to this thread, or the default."
if getattr(threadLocal, 'currentLang', None):
return threadLocal.currentLang
else:
return currentLang
if not currentTranslation:
setLang("en_US", local=False)

136
oldanki/latex.py Normal file
View File

@ -0,0 +1,136 @@
# -*- coding: utf-8 -*-
# Copyright: Damien Elmes <oldanki@ichi2.net>
# License: GNU GPL, version 3 or later; http://www.gnu.org/copyleft/gpl.html
"""\
Latex support
==============================
"""
__docformat__ = 'restructuredtext'
import re, tempfile, os, sys, shutil, cgi, subprocess
from oldanki.utils import genID, checksum, call
from oldanki.hooks import addHook
from htmlentitydefs import entitydefs
from oldanki.lang import _
latexDviPngCmd = ["dvipng", "-D", "200", "-T", "tight"]
regexps = {
"standard": re.compile(r"\[latex\](.+?)\[/latex\]", re.DOTALL | re.IGNORECASE),
"expression": re.compile(r"\[\$\](.+?)\[/\$\]", re.DOTALL | re.IGNORECASE),
"math": re.compile(r"\[\$\$\](.+?)\[/\$\$\]", re.DOTALL | re.IGNORECASE),
}
tmpdir = tempfile.mkdtemp(prefix="oldanki")
# add standard tex install location to osx
if sys.platform == "darwin":
os.environ['PATH'] += ":/usr/texbin"
def renderLatex(deck, text, build=True):
"Convert TEXT with embedded latex tags to image links."
for match in regexps['standard'].finditer(text):
text = text.replace(match.group(), imgLink(deck, match.group(1),
build))
for match in regexps['expression'].finditer(text):
text = text.replace(match.group(), imgLink(
deck, "$" + match.group(1) + "$", build))
for match in regexps['math'].finditer(text):
text = text.replace(match.group(), imgLink(
deck,
"\\begin{displaymath}" + match.group(1) + "\\end{displaymath}",
build))
return text
def stripLatex(text):
for match in regexps['standard'].finditer(text):
text = text.replace(match.group(), "")
for match in regexps['expression'].finditer(text):
text = text.replace(match.group(), "")
for match in regexps['math'].finditer(text):
text = text.replace(match.group(), "")
return text
def latexImgFile(deck, latexCode):
key = checksum(latexCode)
return "latex-%s.png" % key
def mungeLatex(deck, latex):
"Convert entities, fix newlines, convert to utf8, and wrap pre/postamble."
for match in re.compile("&([a-z]+);", re.IGNORECASE).finditer(latex):
if match.group(1) in entitydefs:
latex = latex.replace(match.group(), entitydefs[match.group(1)])
latex = re.sub("<br( /)?>", "\n", latex)
latex = (deck.getVar("latexPre") + "\n" +
latex + "\n" +
deck.getVar("latexPost"))
latex = latex.encode("utf-8")
return latex
def buildImg(deck, latex):
log = open(os.path.join(tmpdir, "latex_log.txt"), "w+")
texpath = os.path.join(tmpdir, "tmp.tex")
texfile = file(texpath, "w")
texfile.write(latex)
texfile.close()
# make sure we have a valid mediaDir
mdir = deck.mediaDir(create=True)
oldcwd = os.getcwd()
if sys.platform == "win32":
si = subprocess.STARTUPINFO()
try:
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
except:
si.dwFlags |= subprocess._subprocess.STARTF_USESHOWWINDOW
else:
si = None
try:
os.chdir(tmpdir)
def errmsg(type):
msg = _("Error executing %s.\n") % type
try:
log = open(os.path.join(tmpdir, "latex_log.txt")).read()
msg += "<small><pre>" + cgi.escape(log) + "</pre></small>"
except:
msg += _("Have you installed latex and dvipng?")
pass
return msg
if call(["latex", "-interaction=nonstopmode",
"tmp.tex"], stdout=log, stderr=log, startupinfo=si):
return (False, errmsg("latex"))
if call(latexDviPngCmd + ["tmp.dvi", "-o", "tmp.png"],
stdout=log, stderr=log, startupinfo=si):
return (False, errmsg("dvipng"))
# add to media
target = latexImgFile(deck, latex)
shutil.copy2(os.path.join(tmpdir, "tmp.png"),
os.path.join(mdir, target))
return (True, target)
finally:
os.chdir(oldcwd)
def imageForLatex(deck, latex, build=True):
"Return an image that represents 'latex', building if necessary."
imageFile = latexImgFile(deck, latex)
ok = True
if build and (not imageFile or not os.path.exists(imageFile)):
(ok, imageFile) = buildImg(deck, latex)
if not ok:
return (False, imageFile)
return (True, imageFile)
def imgLink(deck, latex, build=True):
"Parse LATEX and return a HTML image representing the output."
munged = mungeLatex(deck, latex)
(ok, img) = imageForLatex(deck, munged, build)
if ok:
return '<img src="%s" alt="%s">' % (img, latex)
else:
return img
def formatQA(html, type, cid, mid, fact, tags, cm, deck):
return renderLatex(deck, html)
# setup q/a filter
addHook("formatQA", formatQA)

286
oldanki/media.py Normal file
View File

@ -0,0 +1,286 @@
# -*- coding: utf-8 -*-
# Copyright: Damien Elmes <oldanki@ichi2.net>
# License: GNU GPL, version 3 or later; http://www.gnu.org/copyleft/gpl.html
"""\
Media support
====================
"""
__docformat__ = 'restructuredtext'
import os, shutil, re, urllib2, time, tempfile, unicodedata, urllib
from oldanki.db import *
from oldanki.utils import checksum, genID
from oldanki.lang import _
# other code depends on this order, so don't reorder
regexps = ("(?i)(\[sound:([^]]+)\])",
"(?i)(<img[^>]+src=[\"']?([^\"'>]+)[\"']?[^>]*>)")
# Tables
##########################################################################
mediaTable = Table(
'media', metadata,
Column('id', Integer, primary_key=True, nullable=False),
Column('filename', UnicodeText, nullable=False),
# reused as reference count
Column('size', Integer, nullable=False),
# treated as modification date, not creation date
Column('created', Float, nullable=False),
# reused as md5sum. empty string if file doesn't exist on disk
Column('originalPath', UnicodeText, nullable=False, default=u""),
# older versions stored original filename here, so we'll leave it for now
# in case we add a feature to rename media back to its original name. in
# the future we may want to zero this to save space
Column('description', UnicodeText, nullable=False, default=u""))
class Media(object):
pass
mapper(Media, mediaTable)
mediaDeletedTable = Table(
'mediaDeleted', metadata,
Column('mediaId', Integer, ForeignKey("cards.id"),
nullable=False),
Column('deletedTime', Float, nullable=False))
# File handling
##########################################################################
def copyToMedia(deck, path):
"""Copy PATH to MEDIADIR, and return new filename.
If a file with the same md5sum exists in the DB, return that.
If a file with the same name exists, return a unique name.
This does not modify the media table."""
# see if have duplicate contents
newpath = deck.s.scalar(
"select filename from media where originalPath = :cs",
cs=checksum(open(path, "rb").read()))
# check if this filename already exists
if not newpath:
base = os.path.basename(path)
mdir = deck.mediaDir(create=True)
newpath = uniquePath(mdir, base)
shutil.copy2(path, newpath)
return os.path.basename(newpath)
def uniquePath(dir, base):
# remove any dangerous characters
base = re.sub(r"[][<>:/\\&]", "", base)
# find a unique name
(root, ext) = os.path.splitext(base)
def repl(match):
n = int(match.group(1))
return " (%d)" % (n+1)
while True:
path = os.path.join(dir, root + ext)
if not os.path.exists(path):
break
reg = " \((\d+)\)$"
if not re.search(reg, root):
root = root + " (1)"
else:
root = re.sub(reg, repl, root)
return path
# DB routines
##########################################################################
def updateMediaCount(deck, file, count=1):
mdir = deck.mediaDir()
if deck.s.scalar(
"select 1 from media where filename = :file", file=file):
deck.s.statement(
"update media set size = size + :c, created = :t where filename = :file",
file=file, c=count, t=time.time())
elif count > 0:
try:
sum = unicode(
checksum(open(os.path.join(mdir, file), "rb").read()))
except:
sum = u""
deck.s.statement("""
insert into media (id, filename, size, created, originalPath, description)
values (:id, :file, :c, :mod, :sum, '')""",
id=genID(), file=file, c=count, mod=time.time(),
sum=sum)
def removeUnusedMedia(deck):
ids = deck.s.column0("select id from media where size = 0")
for id in ids:
deck.s.statement("insert into mediaDeleted values (:id, :t)",
id=id, t=time.time())
deck.s.statement("delete from media where size = 0")
# String manipulation
##########################################################################
def mediaFiles(string, remote=False):
l = []
for reg in regexps:
for (full, fname) in re.findall(reg, string):
isLocal = not re.match("(https?|ftp)://", fname.lower())
if not remote and isLocal:
l.append(fname)
elif remote and not isLocal:
l.append(fname)
return l
def stripMedia(txt):
for reg in regexps:
txt = re.sub(reg, "", txt)
return txt
def escapeImages(string):
def repl(match):
tag = match.group(1)
fname = match.group(2)
if re.match("(https?|ftp)://", fname):
return tag
return tag.replace(
fname, urllib.quote(fname.encode("utf-8")))
return re.sub(regexps[1], repl, string)
# Rebuilding DB
##########################################################################
def rebuildMediaDir(deck, delete=False, dirty=True):
mdir = deck.mediaDir()
if not mdir:
return (0, 0)
deck.startProgress(title=_("Check Media DB"))
# set all ref counts to 0
deck.s.statement("update media set size = 0")
# look through cards for media references
refs = {}
normrefs = {}
def norm(s):
if isinstance(s, unicode):
return unicodedata.normalize('NFD', s)
return s
for (question, answer) in deck.s.all(
"select question, answer from cards"):
for txt in (question, answer):
for f in mediaFiles(txt):
if f in refs:
refs[f] += 1
else:
refs[f] = 1
normrefs[norm(f)] = True
# update ref counts
for (file, count) in refs.items():
updateMediaCount(deck, file, count)
# find unused media
unused = []
for file in os.listdir(mdir):
path = os.path.join(mdir, file)
if not os.path.isfile(path):
# ignore directories
continue
nfile = norm(file)
if nfile not in normrefs:
unused.append(file)
# optionally delete
if delete:
for f in unused:
path = os.path.join(mdir, f)
os.unlink(path)
# remove entries in db for unused media
removeUnusedMedia(deck)
# check md5s are up to date
update = []
for (file, created, md5) in deck.s.all(
"select filename, created, originalPath from media"):
path = os.path.join(mdir, file)
if not os.path.exists(path):
if md5:
update.append({'f':file, 'sum':u"", 'c':time.time()})
else:
sum = unicode(
checksum(open(os.path.join(mdir, file), "rb").read()))
if md5 != sum:
update.append({'f':file, 'sum':sum, 'c':time.time()})
if update:
deck.s.statements("""
update media set originalPath = :sum, created = :c where filename = :f""",
update)
# update deck and get return info
if dirty:
deck.flushMod()
nohave = deck.s.column0("select filename from media where originalPath = ''")
deck.finishProgress()
return (nohave, unused)
# Download missing
##########################################################################
def downloadMissing(deck):
urlbase = deck.getVar("mediaURL")
if not urlbase:
return None
mdir = deck.mediaDir(create=True)
deck.startProgress()
missing = 0
grabbed = 0
for c, (f, sum) in enumerate(deck.s.all(
"select filename, originalPath from media")):
path = os.path.join(mdir, f)
if not os.path.exists(path):
try:
rpath = urlbase + f
url = urllib2.urlopen(rpath)
open(f, "wb").write(url.read())
grabbed += 1
except:
if sum:
# the file is supposed to exist
deck.finishProgress()
return (False, rpath)
else:
# ignore and keep going
missing += 1
deck.updateProgress(label=_("File %d...") % (grabbed+missing))
deck.finishProgress()
return (True, grabbed, missing)
# Convert remote links to local ones
##########################################################################
def downloadRemote(deck):
mdir = deck.mediaDir(create=True)
refs = {}
deck.startProgress()
for (question, answer) in deck.s.all(
"select question, answer from cards"):
for txt in (question, answer):
for f in mediaFiles(txt, remote=True):
refs[f] = True
tmpdir = tempfile.mkdtemp(prefix="oldanki")
failed = []
passed = []
for c, link in enumerate(refs.keys()):
try:
path = os.path.join(tmpdir, os.path.basename(link))
url = urllib2.urlopen(link)
open(path, "wb").write(url.read())
newpath = copyToMedia(deck, path)
passed.append([link, newpath])
except:
failed.append(link)
deck.updateProgress(label=_("Download %d...") % c)
for (url, name) in passed:
deck.s.statement(
"update fields set value = replace(value, :url, :name)",
url=url, name=name)
deck.updateProgress(label=_("Updating references..."))
deck.updateProgress(label=_("Updating cards..."))
# rebuild entire q/a cache
for m in deck.models:
deck.updateCardsFromModel(m, dirty=True)
deck.finishProgress()
deck.flushMod()
return (passed, failed)

220
oldanki/models.py Normal file
View File

@ -0,0 +1,220 @@
# -*- coding: utf-8 -*-
# Copyright: Damien Elmes <oldanki@ichi2.net>
# License: GNU GPL, version 3 or later; http://www.gnu.org/copyleft/gpl.html
"""\
Model - define the way in which facts are added and shown
==========================================================
- Field models
- Card models
- Models
"""
import time, re
from sqlalchemy.ext.orderinglist import ordering_list
from oldanki.db import *
from oldanki.utils import genID, canonifyTags
from oldanki.fonts import toPlatformFont
from oldanki.utils import parseTags, hexifyID, checksum, stripHTML
from oldanki.lang import _
from oldanki.hooks import runFilter
from oldanki.template import render
from copy import copy
def alignmentLabels():
return {
0: _("Center"),
1: _("Left"),
2: _("Right"),
}
# Field models
##########################################################################
fieldModelsTable = Table(
'fieldModels', metadata,
Column('id', Integer, primary_key=True),
Column('ordinal', Integer, nullable=False),
Column('modelId', Integer, ForeignKey('models.id'), nullable=False),
Column('name', UnicodeText, nullable=False),
Column('description', UnicodeText, nullable=False, default=u""), # obsolete
# reused as RTL marker
Column('features', UnicodeText, nullable=False, default=u""),
Column('required', Boolean, nullable=False, default=True),
Column('unique', Boolean, nullable=False, default=True), # sqlite keyword
Column('numeric', Boolean, nullable=False, default=False),
# display
Column('quizFontFamily', UnicodeText, default=u"Arial"),
Column('quizFontSize', Integer, default=20),
Column('quizFontColour', String(7)),
Column('editFontFamily', UnicodeText, default=u"1"), # reused as <pre> toggle
Column('editFontSize', Integer, default=20))
class FieldModel(object):
"The definition of one field in a fact."
def __init__(self, name=u"", required=True, unique=True):
self.name = name
self.required = required
self.unique = unique
self.id = genID()
def copy(self):
new = FieldModel()
for p in class_mapper(FieldModel).iterate_properties:
setattr(new, p.key, getattr(self, p.key))
new.id = genID()
new.model = None
return new
mapper(FieldModel, fieldModelsTable)
# Card models
##########################################################################
cardModelsTable = Table(
'cardModels', metadata,
Column('id', Integer, primary_key=True),
Column('ordinal', Integer, nullable=False),
Column('modelId', Integer, ForeignKey('models.id'), nullable=False),
Column('name', UnicodeText, nullable=False),
Column('description', UnicodeText, nullable=False, default=u""), # obsolete
Column('active', Boolean, nullable=False, default=True),
# formats: question/answer/last(not used)
Column('qformat', UnicodeText, nullable=False),
Column('aformat', UnicodeText, nullable=False),
Column('lformat', UnicodeText),
# question/answer editor format (not used yet)
Column('qedformat', UnicodeText),
Column('aedformat', UnicodeText),
Column('questionInAnswer', Boolean, nullable=False, default=False),
# unused
Column('questionFontFamily', UnicodeText, default=u"Arial"),
Column('questionFontSize', Integer, default=20),
Column('questionFontColour', String(7), default=u"#000000"),
# used for both question & answer
Column('questionAlign', Integer, default=0),
# ununsed
Column('answerFontFamily', UnicodeText, default=u"Arial"),
Column('answerFontSize', Integer, default=20),
Column('answerFontColour', String(7), default=u"#000000"),
Column('answerAlign', Integer, default=0),
Column('lastFontFamily', UnicodeText, default=u"Arial"),
Column('lastFontSize', Integer, default=20),
# used as background colour
Column('lastFontColour', String(7), default=u"#FFFFFF"),
Column('editQuestionFontFamily', UnicodeText, default=None),
Column('editQuestionFontSize', Integer, default=None),
Column('editAnswerFontFamily', UnicodeText, default=None),
Column('editAnswerFontSize', Integer, default=None),
# empty answer
Column('allowEmptyAnswer', Boolean, nullable=False, default=True),
Column('typeAnswer', UnicodeText, nullable=False, default=u""))
class CardModel(object):
"""Represents how to generate the front and back of a card."""
def __init__(self, name=u"", qformat=u"q", aformat=u"a", active=True):
self.name = name
self.qformat = qformat
self.aformat = aformat
self.active = active
self.id = genID()
def copy(self):
new = CardModel()
for p in class_mapper(CardModel).iterate_properties:
setattr(new, p.key, getattr(self, p.key))
new.id = genID()
new.model = None
return new
mapper(CardModel, cardModelsTable)
def formatQA(cid, mid, fact, tags, cm, deck):
"Return a dict of {id, question, answer}"
d = {'id': cid}
fields = {}
for (k, v) in fact.items():
fields["text:"+k] = stripHTML(v[1])
if v[1]:
fields[k] = '<span class="fm%s">%s</span>' % (
hexifyID(v[0]), v[1])
else:
fields[k] = u""
fields['tags'] = tags[0]
fields['Tags'] = tags[0]
fields['modelTags'] = tags[1]
fields['cardModel'] = tags[2]
# render q & a
ret = []
for (type, format) in (("question", cm.qformat),
("answer", cm.aformat)):
# convert old style
format = re.sub("%\((.+?)\)s", "{{\\1}}", format)
# allow custom rendering functions & info
fields = runFilter("prepareFields", fields, cid, mid, fact, tags, cm, deck)
html = render(format, fields)
d[type] = runFilter("formatQA", html, type, cid, mid, fact, tags, cm, deck)
return d
# Model table
##########################################################################
modelsTable = Table(
'models', metadata,
Column('id', Integer, primary_key=True),
Column('deckId', Integer, ForeignKey("decks.id", use_alter=True, name="deckIdfk")),
Column('created', Float, nullable=False, default=time.time),
Column('modified', Float, nullable=False, default=time.time),
Column('tags', UnicodeText, nullable=False, default=u""),
Column('name', UnicodeText, nullable=False),
Column('description', UnicodeText, nullable=False, default=u""), # obsolete
Column('features', UnicodeText, nullable=False, default=u""), # used as mediaURL
Column('spacing', Float, nullable=False, default=0.1), # obsolete
Column('initialSpacing', Float, nullable=False, default=60), # obsolete
Column('source', Integer, nullable=False, default=0))
class Model(object):
"Defines the way a fact behaves, what fields it can contain, etc."
def __init__(self, name=u""):
self.name = name
self.id = genID()
def setModified(self):
self.modified = time.time()
def addFieldModel(self, field):
"Add a field model."
self.fieldModels.append(field)
s = object_session(self)
if s:
s.flush()
def addCardModel(self, card):
"Add a card model."
self.cardModels.append(card)
s = object_session(self)
if s:
s.flush()
mapper(Model, modelsTable, properties={
'fieldModels': relation(FieldModel, backref='model',
collection_class=ordering_list('ordinal'),
order_by=[fieldModelsTable.c.ordinal],
cascade="all, delete-orphan"),
'cardModels': relation(CardModel, backref='model',
collection_class=ordering_list('ordinal'),
order_by=[cardModelsTable.c.ordinal],
cascade="all, delete-orphan"),
})
# Model deletions
##########################################################################
modelsDeletedTable = Table(
'modelsDeleted', metadata,
Column('modelId', Integer, ForeignKey("models.id"),
nullable=False),
Column('deletedTime', Float, nullable=False))

364
oldanki/sound.py Normal file
View File

@ -0,0 +1,364 @@
# -*- coding: utf-8 -*-
# Copyright: Damien Elmes <oldanki@ichi2.net>
# License: GNU GPL, version 3 or later; http://www.gnu.org/copyleft/gpl.html
"""\
Sound support
==============================
"""
__docformat__ = 'restructuredtext'
import re, sys, threading, time, subprocess, os, signal, errno, atexit
import tempfile, shutil
from oldanki.hooks import addHook, runHook
# Shared utils
##########################################################################
def playFromText(text):
for match in re.findall("\[sound:(.*?)\]", text):
play(match)
def stripSounds(text):
return re.sub("\[sound:.*?\]", "", text)
def hasSound(text):
return re.search("\[sound:.*?\]", text) is not None
##########################################################################
# the amount of noise to cancel
NOISE_AMOUNT = "0.1"
# the amount of amplification
NORM_AMOUNT = "-3"
# the amount of bass
BASS_AMOUNT = "+0"
# the amount to fade at end
FADE_AMOUNT = "0.25"
noiseProfile = ""
processingSrc = "rec.wav"
processingDst = "rec.mp3"
processingChain = []
recFiles = ["rec2.wav", "rec3.wav"]
cmd = ["sox", processingSrc, "rec2.wav"]
processingChain = [
None, # placeholder
["sox", "rec2.wav", "rec3.wav", "norm", NORM_AMOUNT,
"bass", BASS_AMOUNT, "fade", FADE_AMOUNT],
["lame", "rec3.wav", processingDst, "--noreplaygain", "--quiet"],
]
tmpdir = None
# don't show box on windows
if sys.platform == "win32":
si = subprocess.STARTUPINFO()
try:
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
except:
# python2.7+
si.dwFlags |= subprocess._subprocess.STARTF_USESHOWWINDOW
# tmp dir for non-hashed media
tmpdir = unicode(
tempfile.mkdtemp(prefix="oldanki"), sys.getfilesystemencoding())
else:
si = None
if sys.platform.startswith("darwin"):
# make sure lame, which is installed in /usr/local/bin, is in the path
os.environ['PATH'] += ":" + "/usr/local/bin"
dir = os.path.dirname(os.path.abspath(__file__))
dir = os.path.abspath(dir + "/../../../..")
os.environ['PATH'] += ":" + dir + "/audio"
def retryWait(proc):
# osx throws interrupted system call errors frequently
while 1:
try:
return proc.wait()
except OSError:
continue
# Noise profiles
##########################################################################
def checkForNoiseProfile():
global processingChain
if sys.platform.startswith("darwin"):
# not currently supported
processingChain = [
["lame", "rec.wav", "rec.mp3", "--noreplaygain", "--quiet"]]
else:
cmd = ["sox", processingSrc, "rec2.wav"]
if os.path.exists(noiseProfile):
cmd = cmd + ["noisered", noiseProfile, NOISE_AMOUNT]
processingChain[0] = cmd
def generateNoiseProfile():
try:
os.unlink(noiseProfile)
except OSError:
pass
retryWait(subprocess.Popen(
["sox", processingSrc, recFiles[0], "trim", "1.5", "1.5"],
startupinfo=si))
retryWait(subprocess.Popen(["sox", recFiles[0], recFiles[1],
"noiseprof", noiseProfile],
startupinfo=si))
processingChain[0] = ["sox", processingSrc, "rec2.wav",
"noisered", noiseProfile, NOISE_AMOUNT]
# Mplayer settings
##########################################################################
if sys.platform.startswith("win32"):
mplayerCmd = ["mplayer.exe", "-ao", "win32", "-really-quiet"]
dir = os.path.dirname(os.path.abspath(sys.argv[0]))
os.environ['PATH'] += ";" + dir
os.environ['PATH'] += ";" + dir + "\\..\\win\\top" # for testing
else:
mplayerCmd = ["mplayer", "-really-quiet"]
# Mplayer in slave mode
##########################################################################
mplayerQueue = []
mplayerManager = None
mplayerReader = None
mplayerEvt = threading.Event()
mplayerClear = False
class MplayerReader(threading.Thread):
"Read any debugging info to prevent mplayer from blocking."
def run(self):
while 1:
mplayerEvt.wait()
try:
mplayerManager.mplayer.stdout.read()
except:
pass
class MplayerMonitor(threading.Thread):
def run(self):
global mplayerClear
self.mplayer = None
self.deadPlayers = []
while 1:
mplayerEvt.wait()
if mplayerQueue:
# ensure started
if not self.mplayer:
self.startProcess()
# loop through files to play
while mplayerQueue:
item = mplayerQueue.pop(0)
if mplayerClear:
mplayerClear = False
extra = ""
else:
extra = " 1"
cmd = 'loadfile "%s"%s\n' % (item, extra)
try:
self.mplayer.stdin.write(cmd)
except:
# mplayer has quit and needs restarting
self.deadPlayers.append(self.mplayer)
self.mplayer = None
self.startProcess()
self.mplayer.stdin.write(cmd)
# wait() on finished processes. we don't want to block on the
# wait, so we keep trying each time we're reactivated
def clean(pl):
if pl.poll() is not None:
pl.wait()
return False
else:
return True
self.deadPlayers = [pl for pl in self.deadPlayers if clean(pl)]
mplayerEvt.clear()
def kill(self):
if not self.mplayer:
return
try:
self.mplayer.stdin.write("quit\n")
self.deadPlayers.append(self.mplayer)
except:
pass
self.mplayer = None
def startProcess(self):
try:
cmd = mplayerCmd + ["-slave", "-idle"]
self.mplayer = subprocess.Popen(
cmd, startupinfo=si, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
except OSError:
mplayerEvt.clear()
raise Exception("Audio player not found")
def queueMplayer(path):
ensureMplayerThreads()
while mplayerEvt.isSet():
time.sleep(0.1)
if tmpdir and os.path.exists(path):
# mplayer on windows doesn't like the encoding, so we create a
# temporary file instead. oddly, foreign characters in the dirname
# don't seem to matter.
(fd, name) = tempfile.mkstemp(suffix=os.path.splitext(path)[1],
dir=tmpdir)
f = os.fdopen(fd, "wb")
f.write(open(path, "rb").read())
f.close()
# it wants unix paths, too!
path = name.replace("\\", "/")
path = path.encode(sys.getfilesystemencoding())
else:
path = path.encode("utf-8")
mplayerQueue.append(path)
mplayerEvt.set()
runHook("soundQueued")
def clearMplayerQueue():
global mplayerClear
mplayerClear = True
mplayerEvt.set()
def ensureMplayerThreads():
global mplayerManager, mplayerReader
if not mplayerManager:
mplayerManager = MplayerMonitor()
mplayerManager.daemon = True
mplayerManager.start()
mplayerReader = MplayerReader()
mplayerReader.daemon = True
mplayerReader.start()
def stopMplayer():
if not mplayerManager:
return
mplayerManager.kill()
def onExit():
if tmpdir:
shutil.rmtree(tmpdir)
addHook("deckClosed", stopMplayer)
atexit.register(onExit)
# PyAudio recording
##########################################################################
try:
import pyaudio
import wave
PYAU_FORMAT = pyaudio.paInt16
PYAU_CHANNELS = 1
PYAU_RATE = 44100
PYAU_INPUT_INDEX = None
except:
pass
class _Recorder(object):
def postprocess(self, encode=True):
self.encode = encode
for c in processingChain:
#print c
if not self.encode and c[0] == 'lame':
continue
ret = retryWait(subprocess.Popen(c, startupinfo=si))
if ret:
raise Exception(_("""
Error processing audio.
If you're on Linux and don't have sox 14.1+, you
need to disable normalization. See the wiki.
Command was:\n""") + u" ".join(c))
class PyAudioThreadedRecorder(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.finish = False
def run(self):
chunk = 1024
try:
p = pyaudio.PyAudio()
except NameError:
raise Exception(
"Pyaudio not installed (recording not supported on OSX10.3)")
stream = p.open(format=PYAU_FORMAT,
channels=PYAU_CHANNELS,
rate=PYAU_RATE,
input=True,
input_device_index=PYAU_INPUT_INDEX,
frames_per_buffer=chunk)
all = []
while not self.finish:
try:
data = stream.read(chunk)
except IOError, e:
if e[1] == pyaudio.paInputOverflowed:
data = None
else:
raise
if data:
all.append(data)
stream.close()
p.terminate()
data = ''.join(all)
wf = wave.open(processingSrc, 'wb')
wf.setnchannels(PYAU_CHANNELS)
wf.setsampwidth(p.get_sample_size(PYAU_FORMAT))
wf.setframerate(PYAU_RATE)
wf.writeframes(data)
wf.close()
class PyAudioRecorder(_Recorder):
def __init__(self):
for t in recFiles + [processingSrc, processingDst]:
try:
os.unlink(t)
except OSError:
pass
self.encode = False
def start(self):
self.thread = PyAudioThreadedRecorder()
self.thread.start()
def stop(self):
self.thread.finish = True
self.thread.join()
def file(self):
if self.encode:
tgt = "rec%d.mp3" % time.time()
os.rename(processingDst, tgt)
return tgt
else:
return recFiles[1]
# Audio interface
##########################################################################
_player = queueMplayer
_queueEraser = clearMplayerQueue
def play(path):
_player(path)
def clearAudioQueue():
_queueEraser()
Recorder = PyAudioRecorder

560
oldanki/stats.py Normal file
View File

@ -0,0 +1,560 @@
# -*- coding: utf-8 -*-
# Copyright: Damien Elmes <oldanki@ichi2.net>
# License: GNU GPL, version 3 or later; http://www.gnu.org/copyleft/gpl.html
"""\
Statistical tracking and reports
=================================
"""
__docformat__ = 'restructuredtext'
# we track statistics over the life of the deck, and per-day
STATS_LIFE = 0
STATS_DAY = 1
import unicodedata, time, sys, os, datetime
import oldanki, oldanki.utils
from datetime import date
from oldanki.db import *
from oldanki.lang import _, ngettext
from oldanki.utils import canonifyTags, ids2str
from oldanki.hooks import runFilter
# Tracking stats on the DB
##########################################################################
statsTable = Table(
'stats', metadata,
Column('id', Integer, primary_key=True),
Column('type', Integer, nullable=False),
Column('day', Date, nullable=False),
Column('reps', Integer, nullable=False, default=0),
Column('averageTime', Float, nullable=False, default=0),
Column('reviewTime', Float, nullable=False, default=0),
# next two columns no longer used
Column('distractedTime', Float, nullable=False, default=0),
Column('distractedReps', Integer, nullable=False, default=0),
Column('newEase0', Integer, nullable=False, default=0),
Column('newEase1', Integer, nullable=False, default=0),
Column('newEase2', Integer, nullable=False, default=0),
Column('newEase3', Integer, nullable=False, default=0),
Column('newEase4', Integer, nullable=False, default=0),
Column('youngEase0', Integer, nullable=False, default=0),
Column('youngEase1', Integer, nullable=False, default=0),
Column('youngEase2', Integer, nullable=False, default=0),
Column('youngEase3', Integer, nullable=False, default=0),
Column('youngEase4', Integer, nullable=False, default=0),
Column('matureEase0', Integer, nullable=False, default=0),
Column('matureEase1', Integer, nullable=False, default=0),
Column('matureEase2', Integer, nullable=False, default=0),
Column('matureEase3', Integer, nullable=False, default=0),
Column('matureEase4', Integer, nullable=False, default=0))
class Stats(object):
def __init__(self):
self.day = None
self.reps = 0
self.averageTime = 0
self.reviewTime = 0
self.distractedTime = 0
self.distractedReps = 0
self.newEase0 = 0
self.newEase1 = 0
self.newEase2 = 0
self.newEase3 = 0
self.newEase4 = 0
self.youngEase0 = 0
self.youngEase1 = 0
self.youngEase2 = 0
self.youngEase3 = 0
self.youngEase4 = 0
self.matureEase0 = 0
self.matureEase1 = 0
self.matureEase2 = 0
self.matureEase3 = 0
self.matureEase4 = 0
def fromDB(self, s, id):
r = s.first("select * from stats where id = :id", id=id)
(self.id,
self.type,
self.day,
self.reps,
self.averageTime,
self.reviewTime,
self.distractedTime,
self.distractedReps,
self.newEase0,
self.newEase1,
self.newEase2,
self.newEase3,
self.newEase4,
self.youngEase0,
self.youngEase1,
self.youngEase2,
self.youngEase3,
self.youngEase4,
self.matureEase0,
self.matureEase1,
self.matureEase2,
self.matureEase3,
self.matureEase4) = r
self.day = datetime.date(*[int(i) for i in self.day.split("-")])
def create(self, s, type, day):
self.type = type
self.day = day
s.execute("""insert into stats
(type, day, reps, averageTime, reviewTime, distractedTime, distractedReps,
newEase0, newEase1, newEase2, newEase3, newEase4, youngEase0, youngEase1,
youngEase2, youngEase3, youngEase4, matureEase0, matureEase1, matureEase2,
matureEase3, matureEase4) values (:type, :day, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)""", self.__dict__)
self.id = s.scalar(
"select id from stats where type = :type and day = :day",
type=type, day=day)
def toDB(self, s):
assert self.id
s.execute("""update stats set
type=:type,
day=:day,
reps=:reps,
averageTime=:averageTime,
reviewTime=:reviewTime,
newEase0=:newEase0,
newEase1=:newEase1,
newEase2=:newEase2,
newEase3=:newEase3,
newEase4=:newEase4,
youngEase0=:youngEase0,
youngEase1=:youngEase1,
youngEase2=:youngEase2,
youngEase3=:youngEase3,
youngEase4=:youngEase4,
matureEase0=:matureEase0,
matureEase1=:matureEase1,
matureEase2=:matureEase2,
matureEase3=:matureEase3,
matureEase4=:matureEase4
where id = :id""", self.__dict__)
mapper(Stats, statsTable)
def genToday(deck):
return datetime.datetime.utcfromtimestamp(
time.time() - deck.utcOffset).date()
def updateAllStats(s, gs, ds, card, ease, oldState):
"Update global and daily statistics."
updateStats(s, gs, card, ease, oldState)
updateStats(s, ds, card, ease, oldState)
def updateStats(s, stats, card, ease, oldState):
stats.reps += 1
delay = card.totalTime()
if delay >= 60:
stats.reviewTime += 60
else:
stats.reviewTime += delay
stats.averageTime = (
stats.reviewTime / float(stats.reps))
# update eases
attr = oldState + "Ease%d" % ease
setattr(stats, attr, getattr(stats, attr) + 1)
stats.toDB(s)
def globalStats(deck):
s = deck.s
type = STATS_LIFE
today = genToday(deck)
id = s.scalar("select id from stats where type = :type",
type=type)
stats = Stats()
if id:
stats.fromDB(s, id)
return stats
else:
stats.create(s, type, today)
stats.type = type
return stats
def dailyStats(deck):
s = deck.s
type = STATS_DAY
today = genToday(deck)
id = s.scalar("select id from stats where type = :type and day = :day",
type=type, day=today)
stats = Stats()
if id:
stats.fromDB(s, id)
return stats
else:
stats.create(s, type, today)
return stats
def summarizeStats(stats, pre=""):
"Generate percentages and total counts for STATS. Optionally prefix."
cardTypes = ("new", "young", "mature")
h = {}
# total counts
###############
for type in cardTypes:
# total yes/no for type, eg. gNewYes
h[pre + type.capitalize() + "No"] = (getattr(stats, type + "Ease0") +
getattr(stats, type + "Ease1"))
h[pre + type.capitalize() + "Yes"] = (getattr(stats, type + "Ease2") +
getattr(stats, type + "Ease3") +
getattr(stats, type + "Ease4"))
# total for type, eg. gNewTotal
h[pre + type.capitalize() + "Total"] = (
h[pre + type.capitalize() + "No"] +
h[pre + type.capitalize() + "Yes"])
# total yes/no, eg. gYesTotal
for answer in ("yes", "no"):
num = 0
for type in cardTypes:
num += h[pre + type.capitalize() + answer.capitalize()]
h[pre + answer.capitalize() + "Total"] = num
# total over all, eg. gTotal
num = 0
for type in cardTypes:
num += h[pre + type.capitalize() + "Total"]
h[pre + "Total"] = num
# percentages
##############
for type in cardTypes:
# total yes/no % by type, eg. gNewYes%
for answer in ("yes", "no"):
setPercentage(h, pre + type.capitalize() + answer.capitalize(),
pre + type.capitalize())
for answer in ("yes", "no"):
# total yes/no, eg. gYesTotal%
setPercentage(h, pre + answer.capitalize() + "Total", pre)
h[pre + 'AverageTime'] = stats.averageTime
h[pre + 'ReviewTime'] = stats.reviewTime
return h
def setPercentage(h, a, b):
try:
h[a + "%"] = (h[a] / float(h[b + "Total"])) * 100
except ZeroDivisionError:
h[a + "%"] = 0
def getStats(s, gs, ds):
"Return a handy dictionary exposing a number of internal stats."
h = {}
h.update(summarizeStats(gs, "g"))
h.update(summarizeStats(ds, "d"))
return h
# Card stats
##########################################################################
class CardStats(object):
def __init__(self, deck, card):
self.deck = deck
self.card = card
def report(self):
c = self.card
fmt = oldanki.utils.fmtTimeSpan
fmtFloat = oldanki.utils.fmtFloat
self.txt = "<table>"
self.addLine(_("Added"), self.strTime(c.created))
if c.firstAnswered:
self.addLine(_("First Review"), self.strTime(c.firstAnswered))
self.addLine(_("Changed"), self.strTime(c.modified))
if c.reps:
next = time.time() - c.combinedDue
if next > 0:
next = _("%s ago") % fmt(next)
else:
next = _("in %s") % fmt(abs(next))
self.addLine(_("Due"), next)
self.addLine(_("Interval"), fmt(c.interval * 86400))
self.addLine(_("Ease"), fmtFloat(c.factor, point=2))
if c.lastDue:
last = _("%s ago") % fmt(time.time() - c.lastDue)
self.addLine(_("Last Due"), last)
if c.interval != c.lastInterval:
# don't show the last interval if it hasn't been updated yet
self.addLine(_("Last Interval"), fmt(c.lastInterval * 86400))
self.addLine(_("Last Ease"), fmtFloat(c.lastFactor, point=2))
if c.reps:
self.addLine(_("Reviews"), "%d/%d (s=%d)" % (
c.yesCount, c.reps, c.successive))
avg = fmt(c.averageTime, point=2)
self.addLine(_("Average Time"),avg)
total = fmt(c.reviewTime, point=2)
self.addLine(_("Total Time"), total)
self.addLine(_("Model Tags"), c.fact.model.tags)
self.addLine(_("Card Template") + "&nbsp;"*5, c.cardModel.name)
self.txt += "</table>"
return self.txt
def addLine(self, k, v):
self.txt += "<tr><td><b>%s<b></td><td>%s</td></tr>" % (k, v)
def strTime(self, tm):
s = oldanki.utils.fmtTimeSpan(time.time() - tm)
return _("%s ago") % s
# Deck stats (specific to the 'sched' scheduler)
##########################################################################
class DeckStats(object):
def __init__(self, deck):
self.deck = deck
def report(self):
"Return an HTML string with a report."
fmtPerc = oldanki.utils.fmtPercentage
fmtFloat = oldanki.utils.fmtFloat
if self.deck.isEmpty():
return _("Please add some cards first.") + "<p/>"
d = self.deck
html="<h1>" + _("Deck Statistics") + "</h1>"
html += _("Deck created: <b>%s</b> ago<br>") % self.createdTimeStr()
total = d.cardCount
new = d.newCountAll()
young = d.youngCardCount()
old = d.matureCardCount()
newP = new / float(total) * 100
youngP = young / float(total) * 100
oldP = old / float(total) * 100
stats = d.getStats()
(stats["new"], stats["newP"]) = (new, newP)
(stats["old"], stats["oldP"]) = (old, oldP)
(stats["young"], stats["youngP"]) = (young, youngP)
html += _("Total number of cards:") + " <b>%d</b><br>" % total
html += _("Total number of facts:") + " <b>%d</b><br><br>" % d.factCount
html += "<b>" + _("Card Maturity") + "</b><br>"
html += _("Mature cards: <!--card count-->") + " <b>%(old)d</b> (%(oldP)s)<br>" % {
'old': stats['old'], 'oldP' : fmtPerc(stats['oldP'])}
html += _("Young cards: <!--card count-->") + " <b>%(young)d</b> (%(youngP)s)<br>" % {
'young': stats['young'], 'youngP' : fmtPerc(stats['youngP'])}
html += _("Unseen cards:") + " <b>%(new)d</b> (%(newP)s)<br>" % {
'new': stats['new'], 'newP' : fmtPerc(stats['newP'])}
avgInt = self.getAverageInterval()
if avgInt:
html += _("Average interval: ") + ("<b>%s</b> ") % fmtFloat(avgInt) + _("days")
html += "<br>"
html += "<br>"
html += "<b>" + _("Correct Answers") + "</b><br>"
html += _("Mature cards: <!--correct answers-->") + " <b>" + fmtPerc(stats['gMatureYes%']) + (
"</b> " + _("(%(partOf)d of %(totalSum)d)") % {
'partOf' : stats['gMatureYes'],
'totalSum' : stats['gMatureTotal'] } + "<br>")
html += _("Young cards: <!--correct answers-->") + " <b>" + fmtPerc(stats['gYoungYes%']) + (
"</b> " + _("(%(partOf)d of %(totalSum)d)") % {
'partOf' : stats['gYoungYes'],
'totalSum' : stats['gYoungTotal'] } + "<br>")
html += _("First-seen cards:") + " <b>" + fmtPerc(stats['gNewYes%']) + (
"</b> " + _("(%(partOf)d of %(totalSum)d)") % {
'partOf' : stats['gNewYes'],
'totalSum' : stats['gNewTotal'] } + "<br><br>")
# average pending time
existing = d.cardCount - d.newCountToday
def tr(a, b):
return "<tr><td>%s</td><td align=right>%s</td></tr>" % (a, b)
def repsPerDay(reps,days):
retval = ("<b>%d</b> " % reps) + ngettext("rep", "reps", reps)
retval += ("/<b>%d</b> " % days) + ngettext("day", "days", days)
return retval
if existing and avgInt:
html += "<b>" + _("Recent Work") + "</b>"
if sys.platform.startswith("darwin"):
html += "<table width=250>"
else:
html += "<table width=200>"
html += tr(_("In last week"), repsPerDay(
self.getRepsDone(-7, 0),
self.getDaysReviewed(-7, 0)))
html += tr(_("In last month"), repsPerDay(
self.getRepsDone(-30, 0),
self.getDaysReviewed(-30, 0)))
html += tr(_("In last 3 months"), repsPerDay(
self.getRepsDone(-92, 0),
self.getDaysReviewed(-92, 0)))
html += tr(_("In last 6 months"), repsPerDay(
self.getRepsDone(-182, 0),
self.getDaysReviewed(-182, 0)))
html += tr(_("In last year"), repsPerDay(
self.getRepsDone(-365, 0),
self.getDaysReviewed(-365, 0)))
html += tr(_("Deck life"), repsPerDay(
self.getRepsDone(-13000, 0),
self.getDaysReviewed(-13000, 0)))
html += "</table>"
html += "<br><br><b>" + _("Average Daily Reviews") + "</b>"
if sys.platform.startswith("darwin"):
html += "<table width=250>"
else:
html += "<table width=200>"
html += tr(_("Deck life"), ("<b>%s</b> ") % (
fmtFloat(self.getSumInverseRoundInterval())) + _("cards/day"))
html += tr(_("In next week"), ("<b>%s</b> ") % (
fmtFloat(self.getWorkloadPeriod(7))) + _("cards/day"))
html += tr(_("In next month"), ("<b>%s</b> ") % (
fmtFloat(self.getWorkloadPeriod(30))) + _("cards/day"))
html += tr(_("In last week"), ("<b>%s</b> ") % (
fmtFloat(self.getPastWorkloadPeriod(7))) + _("cards/day"))
html += tr(_("In last month"), ("<b>%s</b> ") % (
fmtFloat(self.getPastWorkloadPeriod(30))) + _("cards/day"))
html += tr(_("In last 3 months"), ("<b>%s</b> ") % (
fmtFloat(self.getPastWorkloadPeriod(92))) + _("cards/day"))
html += tr(_("In last 6 months"), ("<b>%s</b> ") % (
fmtFloat(self.getPastWorkloadPeriod(182))) + _("cards/day"))
html += tr(_("In last year"), ("<b>%s</b> ") % (
fmtFloat(self.getPastWorkloadPeriod(365))) + _("cards/day"))
html += "</table>"
html += "<br><br><b>" + _("Average Added") + "</b>"
if sys.platform.startswith("darwin"):
html += "<table width=250>"
else:
html += "<table width=200>"
html += tr(_("Deck life"), _("<b>%(a)s</b>/day, <b>%(b)s</b>/mon") % {
'a': fmtFloat(self.newAverage()), 'b': fmtFloat(self.newAverage()*30)})
np = self.getNewPeriod(7)
html += tr(_("In last week"), _("<b>%(a)d</b> (<b>%(b)s</b>/day)") % (
{'a': np, 'b': fmtFloat(np / float(7))}))
np = self.getNewPeriod(30)
html += tr(_("In last month"), _("<b>%(a)d</b> (<b>%(b)s</b>/day)") % (
{'a': np, 'b': fmtFloat(np / float(30))}))
np = self.getNewPeriod(92)
html += tr(_("In last 3 months"), _("<b>%(a)d</b> (<b>%(b)s</b>/day)") % (
{'a': np, 'b': fmtFloat(np / float(92))}))
np = self.getNewPeriod(182)
html += tr(_("In last 6 months"), _("<b>%(a)d</b> (<b>%(b)s</b>/day)") % (
{'a': np, 'b': fmtFloat(np / float(182))}))
np = self.getNewPeriod(365)
html += tr(_("In last year"), _("<b>%(a)d</b> (<b>%(b)s</b>/day)") % (
{'a': np, 'b': fmtFloat(np / float(365))}))
html += "</table>"
html += "<br><br><b>" + _("Average New Seen") + "</b>"
if sys.platform.startswith("darwin"):
html += "<table width=250>"
else:
html += "<table width=200>"
np = self.getFirstPeriod(7)
html += tr(_("In last week"), _("<b>%(a)d</b> (<b>%(b)s</b>/day)") % (
{'a': np, 'b': fmtFloat(np / float(7))}))
np = self.getFirstPeriod(30)
html += tr(_("In last month"), _("<b>%(a)d</b> (<b>%(b)s</b>/day)") % (
{'a': np, 'b': fmtFloat(np / float(30))}))
np = self.getFirstPeriod(92)
html += tr(_("In last 3 months"), _("<b>%(a)d</b> (<b>%(b)s</b>/day)") % (
{'a': np, 'b': fmtFloat(np / float(92))}))
np = self.getFirstPeriod(182)
html += tr(_("In last 6 months"), _("<b>%(a)d</b> (<b>%(b)s</b>/day)") % (
{'a': np, 'b': fmtFloat(np / float(182))}))
np = self.getFirstPeriod(365)
html += tr(_("In last year"), _("<b>%(a)d</b> (<b>%(b)s</b>/day)") % (
{'a': np, 'b': fmtFloat(np / float(365))}))
html += "</table>"
html += "<br><br><b>" + _("Card Ease") + "</b><br>"
html += _("Lowest factor: %.2f") % d.s.scalar(
"select min(factor) from cards") + "<br>"
html += _("Average factor: %.2f") % d.s.scalar(
"select avg(factor) from cards") + "<br>"
html += _("Highest factor: %.2f") % d.s.scalar(
"select max(factor) from cards") + "<br>"
html = runFilter("deckStats", html)
return html
def getDaysReviewed(self, start, finish):
now = datetime.datetime.today()
x = now + datetime.timedelta(start)
y = now + datetime.timedelta(finish)
return self.deck.s.scalar(
"select count() from stats where "
"day >= :x and day <= :y and reps > 0",
x=x, y=y)
def getRepsDone(self, start, finish):
now = datetime.datetime.today()
x = time.mktime((now + datetime.timedelta(start)).timetuple())
y = time.mktime((now + datetime.timedelta(finish)).timetuple())
return self.deck.s.scalar(
"select count() from reviewHistory where time >= :x and time <= :y",
x=x, y=y)
def getAverageInterval(self):
return self.deck.s.scalar(
"select sum(interval) / count(interval) from cards "
"where cards.reps > 0") or 0
def intervalReport(self, intervals, labels, total):
boxes = self.splitIntoIntervals(intervals)
keys = boxes.keys()
keys.sort()
html = ""
for key in keys:
html += ("<tr><td align=right>%s</td><td align=right>" +
"%d</td><td align=right>%s</td></tr>") % (
labels[key],
boxes[key],
fmtPerc(boxes[key] / float(total) * 100))
return html
def splitIntoIntervals(self, intervals):
boxes = {}
n = 0
for i in range(len(intervals) - 1):
(min, max) = (intervals[i], intervals[i+1])
for c in self.deck:
if c.interval > min and c.interval <= max:
boxes[n] = boxes.get(n, 0) + 1
n += 1
return boxes
def newAverage(self):
"Average number of new cards added each day."
return self.deck.cardCount / max(1, self.ageInDays())
def createdTimeStr(self):
return oldanki.utils.fmtTimeSpan(time.time() - self.deck.created)
def ageInDays(self):
return (time.time() - self.deck.created) / 86400.0
def getSumInverseRoundInterval(self):
return self.deck.s.scalar(
"select sum(1/round(max(interval, 1)+0.5)) from cards "
"where cards.reps > 0 "
"and priority > 0") or 0
def getWorkloadPeriod(self, period):
cutoff = time.time() + 86400 * period
return (self.deck.s.scalar("""
select count(id) from cards
where combinedDue < :cutoff
and priority > 0 and relativeDelay in (0,1)""", cutoff=cutoff) or 0) / float(period)
def getPastWorkloadPeriod(self, period):
cutoff = time.time() - 86400 * period
return (self.deck.s.scalar("""
select count(*) from reviewHistory
where time > :cutoff""", cutoff=cutoff) or 0) / float(period)
def getNewPeriod(self, period):
cutoff = time.time() - 86400 * period
return (self.deck.s.scalar("""
select count(id) from cards
where created > :cutoff""", cutoff=cutoff) or 0)
def getFirstPeriod(self, period):
cutoff = time.time() - 86400 * period
return (self.deck.s.scalar("""
select count(*) from reviewHistory
where reps = 1 and time > :cutoff""", cutoff=cutoff) or 0)

51
oldanki/stdmodels.py Normal file
View File

@ -0,0 +1,51 @@
# -*- coding: utf-8 -*-
# Copyright: Damien Elmes <oldanki@ichi2.net>
# License: GNU GPL, version 3 or later; http://www.gnu.org/copyleft/gpl.html
"""\
Standard Models.
==============================================================
Plugins can add to the 'models' dict to provide more standard
models.
"""
from oldanki.models import Model, CardModel, FieldModel
from oldanki.lang import _
models = {}
def byName(name):
fn = models.get(name)
if fn:
return fn()
raise ValueError("No such model available!")
def names():
return models.keys()
# Basic
##########################################################################
def BasicModel():
m = Model(_('Basic'))
m.addFieldModel(FieldModel(u'Front', True, True))
m.addFieldModel(FieldModel(u'Back', False, False))
m.addCardModel(CardModel(u'Forward', u'%(Front)s', u'%(Back)s'))
m.addCardModel(CardModel(u'Reverse', u'%(Back)s', u'%(Front)s',
active=False))
m.tags = u"Basic"
return m
models['Basic'] = BasicModel
# Recovery
##########################################################################
def RecoveryModel():
m = Model(_('Recovery'))
m.addFieldModel(FieldModel(u'Question', False, False))
m.addFieldModel(FieldModel(u'Answer', False, False))
m.addCardModel(CardModel(u'Single', u'{{{Question}}}', u'{{{Answer}}}'))
m.tags = u"Recovery"
return m

1236
oldanki/sync.py Normal file

File diff suppressed because it is too large Load Diff

59
oldanki/tags.py Normal file
View File

@ -0,0 +1,59 @@
# -*- coding: utf-8 -*-
# Copyright: Damien Elmes <oldanki@ichi2.net>
# License: GNU GPL, version 3 or later; http://www.gnu.org/copyleft/gpl.html
"""\
Tags
====================
"""
__docformat__ = 'restructuredtext'
from oldanki.db import *
#src 0 = fact
#src 1 = model
#src 2 = card model
# Tables
##########################################################################
def initTagTables(s):
try:
s.statement("""
create table tags (
id integer not null,
tag text not null collate nocase,
priority integer not null default 2,
primary key(id))""")
s.statement("""
create table cardTags (
id integer not null,
cardId integer not null,
tagId integer not null,
src integer not null,
primary key(id))""")
except:
pass
def tagId(s, tag, create=True):
"Return ID for tag, creating if necessary."
id = s.scalar("select id from tags where tag = :tag", tag=tag)
if id or not create:
return id
s.statement("""
insert or ignore into tags
(tag) values (:tag)""", tag=tag)
return s.scalar("select id from tags where tag = :tag", tag=tag)
def tagIds(s, tags, create=True):
"Return an ID for all tags, creating if necessary."
ids = {}
if create:
s.statements("insert or ignore into tags (tag) values (:tag)",
[{'tag': t} for t in tags])
tagsD = dict([(x.lower(), y) for (x, y) in s.all("""
select tag, id from tags
where tag in (%s)""" % ",".join([
"'%s'" % t.replace("'", "''") for t in tags]))])
return tagsD

20
oldanki/template/LICENSE Normal file
View File

@ -0,0 +1,20 @@
Copyright (c) 2009 Chris Wanstrath
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@ -0,0 +1,6 @@
Anki uses a modified version of Pystache to provide Mustache-like syntax.
Behaviour is a little different from standard Mustache:
- {{text}} returns text verbatim with no HTML escaping
- {{{text}}} strips an outer span tag
- partial rendering is disabled for security reasons

View File

@ -0,0 +1,78 @@
========
Pystache
========
Inspired by ctemplate_ and et_, Mustache_ is a
framework-agnostic way to render logic-free views.
As ctemplates says, "It emphasizes separating logic from presentation:
it is impossible to embed application logic in this template language."
Pystache is a Python implementation of Mustache. Pystache requires
Python 2.6.
Documentation
=============
The different Mustache tags are documented at `mustache(5)`_.
Install It
==========
::
pip install pystache
Use It
======
::
>>> import pystache
>>> pystache.render('Hi {{person}}!', {'person': 'Mom'})
'Hi Mom!'
You can also create dedicated view classes to hold your view logic.
Here's your simple.py::
import pystache
class Simple(pystache.View):
def thing(self):
return "pizza"
Then your template, simple.mustache::
Hi {{thing}}!
Pull it together::
>>> Simple().render()
'Hi pizza!'
Test It
=======
nose_ works great! ::
pip install nose
cd pystache
nosetests
Author
======
::
context = { 'author': 'Chris Wanstrath', 'email': 'chris@ozmm.org' }
pystache.render("{{author}} :: {{email}}", context)
.. _ctemplate: http://code.google.com/p/google-ctemplate/
.. _et: http://www.ivan.fomichev.name/2008/05/erlang-template-engine-prototype.html
.. _Mustache: http://defunkt.github.com/mustache/
.. _mustache(5): http://defunkt.github.com/mustache/mustache.5.html
.. _nose: http://somethingaboutorange.com/mrl/projects/nose/0.11.1/testing.html

View File

@ -0,0 +1,7 @@
from oldanki.template.template import Template
from oldanki.template.view import View
def render(template, context=None, **kwargs):
context = context and context.copy() or {}
context.update(kwargs)
return Template(template, context).render()

View File

@ -0,0 +1,156 @@
import re
import cgi
import collections
modifiers = {}
def modifier(symbol):
"""Decorator for associating a function with a Mustache tag modifier.
@modifier('P')
def render_tongue(self, tag_name=None, context=None):
return ":P %s" % tag_name
{{P yo }} => :P yo
"""
def set_modifier(func):
modifiers[symbol] = func
return func
return set_modifier
def get_or_attr(obj, name, default=None):
try:
return obj[name]
except KeyError:
return default
except:
try:
return getattr(obj, name)
except AttributeError:
return default
class Template(object):
# The regular expression used to find a #section
section_re = None
# The regular expression used to find a tag.
tag_re = None
# Opening tag delimiter
otag = '{{'
# Closing tag delimiter
ctag = '}}'
def __init__(self, template, context=None):
self.template = template
self.context = context or {}
self.compile_regexps()
def render(self, template=None, context=None, encoding=None):
"""Turns a Mustache template into something wonderful."""
template = template or self.template
context = context or self.context
template = self.render_sections(template, context)
result = self.render_tags(template, context)
if encoding is not None:
result = result.encode(encoding)
return result
def compile_regexps(self):
"""Compiles our section and tag regular expressions."""
tags = { 'otag': re.escape(self.otag), 'ctag': re.escape(self.ctag) }
section = r"%(otag)s[\#|^]([^\}]*)%(ctag)s(.+?)%(otag)s/\1%(ctag)s"
self.section_re = re.compile(section % tags, re.M|re.S)
tag = r"%(otag)s(#|=|&|!|>|\{)?(.+?)\1?%(ctag)s+"
self.tag_re = re.compile(tag % tags)
def render_sections(self, template, context):
"""Expands sections."""
while 1:
match = self.section_re.search(template)
if match is None:
break
section, section_name, inner = match.group(0, 1, 2)
section_name = section_name.strip()
it = get_or_attr(context, section_name, None)
replacer = ''
# if it and isinstance(it, collections.Callable):
# replacer = it(inner)
if it and not hasattr(it, '__iter__'):
if section[2] != '^':
replacer = inner
elif it and hasattr(it, 'keys') and hasattr(it, '__getitem__'):
if section[2] != '^':
replacer = self.render(inner, it)
elif it:
insides = []
for item in it:
insides.append(self.render(inner, item))
replacer = ''.join(insides)
elif not it and section[2] == '^':
replacer = inner
template = template.replace(section, replacer)
return template
def render_tags(self, template, context):
"""Renders all the tags in a template for a context."""
while 1:
match = self.tag_re.search(template)
if match is None:
break
tag, tag_type, tag_name = match.group(0, 1, 2)
tag_name = tag_name.strip()
try:
func = modifiers[tag_type]
replacement = func(self, tag_name, context)
template = template.replace(tag, replacement)
except:
return u"{{invalid template}}"
return template
@modifier('{')
def render_tag(self, tag_name, context):
"""Given a tag name and context, finds, escapes, and renders the tag."""
raw = get_or_attr(context, tag_name, '')
if not raw and raw is not 0:
return ''
return re.sub("^<span.+?>(.*)</span>", "\\1", raw)
@modifier('!')
def render_comment(self, tag_name=None, context=None):
"""Rendering a comment always returns nothing."""
return ''
@modifier(None)
def render_unescaped(self, tag_name=None, context=None):
"""Render a tag without escaping it."""
return unicode(get_or_attr(context, tag_name, '{unknown field %s}' % tag_name))
# @modifier('>')
# def render_partial(self, tag_name=None, context=None):
# """Renders a partial within the current context."""
# # Import view here to avoid import loop
# from pystache.view import View
# view = View(context=context)
# view.template_name = tag_name
# return view.render()
@modifier('=')
def render_delimiter(self, tag_name=None, context=None):
"""Changes the Mustache delimiter."""
self.otag, self.ctag = tag_name.split(' ')
self.compile_regexps()
return ''

116
oldanki/template/view.py Normal file
View File

@ -0,0 +1,116 @@
from oldanki.template import Template
import os.path
import re
class View(object):
# Path where this view's template(s) live
template_path = '.'
# Extension for templates
template_extension = 'mustache'
# The name of this template. If none is given the View will try
# to infer it based on the class name.
template_name = None
# Absolute path to the template itself. Pystache will try to guess
# if it's not provided.
template_file = None
# Contents of the template.
template = None
# Character encoding of the template file. If None, Pystache will not
# do any decoding of the template.
template_encoding = None
def __init__(self, template=None, context=None, **kwargs):
self.template = template
self.context = context or {}
# If the context we're handed is a View, we want to inherit
# its settings.
if isinstance(context, View):
self.inherit_settings(context)
if kwargs:
self.context.update(kwargs)
def inherit_settings(self, view):
"""Given another View, copies its settings."""
if view.template_path:
self.template_path = view.template_path
if view.template_name:
self.template_name = view.template_name
def load_template(self):
if self.template:
return self.template
if self.template_file:
return self._load_template()
name = self.get_template_name() + '.' + self.template_extension
if isinstance(self.template_path, basestring):
self.template_file = os.path.join(self.template_path, name)
return self._load_template()
for path in self.template_path:
self.template_file = os.path.join(path, name)
if os.path.exists(self.template_file):
return self._load_template()
raise IOError('"%s" not found in "%s"' % (name, ':'.join(self.template_path),))
def _load_template(self):
f = open(self.template_file, 'r')
try:
template = f.read()
if self.template_encoding:
template = unicode(template, self.template_encoding)
finally:
f.close()
return template
def get_template_name(self, name=None):
"""TemplatePartial => template_partial
Takes a string but defaults to using the current class' name or
the `template_name` attribute
"""
if self.template_name:
return self.template_name
if not name:
name = self.__class__.__name__
def repl(match):
return '_' + match.group(0).lower()
return re.sub('[A-Z]', repl, name)[1:]
def __contains__(self, needle):
return needle in self.context or hasattr(self, needle)
def __getitem__(self, attr):
val = self.get(attr, None)
if not val:
raise KeyError("No such key.")
return val
def get(self, attr, default):
attr = self.context.get(attr, getattr(self, attr, default))
if hasattr(attr, '__call__'):
return attr()
else:
return attr
def render(self, encoding=None):
template = self.load_template()
return Template(template, self).render(encoding=encoding)
def __str__(self):
return self.render()

297
oldanki/utils.py Normal file
View File

@ -0,0 +1,297 @@
# -*- coding: utf-8 -*-
# Copyright: Damien Elmes <oldanki@ichi2.net>
# License: GNU GPL, version 3 or later; http://www.gnu.org/copyleft/gpl.html
"""\
Miscellaneous utilities
==============================
"""
__docformat__ = 'restructuredtext'
import re, os, random, time, types, math, htmlentitydefs, subprocess
try:
import hashlib
md5 = hashlib.md5
except ImportError:
import md5
md5 = md5.new
from oldanki.db import *
from oldanki.lang import _, ngettext
import locale, sys
if sys.version_info[1] < 5:
def format_string(a, b):
return a % b
locale.format_string = format_string
# Time handling
##############################################################################
timeTable = {
"years": lambda n: ngettext("%s year", "%s years", n),
"months": lambda n: ngettext("%s month", "%s months", n),
"days": lambda n: ngettext("%s day", "%s days", n),
"hours": lambda n: ngettext("%s hour", "%s hours", n),
"minutes": lambda n: ngettext("%s minute", "%s minutes", n),
"seconds": lambda n: ngettext("%s second", "%s seconds", n),
}
afterTimeTable = {
"years": lambda n: ngettext("%s year<!--after-->", "%s years<!--after-->", n),
"months": lambda n: ngettext("%s month<!--after-->", "%s months<!--after-->", n),
"days": lambda n: ngettext("%s day<!--after-->", "%s days<!--after-->", n),
"hours": lambda n: ngettext("%s hour<!--after-->", "%s hours<!--after-->", n),
"minutes": lambda n: ngettext("%s minute<!--after-->", "%s minutes<!--after-->", n),
"seconds": lambda n: ngettext("%s second<!--after-->", "%s seconds<!--after-->", n),
}
shortTimeTable = {
"years": _("%sy"),
"months": _("%sm"),
"days": _("%sd"),
"hours": _("%sh"),
"minutes": _("%sm"),
"seconds": _("%ss"),
}
def fmtTimeSpan(time, pad=0, point=0, short=False, after=False):
"Return a string representing a time span (eg '2 days')."
(type, point) = optimalPeriod(time, point)
time = convertSecondsTo(time, type)
if not point:
time = math.floor(time)
if short:
fmt = shortTimeTable[type]
else:
if after:
fmt = afterTimeTable[type](_pluralCount(time, point))
else:
fmt = timeTable[type](_pluralCount(time, point))
timestr = "%(a)d.%(b)df" % {'a': pad, 'b': point}
return locale.format_string("%" + (fmt % timestr), time)
def optimalPeriod(time, point):
if abs(time) < 60:
type = "seconds"
point -= 1
elif abs(time) < 3599:
type = "minutes"
elif abs(time) < 60 * 60 * 24:
type = "hours"
elif abs(time) < 60 * 60 * 24 * 30:
type = "days"
elif abs(time) < 60 * 60 * 24 * 365:
type = "months"
point += 1
else:
type = "years"
point += 1
return (type, max(point, 0))
def convertSecondsTo(seconds, type):
if type == "seconds":
return seconds
elif type == "minutes":
return seconds / 60.0
elif type == "hours":
return seconds / 3600.0
elif type == "days":
return seconds / 86400.0
elif type == "months":
return seconds / 2592000.0
elif type == "years":
return seconds / 31536000.0
assert False
def _pluralCount(time, point):
if point:
return 2
return math.floor(time)
# Locale
##############################################################################
def fmtPercentage(float_value, point=1):
"Return float with percentage sign"
fmt = '%' + "0.%(b)df" % {'b': point}
return locale.format_string(fmt, float_value) + "%"
def fmtFloat(float_value, point=1):
"Return a string with decimal separator according to current locale"
fmt = '%' + "0.%(b)df" % {'b': point}
return locale.format_string(fmt, float_value)
# HTML
##############################################################################
def stripHTML(s):
s = re.sub("(?s)<style.*?>.*?</style>", "", s)
s = re.sub("(?s)<script.*?>.*?</script>", "", s)
s = re.sub("<.*?>", "", s)
s = entsToTxt(s)
return s
def stripHTMLAlt(s):
"Strip HTML, preserving img alt text."
s = re.sub("<img [^>]*alt=[\"']?([^\"'>]+)[\"']?[^>]*>", "\\1", s)
return stripHTML(s)
def stripHTMLMedia(s):
"Strip HTML but keep media filenames"
s = re.sub("<img src=[\"']?([^\"'>]+)[\"']? ?/?>", " \\1 ", s)
return stripHTML(s)
def tidyHTML(html):
"Remove cruft like body tags and return just the important part."
# contents of body - no head or html tags
html = re.sub(u".*<body.*?>(.*)</body></html>",
"\\1", html.replace("\n", u""))
# strip superfluous Qt formatting
html = re.sub(u"(?:-qt-table-type: root; )?"
"margin-top:\d+px; margin-bottom:\d+px; margin-left:\d+px; "
"margin-right:\d+px;(?: -qt-block-indent:0; "
"text-indent:0px;)?", u"", html)
html = re.sub(u"-qt-paragraph-type:empty;", u"", html)
# strip leading space in style statements, and remove if no contents
html = re.sub(u'style=" ', u'style="', html)
html = re.sub(u' style=""', u"", html)
# convert P tags into SPAN and/or BR
html = re.sub(u'<p( style=.+?)>(.*?)</p>', u'<span\\1>\\2</span><br>', html)
html = re.sub(u'<p>(.*?)</p>', u'\\1<br>', html)
html = re.sub(u'<br>$', u'', html)
html = re.sub(u"^<table><tr><td style=\"border: none;\">(.*)<br></td></tr></table>$", u"\\1", html)
# this is being added by qt's html editor, and leads to unwanted spaces
html = re.sub(u"^<p dir='rtl'>(.*?)</p>$", u'\\1', html)
html = re.sub(u"^<br />$", "", html)
return html
def entsToTxt(html):
def fixup(m):
text = m.group(0)
if text[:2] == "&#":
# character reference
try:
if text[:3] == "&#x":
return unichr(int(text[3:-1], 16))
else:
return unichr(int(text[2:-1]))
except ValueError:
pass
else:
# named entity
try:
text = unichr(htmlentitydefs.name2codepoint[text[1:-1]])
except KeyError:
pass
return text # leave as is
return re.sub("&#?\w+;", fixup, html)
# IDs
##############################################################################
def genID(static=[]):
"Generate a random, unique 64bit ID."
# 23 bits of randomness, 41 bits of current time
# random rather than a counter to ensure efficient btree
t = long(time.time()*1000)
if not static:
static.extend([t, {}])
else:
if static[0] != t:
static[0] = t
static[1] = {}
while 1:
rand = random.getrandbits(23)
if rand not in static[1]:
static[1][rand] = True
break
x = rand << 41 | t
# turn into a signed long
if x >= 9223372036854775808L:
x -= 18446744073709551616L
return x
def hexifyID(id):
if id < 0:
id += 18446744073709551616L
return "%x" % id
def dehexifyID(id):
id = int(id, 16)
if id >= 9223372036854775808L:
id -= 18446744073709551616L
return id
def ids2str(ids):
"""Given a list of integers, return a string '(int1,int2,.)'
The caller is responsible for ensuring only integers are provided.
This is safe if you use sqlite primary key columns, which are guaranteed
to be integers."""
return "(%s)" % ",".join([str(i) for i in ids])
# Tags
##############################################################################
def parseTags(tags):
"Parse a string and return a list of tags."
tags = re.split(" |, ?", tags)
return [t.strip() for t in tags if t.strip()]
def joinTags(tags):
return u" ".join(tags)
def canonifyTags(tags):
"Strip leading/trailing/superfluous commas and duplicates."
tags = [t.lstrip(":") for t in set(parseTags(tags))]
return joinTags(sorted(tags))
def findTag(tag, tags):
"True if TAG is in TAGS. Ignore case."
if not isinstance(tags, types.ListType):
tags = parseTags(tags)
return tag.lower() in [t.lower() for t in tags]
def addTags(tagstr, tags):
"Add tags if they don't exist."
currentTags = parseTags(tags)
for tag in parseTags(tagstr):
if not findTag(tag, currentTags):
currentTags.append(tag)
return joinTags(currentTags)
def deleteTags(tagstr, tags):
"Delete tags if they don't exists."
currentTags = parseTags(tags)
for tag in parseTags(tagstr):
try:
currentTags.remove(tag)
except ValueError:
pass
return joinTags(currentTags)
# Misc
##############################################################################
def checksum(data):
return md5(data).hexdigest()
def call(argv, wait=True, **kwargs):
try:
o = subprocess.Popen(argv, **kwargs)
except OSError:
# command not found
return -1
if wait:
while 1:
try:
ret = o.wait()
except OSError:
# interrupted system call
continue
break
else:
ret = 0
return ret

Binary file not shown.

Binary file not shown.

View File

@ -17,7 +17,8 @@ def test_anki2():
# get the deck to import # get the deck to import
tmp = getUpgradeDeckPath() tmp = getUpgradeDeckPath()
u = Upgrader() u = Upgrader()
src = u.upgrade(tmp) u.check(tmp)
src = u.upgrade()
srcpath = src.path srcpath = src.path
srcNotes = src.noteCount() srcNotes = src.noteCount()
srcCards = src.cardCount() srcCards = src.cardCount()

View File

@ -10,16 +10,21 @@ from anki.utils import checksum
def test_check(): def test_check():
dst = getUpgradeDeckPath() dst = getUpgradeDeckPath()
u = Upgrader() u = Upgrader()
assert u.check(dst) assert u.check(dst) == "ok"
# if it's corrupted, will fail # if it's corrupted, will fail
open(dst, "w+").write("foo") open(dst, "w+").write("foo")
assert not u.check(dst) assert u.check(dst) == "invalid"
# the upgrade should be able to fix non-fatal errors -
# test with a deck that has cards with missing notes
dst = getUpgradeDeckPath("anki12-broken.anki")
assert "with missing fact" in u.check(dst)
def test_upgrade1(): def test_upgrade1():
dst = getUpgradeDeckPath() dst = getUpgradeDeckPath()
csum = checksum(open(dst).read()) csum = checksum(open(dst).read())
u = Upgrader() u = Upgrader()
deck = u.upgrade(dst) u.check(dst)
deck = u.upgrade()
# src file must not have changed # src file must not have changed
assert csum == checksum(open(dst).read()) assert csum == checksum(open(dst).read())
# creation time should have been adjusted # creation time should have been adjusted
@ -45,14 +50,15 @@ def test_upgrade1():
def test_upgrade1_due(): def test_upgrade1_due():
dst = getUpgradeDeckPath("anki12-due.anki") dst = getUpgradeDeckPath("anki12-due.anki")
u = Upgrader() u = Upgrader()
deck = u.upgrade(dst) u.check(dst)
deck = u.upgrade()
assert not deck.db.scalar("select 1 from cards where due != 1") assert not deck.db.scalar("select 1 from cards where due != 1")
def test_invalid_ords(): def test_invalid_ords():
dst = getUpgradeDeckPath("invalid-ords.anki") dst = getUpgradeDeckPath("invalid-ords.anki")
u = Upgrader() u = Upgrader()
u.check(dst) u.check(dst)
deck = u.upgrade(dst) deck = u.upgrade()
assert deck.db.scalar("select count() from cards where ord = 0") == 1 assert deck.db.scalar("select count() from cards where ord = 0") == 1
assert deck.db.scalar("select count() from cards where ord = 1") == 1 assert deck.db.scalar("select count() from cards where ord = 1") == 1