diff --git a/pylib/anki/sched.py b/pylib/anki/sched.py
index 75bcf968b..f9ad8006a 100644
--- a/pylib/anki/sched.py
+++ b/pylib/anki/sched.py
@@ -9,14 +9,12 @@ import time
import weakref
from heapq import *
from operator import itemgetter
-from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
+from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import anki
from anki import hooks
from anki.cards import Card
from anki.consts import *
-from anki.lang import _
-from anki.rsbackend import FormatTimeSpanContext
from anki.schedv2 import Scheduler as V2
from anki.utils import ids2str, intTime
@@ -45,28 +43,6 @@ class Scheduler(V2):
self._haveQueues = False
self._updateCutoff()
- def getCard(self) -> Optional[Card]:
- "Pop the next card from the queue. None if finished."
- self._checkDay()
- if not self._haveQueues:
- self.reset()
- card = self._getCard()
- if card:
- self.col.log(card)
- if not self._burySiblingsOnAnswer:
- self._burySiblings(card)
- self.reps += 1
- card.startTimer()
- return card
- return None
-
- def reset(self) -> None:
- self._updateCutoff()
- self._resetLrn()
- self._resetRev()
- self._resetNew()
- self._haveQueues = True
-
def answerCard(self, card: Card, ease: int) -> None:
self.col.log()
assert 1 <= ease <= 4
@@ -118,29 +94,6 @@ class Scheduler(V2):
new, lrn, rev = counts
return (new, lrn, rev)
- def dueForecast(self, days: int = 7) -> List[Any]:
- "Return counts over next DAYS. Includes today."
- daysd = dict(
- self.col.db.all(
- f"""
-select due, count() from cards
-where did in %s and queue = {QUEUE_TYPE_REV}
-and due between ? and ?
-group by due
-order by due"""
- % self._deckLimit(),
- self.today,
- self.today + days - 1,
- )
- )
- for d in range(days):
- d = self.today + d
- if d not in daysd:
- daysd[d] = 0
- # return in sorted order
- ret = [x[1] for x in sorted(daysd.items())]
- return ret
-
def countIdx(self, card: Card) -> int:
if card.queue == QUEUE_TYPE_DAY_LEARN_RELEARN:
return QUEUE_TYPE_LRN
@@ -187,64 +140,6 @@ order by due"""
self.col.usn(),
)
- # Rev/lrn/time daily stats
- ##########################################################################
-
- def _updateStats(self, card: Card, type: str, cnt: int = 1) -> None:
- key = type + "Today"
- for g in [self.col.decks.get(card.did)] + self.col.decks.parents(card.did):
- # add
- g[key][1] += cnt
- self.col.decks.save(g)
-
- def extendLimits(self, new: int, rev: int) -> None:
- cur = self.col.decks.current()
- parents = self.col.decks.parents(cur["id"])
- children = [
- self.col.decks.get(did)
- for (name, did) in self.col.decks.children(cur["id"])
- ]
- for g in [cur] + parents + children:
- # add
- g["newToday"][1] -= new
- g["revToday"][1] -= rev
- self.col.decks.save(g)
-
- def _walkingCount(
- self,
- limFn: Optional[Callable[[Any], Optional[int]]] = None,
- cntFn: Optional[Callable[[int, int], int]] = None,
- ) -> int:
- tot = 0
- pcounts: Dict[int, int] = {}
- # for each of the active decks
- nameMap = self.col.decks.nameMap()
- for did in self.col.decks.active():
- # early alphas were setting the active ids as a str
- did = int(did)
- # get the individual deck's limit
- lim = limFn(self.col.decks.get(did))
- if not lim:
- continue
- # check the parents
- parents = self.col.decks.parents(did, nameMap)
- for p in parents:
- # add if missing
- if p["id"] not in pcounts:
- pcounts[p["id"]] = limFn(p)
- # take minimum of child and parent
- lim = min(pcounts[p["id"]], lim)
- # see how many cards we actually have
- cnt = cntFn(did, lim)
- # if non-zero, decrement from parent counts
- for p in parents:
- pcounts[p["id"]] -= cnt
- # we may also be a parent
- pcounts[did] = lim - cnt
- # and add to running total
- tot += cnt
- return tot
-
# Deck list
##########################################################################
@@ -284,18 +179,6 @@ order by due"""
lims[deck["name"]] = [nlim, rlim]
return data
- def deckDueTree(self) -> Any:
- return self._groupChildren(self.deckDueList())
-
- def _groupChildren(self, grps: List[List[Any]]) -> Any:
- # first, split the group names into components
- for g in grps:
- g[0] = g[0].split("::")
- # and sort based on those components
- grps.sort(key=itemgetter(0))
- # then run main function
- return self._groupChildrenMain(grps)
-
def _groupChildrenMain(self, grps: List[List[Any]]) -> Any:
tree = []
# group and recurse
@@ -367,108 +250,6 @@ order by due"""
# New cards
##########################################################################
- def _resetNewCount(self) -> None:
- cntFn = lambda did, lim: self.col.db.scalar(
- f"""
-select count() from (select 1 from cards where
-did = ? and queue = {QUEUE_TYPE_NEW} limit ?)""",
- did,
- lim,
- )
- self.newCount = self._walkingCount(self._deckNewLimitSingle, cntFn)
-
- def _resetNew(self) -> None:
- self._resetNewCount()
- self._newDids = self.col.decks.active()[:]
- self._newQueue: List[Any] = []
- self._updateNewCardRatio()
-
- def _fillNew(self) -> Optional[bool]:
- if self._newQueue:
- return True
- if not self.newCount:
- return False
- while self._newDids:
- did = self._newDids[0]
- lim = min(self.queueLimit, self._deckNewLimit(did))
- if lim:
- # fill the queue with the current did
- self._newQueue = self.col.db.list(
- f"""
- select id from cards where did = ? and queue = {QUEUE_TYPE_NEW} order by due,ord limit ?""",
- did,
- lim,
- )
- if self._newQueue:
- self._newQueue.reverse()
- return True
- # nothing left in the deck; move to next
- self._newDids.pop(0)
- if self.newCount:
- # if we didn't get a card but the count is non-zero,
- # we need to check again for any cards that were
- # removed from the queue but not buried
- self._resetNew()
- return self._fillNew()
- return None
-
- def _getNewCard(self) -> Optional[Card]:
- if self._fillNew():
- self.newCount -= 1
- return self.col.getCard(self._newQueue.pop())
- return None
-
- def _updateNewCardRatio(self) -> None:
- if self.col.conf["newSpread"] == NEW_CARDS_DISTRIBUTE:
- if self.newCount:
- self.newCardModulus = (self.newCount + self.revCount) // self.newCount
- # if there are cards to review, ensure modulo >= 2
- if self.revCount:
- self.newCardModulus = max(2, self.newCardModulus)
- return
- self.newCardModulus = 0
-
- def _timeForNewCard(self) -> Optional[bool]:
- "True if it's time to display a new card when distributing."
- if not self.newCount:
- return False
- if self.col.conf["newSpread"] == NEW_CARDS_LAST:
- return False
- elif self.col.conf["newSpread"] == NEW_CARDS_FIRST:
- return True
- elif self.newCardModulus:
- return self.reps != 0 and self.reps % self.newCardModulus == 0
- return None
-
- def _deckNewLimit(
- self, did: int, fn: Optional[Callable[[Dict[str, Any]], int]] = None
- ) -> int:
- if not fn:
- fn = self._deckNewLimitSingle
- sel = self.col.decks.get(did)
- lim = -1
- # for the deck and each of its parents
- for g in [sel] + self.col.decks.parents(did):
- rem = fn(g)
- if lim == -1:
- lim = rem
- else:
- lim = min(rem, lim)
- return lim
-
- def _newForDeck(self, did: int, lim: int) -> int:
- "New count for a single deck."
- if not lim:
- return 0
- lim = min(lim, self.reportLimit)
- return self.col.db.scalar(
- f"""
-select count() from
-(select 1 from cards where did = ? and queue = {QUEUE_TYPE_NEW} limit ?)""",
- did,
- lim,
- )
-
def _deckNewLimitSingle(self, g: Dict[str, Any]) -> int:
"Limit for deck without parent limits."
if g["dyn"]:
@@ -545,42 +326,6 @@ limit %d"""
return card
return None
- # daily learning
- def _fillLrnDay(self) -> Optional[bool]:
- if not self.lrnCount:
- return False
- if self._lrnDayQueue:
- return True
- while self._lrnDids:
- did = self._lrnDids[0]
- # fill the queue with the current did
- self._lrnDayQueue = self.col.db.list(
- f"""
-select id from cards where
-did = ? and queue = {QUEUE_TYPE_DAY_LEARN_RELEARN} and due <= ? limit ?""",
- did,
- self.today,
- self.queueLimit,
- )
- if self._lrnDayQueue:
- # order
- r = random.Random()
- r.seed(self.today)
- r.shuffle(self._lrnDayQueue)
- # is the current did empty?
- if len(self._lrnDayQueue) < self.queueLimit:
- self._lrnDids.pop(0)
- return True
- # nothing left in the deck; move to next
- self._lrnDids.pop(0)
- return None
-
- def _getLrnDayCard(self) -> Optional[Card]:
- if self._fillLrnDay():
- self.lrnCount -= 1
- return self.col.getCard(self._lrnDayQueue.pop())
- return None
-
def _answerLrnCard(self, card: Card, ease: int) -> None:
# ease 1=no, 2=yes, 3=remove
conf = self._lrnConf(card)
@@ -643,18 +388,6 @@ did = ? and queue = {QUEUE_TYPE_DAY_LEARN_RELEARN} and due <= ? limit ?""",
card.queue = QUEUE_TYPE_DAY_LEARN_RELEARN
self._logLrn(card, ease, conf, leaving, type, lastLeft)
- def _delayForGrade(self, conf: Dict[str, Any], left: int) -> float:
- left = left % 1000
- try:
- delay = conf["delays"][-left]
- except IndexError:
- if conf["delays"]:
- delay = conf["delays"][0]
- else:
- # user deleted final step; use dummy value
- delay = 1
- return delay * 60
-
def _lrnConf(self, card: Card) -> Dict[str, Any]:
if card.type == CARD_TYPE_REV:
return self._lapseConf(card)
@@ -900,12 +633,6 @@ did = ? and queue = {QUEUE_TYPE_REV} and due <= ? limit ?""",
return None
- def _getRevCard(self) -> Optional[Card]:
- if self._fillRev():
- self.revCount -= 1
- return self.col.getCard(self._revQueue.pop())
- return None
-
def totalRevForCurrentDeck(self) -> int:
return self.col.db.scalar(
f"""
@@ -1025,35 +752,11 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l
# interval capped?
return min(interval, conf["maxIvl"])
- def _fuzzedIvl(self, ivl: int) -> int:
- min, max = self._fuzzIvlRange(ivl)
- return random.randint(min, max)
-
- def _fuzzIvlRange(self, ivl: int) -> List[int]:
- if ivl < 2:
- return [1, 1]
- elif ivl == 2:
- return [2, 3]
- elif ivl < 7:
- fuzz = int(ivl * 0.25)
- elif ivl < 30:
- fuzz = max(2, int(ivl * 0.15))
- else:
- fuzz = max(4, int(ivl * 0.05))
- # fuzz at least a day
- fuzz = max(fuzz, 1)
- return [ivl - fuzz, ivl + fuzz]
-
def _constrainedIvl(self, ivl: float, conf: Dict[str, Any], prev: int) -> int: # type: ignore[override]
"Integer interval after interval factor and prev+1 constraints applied."
new = ivl * conf.get("ivlFct", 1)
return int(max(new, prev + 1))
- def _daysLate(self, card: Card) -> int:
- "Number of days later than scheduled."
- due = card.odue if card.odid else card.due
- return max(0, self.today - due)
-
def _updateRevIvl(self, card: Card, ease: int) -> None:
idealIvl = self._nextRevIvl(card, ease)
card.ivl = min(
@@ -1113,9 +816,6 @@ due = odue, odue = 0, odid = 0, usn = ? where %s"""
self.col.usn(),
)
- def remFromDyn(self, cids: List[int]) -> None:
- self.emptyDyn(None, "id in %s and odid" % ids2str(cids))
-
def _dynOrder(self, o: int, l: int) -> str:
if o == DYN_OLDEST:
t = "(select max(id) from revlog where cid=c.id)"
@@ -1251,17 +951,6 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?"""
resched=conf["resched"],
)
- def _revConf(self, card: Card) -> Dict[str, Any]:
- conf = self._cardConf(card)
- # normal deck
- if not card.odid:
- return conf["rev"]
- # dynamic deck
- return self.col.decks.confForDid(card.odid)["rev"]
-
- def _deckLimit(self) -> str:
- return ids2str(self.col.decks.active())
-
def _resched(self, card: Card) -> bool:
conf = self._cardConf(card)
if not conf["dyn"]:
@@ -1294,87 +983,9 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?"""
if unburied < self.today:
self.unburyCards()
- def _checkDay(self) -> None:
- # check if the day has rolled over
- if time.time() > self.dayCutoff:
- self.reset()
-
# Deck finished state
##########################################################################
- def finishedMsg(self) -> str:
- return (
- ""
- + _("Congratulations! You have finished this deck for now.")
- + "
"
- + self._nextDueMsg()
- )
-
- def _nextDueMsg(self) -> str:
- line = []
- # the new line replacements are so we don't break translations
- # in a point release
- if self.revDue():
- line.append(
- _(
- """\
-Today's review limit has been reached, but there are still cards
-waiting to be reviewed. For optimum memory, consider increasing
-the daily limit in the options."""
- ).replace("\n", " ")
- )
- if self.newDue():
- line.append(
- _(
- """\
-There are more new cards available, but the daily limit has been
-reached. You can increase the limit in the options, but please
-bear in mind that the more new cards you introduce, the higher
-your short-term review workload will become."""
- ).replace("\n", " ")
- )
- if self.haveBuried():
- if self.haveCustomStudy:
- now = " " + _("To see them now, click the Unbury button below.")
- else:
- now = ""
- line.append(
- _(
- """\
-Some related or buried cards were delayed until a later session."""
- )
- + now
- )
- if self.haveCustomStudy and not self.col.decks.current()["dyn"]:
- line.append(
- _(
- """\
-To study outside of the normal schedule, click the Custom Study button below."""
- )
- )
- return "
".join(line) - - def revDue(self) -> Optional[int]: - "True if there are any rev cards due." - return self.col.db.scalar( - ( - f"select 1 from cards where did in %s and queue = {QUEUE_TYPE_REV} " - "and due <= ? limit 1" - ) - % self._deckLimit(), - self.today, - ) - - def newDue(self) -> Optional[int]: - "True if there are any new cards due." - return self.col.db.scalar( - ( - f"select 1 from cards where did in %s and queue = {QUEUE_TYPE_NEW} " - "limit 1" - ) - % self._deckLimit() - ) - def haveBuried(self) -> bool: sdids = ids2str(self.col.decks.active()) cnt = self.col.db.scalar( @@ -1386,18 +997,6 @@ To study outside of the normal schedule, click the Custom Study button below.""" # Next time reports ########################################################################## - def nextIvlStr(self, card: Card, ease: int, short: bool = False) -> str: - "Return the next interval for CARD as a string." - ivl_secs = self.nextIvl(card, ease) - if not ivl_secs: - return _("(end)") - s = self.col.backend.format_time_span( - ivl_secs, FormatTimeSpanContext.ANSWER_BUTTONS - ) - if ivl_secs < self.col.conf["collapseTime"]: - s = "<" + s - return s - def nextIvl(self, card: Card, ease: int) -> float: "Return the next interval for CARD, in seconds." if card.queue in (QUEUE_TYPE_NEW, QUEUE_TYPE_LRN, QUEUE_TYPE_DAY_LEARN_RELEARN): @@ -1472,13 +1071,6 @@ update cards set queue={QUEUE_TYPE_SIBLING_BURIED},mod=?,usn=? where id in """ self.col.usn(), ) - def buryNote(self, nid: int) -> None: - "Bury all cards for note until next session." - cids = self.col.db.list( - "select id from cards where nid = ? and queue >= 0", nid - ) - self.buryCards(cids) - # Sibling spacing ########################################################################## @@ -1522,149 +1114,3 @@ and (queue={QUEUE_TYPE_NEW} or (queue={QUEUE_TYPE_REV} and due<=?))""", self.col.usn(), ) self.col.log(toBury) - - # Resetting - ########################################################################## - - def forgetCards(self, ids: List[int]) -> None: - "Put cards at the end of the new queue." - self.remFromDyn(ids) - self.col.db.execute( - f"update cards set type={CARD_TYPE_NEW},queue={QUEUE_TYPE_NEW},ivl=0,due=0,odue=0,factor=?" - " where id in " + ids2str(ids), - STARTING_FACTOR, - ) - pmax = ( - self.col.db.scalar(f"select max(due) from cards where type={CARD_TYPE_NEW}") - or 0 - ) - # takes care of mod + usn - self.sortCards(ids, start=pmax + 1) - self.col.log(ids) - - def reschedCards(self, ids: List[int], imin: int, imax: int) -> None: - "Put cards in review queue with a new interval in days (min, max)." - d = [] - t = self.today - mod = intTime() - for id in ids: - r = random.randint(imin, imax) - d.append( - dict( - id=id, - due=r + t, - ivl=max(1, r), - mod=mod, - usn=self.col.usn(), - fact=STARTING_FACTOR, - ) - ) - self.remFromDyn(ids) - self.col.db.executemany( - f""" -update cards set type={CARD_TYPE_REV},queue={QUEUE_TYPE_REV},ivl=:ivl,due=:due,odue=0, -usn=:usn,mod=:mod,factor=:fact where id=:id""", - d, - ) - self.col.log(ids) - - def resetCards(self, ids: List[int]) -> None: - "Completely reset cards for export." - sids = ids2str(ids) - # we want to avoid resetting due number of existing new cards on export - nonNew = self.col.db.list( - f"select id from cards where id in %s and (queue != {QUEUE_TYPE_NEW} or type != {CARD_TYPE_NEW})" - % sids - ) - # reset all cards - self.col.db.execute( - f"update cards set reps=0,lapses=0,odid=0,odue=0,queue={QUEUE_TYPE_NEW}" - " where id in %s" % sids - ) - # and forget any non-new cards, changing their due numbers - self.forgetCards(nonNew) - self.col.log(ids) - - # Repositioning new cards - ########################################################################## - - def sortCards( - self, - cids: List[int], - start: int = 1, - step: int = 1, - shuffle: bool = False, - shift: bool = False, - ) -> None: - scids = ids2str(cids) - now = intTime() - nids = [] - nidsSet: Set[int] = set() - for id in cids: - nid = self.col.db.scalar("select nid from cards where id = ?", id) - if nid not in nidsSet: - nids.append(nid) - nidsSet.add(nid) - if not nids: - # no new cards - return - # determine nid ordering - due = {} - if shuffle: - random.shuffle(nids) - for c, nid in enumerate(nids): - due[nid] = start + c * step - # pylint: disable=undefined-loop-variable - high = start + c * step - # shift? - if shift: - low = self.col.db.scalar( - f"select min(due) from cards where due >= ? and type = {CARD_TYPE_NEW} " - "and id not in %s" % scids, - start, - ) - if low is not None: - shiftby = high - low + 1 - self.col.db.execute( - f""" -update cards set mod=?, usn=?, due=due+? where id not in %s -and due >= ? and queue = {QUEUE_TYPE_NEW}""" - % scids, - now, - self.col.usn(), - shiftby, - low, - ) - # reorder cards - d = [] - for id, nid in self.col.db.execute( - f"select id, nid from cards where type = {CARD_TYPE_NEW} and id in " + scids - ): - d.append(dict(now=now, due=due[nid], usn=self.col.usn(), cid=id)) - self.col.db.executemany( - "update cards set due=:due,mod=:now,usn=:usn where id = :cid", d - ) - - def randomizeCards(self, did: int) -> None: - cids = self.col.db.list("select id from cards where did = ?", did) - self.sortCards(cids, shuffle=True) - - def orderCards(self, did: int) -> None: - cids = self.col.db.list("select id from cards where did = ? order by nid", did) - self.sortCards(cids) - - def resortConf(self, conf) -> None: - for did in self.col.decks.didsForConf(conf): - if conf["new"]["order"] == 0: - self.randomizeCards(did) - else: - self.orderCards(did) - - # for post-import - def maybeRandomizeDeck(self, did: Optional[int] = None) -> None: - if not did: - did = self.col.decks.selected() - conf = self.col.decks.confForDid(did) - # in order due? - if conf["new"]["order"] == NEW_CARDS_RANDOM: - self.randomizeCards(did)