a7812dedc0
The enum changes should work on PyQt 5.x, and are required in PyQt 6.x. They are not supported by the PyQt5 typings however, so we need to run our tests with PyQt6.
359 lines
13 KiB
Python
359 lines
13 KiB
Python
# Copyright: Ankitects Pty Ltd and contributors
|
|
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
|
|
from __future__ import annotations
|
|
|
|
import time
|
|
from typing import Any, Callable, Sequence
|
|
|
|
import aqt
|
|
from anki.cards import Card, CardId
|
|
from anki.collection import BrowserColumns as Columns
|
|
from anki.collection import Collection
|
|
from anki.consts import *
|
|
from anki.errors import NotFoundError
|
|
from anki.notes import Note, NoteId
|
|
from aqt import gui_hooks
|
|
from aqt.browser.table import Cell, CellRow, Column, ItemId, SearchContext
|
|
from aqt.browser.table.state import ItemState
|
|
from aqt.qt import *
|
|
from aqt.utils import tr
|
|
|
|
|
|
class DataModel(QAbstractTableModel):
|
|
"""Data manager for the browser table.
|
|
|
|
_items -- The card or note ids currently hold and corresponding to the
|
|
table's rows.
|
|
_rows -- The cached data objects to render items to rows.
|
|
columns -- The data objects of all available columns, used to define the display
|
|
of active columns and list all toggleable columns to the user.
|
|
_block_updates -- If True, serve stale content to avoid hitting the DB.
|
|
_stale_cutoff -- A threshold to decide whether a cached row has gone stale.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
col: Collection,
|
|
state: ItemState,
|
|
row_state_will_change_callback: Callable,
|
|
row_state_changed_callback: Callable,
|
|
) -> None:
|
|
QAbstractTableModel.__init__(self)
|
|
self.col: Collection = col
|
|
self.columns: dict[str, Column] = {
|
|
c.key: c for c in self.col.all_browser_columns()
|
|
}
|
|
gui_hooks.browser_did_fetch_columns(self.columns)
|
|
self._state: ItemState = state
|
|
self._items: Sequence[ItemId] = []
|
|
self._rows: dict[int, CellRow] = {}
|
|
self._block_updates = False
|
|
self._stale_cutoff = 0.0
|
|
self._on_row_state_will_change = row_state_will_change_callback
|
|
self._on_row_state_changed = row_state_changed_callback
|
|
self._want_tooltips = aqt.mw.pm.show_browser_table_tooltips()
|
|
|
|
# Row Object Interface
|
|
######################################################################
|
|
|
|
# Get Rows
|
|
|
|
def get_cell(self, index: QModelIndex) -> Cell:
|
|
return self.get_row(index).cells[index.column()]
|
|
|
|
def get_row(self, index: QModelIndex) -> CellRow:
|
|
item = self.get_item(index)
|
|
if row := self._rows.get(item):
|
|
if not self._block_updates and row.is_stale(self._stale_cutoff):
|
|
# need to refresh
|
|
return self._fetch_row_and_update_cache(index, item, row)
|
|
# return row, even if it's stale
|
|
return row
|
|
if self._block_updates:
|
|
# blank row until we unblock
|
|
return CellRow.placeholder(self.len_columns())
|
|
# missing row, need to build
|
|
return self._fetch_row_and_update_cache(index, item, None)
|
|
|
|
def _fetch_row_and_update_cache(
|
|
self, index: QModelIndex, item: ItemId, old_row: CellRow | None
|
|
) -> CellRow:
|
|
"""Fetch a row from the backend, add it to the cache and return it.
|
|
Then fire callbacks if the row is being deleted or restored.
|
|
"""
|
|
new_row = self._fetch_row_from_backend(item)
|
|
# row state has changed if existence of cached and fetched counterparts differ
|
|
# if the row was previously uncached, it is assumed to have existed
|
|
state_change = (
|
|
new_row.is_deleted
|
|
if old_row is None
|
|
else old_row.is_deleted != new_row.is_deleted
|
|
)
|
|
if state_change:
|
|
self._on_row_state_will_change(index, not new_row.is_deleted)
|
|
self._rows[item] = new_row
|
|
if state_change:
|
|
self._on_row_state_changed(index, not new_row.is_deleted)
|
|
return self._rows[item]
|
|
|
|
def _fetch_row_from_backend(self, item: ItemId) -> CellRow:
|
|
try:
|
|
row = CellRow(*self.col.browser_row_for_id(item))
|
|
except NotFoundError:
|
|
return CellRow.deleted(self.len_columns())
|
|
except Exception as e:
|
|
return CellRow.generic(self.len_columns(), str(e))
|
|
except BaseException as e:
|
|
# fatal error like a panic in the backend - dump it to the
|
|
# console so it gets picked up by the error handler
|
|
import traceback
|
|
|
|
traceback.print_exc()
|
|
# and prevent Qt from firing a storm of follow-up errors
|
|
self._block_updates = True
|
|
return CellRow.generic(self.len_columns(), "error")
|
|
|
|
gui_hooks.browser_did_fetch_row(
|
|
item, self._state.is_notes_mode(), row, self._state.active_columns
|
|
)
|
|
return row
|
|
|
|
def get_cached_row(self, index: QModelIndex) -> CellRow | None:
|
|
"""Get row if it is cached, regardless of staleness."""
|
|
return self._rows.get(self.get_item(index))
|
|
|
|
# Reset
|
|
|
|
def mark_cache_stale(self) -> None:
|
|
self._stale_cutoff = time.time()
|
|
|
|
def reset(self) -> None:
|
|
self.begin_reset()
|
|
self.end_reset()
|
|
|
|
def begin_reset(self) -> None:
|
|
self.beginResetModel()
|
|
self.mark_cache_stale()
|
|
|
|
def end_reset(self) -> None:
|
|
self.endResetModel()
|
|
|
|
# Block/Unblock
|
|
|
|
def begin_blocking(self) -> None:
|
|
self._block_updates = True
|
|
|
|
def end_blocking(self) -> None:
|
|
self._block_updates = False
|
|
self.redraw_cells()
|
|
|
|
def redraw_cells(self) -> None:
|
|
"Update cell contents, without changing search count/columns/sorting."
|
|
if self.is_empty():
|
|
return
|
|
top_left = self.index(0, 0)
|
|
bottom_right = self.index(self.len_rows() - 1, self.len_columns() - 1)
|
|
self.dataChanged.emit(top_left, bottom_right) # type: ignore
|
|
|
|
# Item Interface
|
|
######################################################################
|
|
|
|
# Get metadata
|
|
|
|
def is_empty(self) -> bool:
|
|
return not self._items
|
|
|
|
def len_rows(self) -> int:
|
|
return len(self._items)
|
|
|
|
def len_columns(self) -> int:
|
|
return len(self._state.active_columns)
|
|
|
|
# Get items (card or note ids depending on state)
|
|
|
|
def get_item(self, index: QModelIndex) -> ItemId:
|
|
return self._items[index.row()]
|
|
|
|
def get_items(self, indices: list[QModelIndex]) -> Sequence[ItemId]:
|
|
return [self.get_item(index) for index in indices]
|
|
|
|
def get_card_ids(self, indices: list[QModelIndex]) -> Sequence[CardId]:
|
|
return self._state.get_card_ids(self.get_items(indices))
|
|
|
|
def get_note_ids(self, indices: list[QModelIndex]) -> Sequence[NoteId]:
|
|
return self._state.get_note_ids(self.get_items(indices))
|
|
|
|
def get_note_id(self, index: QModelIndex) -> NoteId | None:
|
|
if nid_list := self._state.get_note_ids([self.get_item(index)]):
|
|
return nid_list[0]
|
|
return None
|
|
|
|
# Get row numbers from items
|
|
|
|
def get_item_row(self, item: ItemId) -> int | None:
|
|
for row, i in enumerate(self._items):
|
|
if i == item:
|
|
return row
|
|
return None
|
|
|
|
def get_item_rows(self, items: Sequence[ItemId]) -> list[int]:
|
|
rows = []
|
|
for row, i in enumerate(self._items):
|
|
if i in items:
|
|
rows.append(row)
|
|
return rows
|
|
|
|
def get_card_row(self, card_id: CardId) -> int | None:
|
|
return self.get_item_row(self._state.get_item_from_card_id(card_id))
|
|
|
|
# Get objects (cards or notes)
|
|
|
|
def get_card(self, index: QModelIndex) -> Card | None:
|
|
"""Try to return the indicated, possibly deleted card."""
|
|
if not index.isValid():
|
|
return None
|
|
try:
|
|
return self._state.get_card(self.get_item(index))
|
|
except NotFoundError:
|
|
return None
|
|
|
|
def get_note(self, index: QModelIndex) -> Note | None:
|
|
"""Try to return the indicated, possibly deleted note."""
|
|
if not index.isValid():
|
|
return None
|
|
try:
|
|
return self._state.get_note(self.get_item(index))
|
|
except NotFoundError:
|
|
return None
|
|
|
|
# Table Interface
|
|
######################################################################
|
|
|
|
def toggle_state(self, context: SearchContext) -> ItemState:
|
|
self.beginResetModel()
|
|
self._state = self._state.toggle_state()
|
|
self.search(context)
|
|
return self._state
|
|
|
|
# Rows
|
|
|
|
def search(self, context: SearchContext) -> None:
|
|
self.begin_reset()
|
|
try:
|
|
if context.order is True:
|
|
try:
|
|
context.order = self.columns[self._state.sort_column]
|
|
except KeyError:
|
|
# invalid sort column in config
|
|
context.order = self.columns["noteCrt"]
|
|
context.reverse = self._state.sort_backwards
|
|
gui_hooks.browser_will_search(context)
|
|
if context.ids is None:
|
|
context.ids = self._state.find_items(
|
|
context.search, context.order, context.reverse
|
|
)
|
|
gui_hooks.browser_did_search(context)
|
|
self._items = context.ids
|
|
self._rows = {}
|
|
finally:
|
|
self.end_reset()
|
|
|
|
def reverse(self) -> None:
|
|
self.beginResetModel()
|
|
self._items = list(reversed(self._items))
|
|
self.endResetModel()
|
|
|
|
# Columns
|
|
|
|
def column_at(self, index: QModelIndex) -> Column:
|
|
return self.column_at_section(index.column())
|
|
|
|
def column_at_section(self, section: int) -> Column:
|
|
"""Returns the column object corresponding to the active column at index or the default
|
|
column object if no data is associated with the active column.
|
|
"""
|
|
key = self._state.column_key_at(section)
|
|
try:
|
|
return self.columns[key]
|
|
except KeyError:
|
|
self.columns[key] = addon_column_fillin(key)
|
|
return self.columns[key]
|
|
|
|
def active_column_index(self, column: str) -> int | None:
|
|
return (
|
|
self._state.active_columns.index(column)
|
|
if column in self._state.active_columns
|
|
else None
|
|
)
|
|
|
|
def toggle_column(self, column: str) -> None:
|
|
self.begin_reset()
|
|
self._state.toggle_active_column(column)
|
|
self.end_reset()
|
|
|
|
# Model interface
|
|
######################################################################
|
|
|
|
def rowCount(self, parent: QModelIndex = QModelIndex()) -> int:
|
|
if parent and parent.isValid():
|
|
return 0
|
|
return self.len_rows()
|
|
|
|
def columnCount(self, parent: QModelIndex = QModelIndex()) -> int:
|
|
if parent and parent.isValid():
|
|
return 0
|
|
return self.len_columns()
|
|
|
|
def data(self, index: QModelIndex = QModelIndex(), role: int = 0) -> Any:
|
|
if not index.isValid():
|
|
return QVariant()
|
|
if role == Qt.ItemDataRole.FontRole:
|
|
if not self.column_at(index).uses_cell_font:
|
|
return QVariant()
|
|
qfont = QFont()
|
|
row = self.get_row(index)
|
|
qfont.setFamily(row.font_name)
|
|
qfont.setPixelSize(row.font_size)
|
|
return qfont
|
|
elif role == Qt.ItemDataRole.TextAlignmentRole:
|
|
align: Qt.AlignmentFlag | int = Qt.AlignmentFlag.AlignVCenter
|
|
if self.column_at(index).alignment == Columns.ALIGNMENT_CENTER:
|
|
align |= Qt.AlignmentFlag.AlignHCenter
|
|
return align
|
|
elif role == Qt.ItemDataRole.DisplayRole:
|
|
return self.get_cell(index).text
|
|
elif role == Qt.ItemDataRole.ToolTipRole and self._want_tooltips:
|
|
return self.get_cell(index).text
|
|
return QVariant()
|
|
|
|
def headerData(
|
|
self, section: int, orientation: Qt.Orientation, role: int = 0
|
|
) -> str | None:
|
|
if (
|
|
orientation == Qt.Orientation.Horizontal
|
|
and role == Qt.ItemDataRole.DisplayRole
|
|
):
|
|
return self._state.column_label(self.column_at_section(section))
|
|
return None
|
|
|
|
def flags(self, index: QModelIndex) -> Qt.ItemFlag:
|
|
# shortcut for large selections (Ctrl+A) to avoid fetching large numbers of rows at once
|
|
if row := self.get_cached_row(index):
|
|
if row.is_deleted:
|
|
return Qt.ItemFlag(Qt.ItemFlag.NoItemFlags)
|
|
return Qt.ItemFlag.ItemIsEnabled | Qt.ItemFlag.ItemIsSelectable
|
|
|
|
|
|
def addon_column_fillin(key: str) -> Column:
|
|
"""Return a column with generic fields and a label indicating to the user that this column was
|
|
added by an add-on.
|
|
"""
|
|
return Column(
|
|
key=key,
|
|
cards_mode_label=f"{tr.browsing_addon()} ({key})",
|
|
notes_mode_label=f"{tr.browsing_addon()} ({key})",
|
|
sorting=Columns.SORTING_NONE,
|
|
uses_cell_font=False,
|
|
alignment=Columns.ALIGNMENT_CENTER,
|
|
)
|