more TTS and sound work

- use provided language and voices when playing on Mac
- fix hang in waiting for termination
- allow players to return a rank for a given tag,
which will allow for the best matching player to be chosen
depending on the context (eg, prioritize one player for videos,
one tts player for certain voices, etc)
This commit is contained in:
Damien Elmes 2020-01-21 11:34:16 +10:00
parent d4d16d35a8
commit 66e277e44b
3 changed files with 189 additions and 102 deletions

View File

@ -10,7 +10,7 @@ import time
import wave
from abc import ABC, abstractmethod
from concurrent.futures import Future
from typing import Any, Callable, Dict, List, Optional, Tuple, cast
from typing import Any, Callable, Dict, List, Optional, Tuple
import pyaudio
@ -33,12 +33,18 @@ OnDoneCallback = Callable[[], None]
class Player(ABC):
@abstractmethod
def can_play(self, tag: AVTag) -> bool:
def play(self, tag: AVTag, on_done: OnDoneCallback) -> None:
pass
@abstractmethod
def play(self, tag: AVTag, on_done: OnDoneCallback) -> None:
pass
def rank_for_tag(self, tag: AVTag) -> Optional[int]:
"""How suited this player is to playing tag.
AVPlayer will choose the player that returns the highest rank
for a given tag.
If None, this player can not play the tag.
"""
def stop(self) -> None:
"""Optional.
@ -56,8 +62,13 @@ class Player(ABC):
class SoundOrVideoPlayer(Player): # pylint: disable=abstract-method
def can_play(self, tag: AVTag) -> bool:
return isinstance(tag, SoundOrVideoTag)
default_rank = 0
def rank_for_tag(self, tag: AVTag) -> Optional[int]:
if isinstance(tag, SoundOrVideoTag):
return self.default_rank
else:
return None
# Main playing interface
@ -138,13 +149,28 @@ class AVPlayer:
self._play(next)
def _play(self, tag: AVTag) -> None:
for player in self.players:
if player.can_play(tag):
self.current_player = player
gui_hooks.av_player_will_play(tag)
player.play(tag, self._on_play_finished)
return
print("no players found for", tag)
best_player = self._best_player_for_tag(tag)
if best_player:
self.current_player = best_player
gui_hooks.av_player_will_play(tag)
self.current_player.play(tag, self._on_play_finished)
else:
print("no players found for", tag)
def _best_player_for_tag(self, tag: AVTag) -> Optional[Player]:
ranked = []
for p in self.players:
rank = p.rank_for_tag(tag)
if rank is not None:
ranked.append((rank, p))
ranked.sort()
print(ranked)
if ranked:
return ranked[-1][1]
else:
return None
av_player = AVPlayer()
@ -181,43 +207,50 @@ class PlayerInterrupted(Exception):
pass
class SimpleProcessPlayer(SoundOrVideoPlayer):
"A player that invokes a new process for each file to play."
class SimpleProcessPlayer(Player): # pylint: disable=abstract-method
"A player that invokes a new process for each tag to play."
args: List[str] = []
env: Optional[Dict[str, str]] = None
def __init__(self, taskman: TaskManager):
self._taskman = taskman
_terminate_flag = False
self._terminate_flag = False
self._process: Optional[subprocess.Popen] = None
def play(self, tag: AVTag, on_done: OnDoneCallback) -> None:
stag = cast(SoundOrVideoTag, tag)
self._terminate_flag = False
self._taskman.run(
lambda: self._play(stag.filename), lambda res: self._on_done(res, on_done)
lambda: self._play(tag), lambda res: self._on_done(res, on_done)
)
def stop(self):
self._terminate_flag = True
# block until stopped
while self._terminate_flag:
t = time.time()
while self._terminate_flag and time.time() - t < 10:
time.sleep(0.1)
def _play(self, filename: str) -> None:
process = subprocess.Popen(self.args + [filename], env=self.env)
while True:
try:
process.wait(0.1)
if process.returncode != 0:
print(f"player got return code: {process.returncode}")
return
except subprocess.TimeoutExpired:
pass
if self._terminate_flag:
process.terminate()
self._terminate_flag = False
raise PlayerInterrupted()
def _play(self, tag: AVTag) -> None:
assert isinstance(tag, SoundOrVideoTag)
self._process = subprocess.Popen(self.args + [tag.filename], env=self.env)
self._wait_for_termination()
def _wait_for_termination(self):
try:
while True:
try:
self._process.wait(0.1)
if self._process.returncode != 0:
print(f"player got return code: {self._process.returncode}")
return
except subprocess.TimeoutExpired:
pass
if self._terminate_flag:
self._process.terminate()
raise PlayerInterrupted()
finally:
self._process = None
self._terminate_flag = False
def _on_done(self, ret: Future, cb: OnDoneCallback) -> None:
try:
@ -228,7 +261,7 @@ class SimpleProcessPlayer(SoundOrVideoPlayer):
cb()
class SimpleMpvPlayer(SimpleProcessPlayer):
class SimpleMpvPlayer(SimpleProcessPlayer, SoundOrVideoPlayer):
args, env = _packagedCmd(
[
"mpv",
@ -248,7 +281,7 @@ class SimpleMpvPlayer(SimpleProcessPlayer):
self.args += ["--no-config", "--include=" + conf_path]
class SimpleMplayerPlayer(SimpleProcessPlayer):
class SimpleMplayerPlayer(SimpleProcessPlayer, SoundOrVideoPlayer):
args, env = _packagedCmd(["mplayer", "-really-quiet", "-noautosub"])
if isWin:
args += ["-ao", "win32"]
@ -302,9 +335,9 @@ class MpvManager(MPV, SoundOrVideoPlayer):
self.default_argv += ["--no-config", "--include=" + conf_path]
def play(self, tag: AVTag, on_done: OnDoneCallback) -> None:
stag = cast(SoundOrVideoTag, tag)
assert isinstance(tag, SoundOrVideoTag)
self._on_done = on_done
path = os.path.join(os.getcwd(), stag.filename)
path = os.path.join(os.getcwd(), tag.filename)
self.command("loadfile", path, "append-play")
def stop(self) -> None:
@ -343,27 +376,14 @@ class MpvManager(MPV, SoundOrVideoPlayer):
class SimpleMplayerSlaveModePlayer(SimpleMplayerPlayer):
def __init__(self, taskman: TaskManager):
super().__init__(taskman)
self._process: Optional[subprocess.Popen] = None
self.args.append("-slave")
def _play(self, filename: str) -> None:
def _play(self, tag: AVTag) -> None:
assert isinstance(tag, SoundOrVideoTag)
self._process = subprocess.Popen(
self.args + [filename], env=self.env, stdin=subprocess.PIPE
self.args + [tag.filename], env=self.env, stdin=subprocess.PIPE
)
while True:
try:
self._process.wait(0.1)
if self._process.returncode != 0:
print(f"player got return code: {self._process.returncode}")
return
except subprocess.TimeoutExpired:
pass
if self._terminate_flag:
self._process.terminate()
self._terminate_flag = False
raise PlayerInterrupted()
self._wait_for_termination()
def command(self, text: str) -> None:
"""Send a command over the slave interface.

View File

@ -1,67 +1,134 @@
"""
todo
Basic text to speech support.
Users can use the following in their card template:
{{tts en_US:Field}}
or
{{tts ja_JP voices=Kyoko,Otoya,Another_name:Field}}
The first argument must be a language code. If provided,
voices is a comma-separated list of one or more voices that
the user would prefer. Spaces must not be included.
Underscores will be converted to spaces.
"""
from __future__ import annotations
import re
import subprocess
import time
from concurrent.futures import Future
from typing import cast
from dataclasses import dataclass
from typing import List, Optional, cast
from anki.sound import AVTag, TTSTag
from aqt.sound import OnDoneCallback, Player, PlayerInterrupted
from aqt.sound import SimpleProcessPlayer
from aqt.taskman import TaskManager
class TTSPlayer(Player): # pylint: disable=abstract-method
def can_play(self, tag: AVTag) -> bool:
return isinstance(tag, TTSTag)
@dataclass
class TTSArgs:
# requested language
lang: str
# preferred voices, will use first available if possible
voices: List[str]
@classmethod
def from_string(cls, args: List[str]) -> TTSArgs:
voices: Optional[List[str]] = None
lang = args[0]
for arg in args[1:]:
try:
key, val = arg.split("=")
except ValueError:
continue
key = key.strip()
val = val.strip().replace("_", " ")
if key == "voices":
voices = val.split(",")
return TTSArgs(voices=voices or [], lang=lang)
class MacTTSPlayer(TTSPlayer):
# Mac support
##########################################################################
@dataclass
class MacVoice:
name: str
lang: str
VOICE_HELP_LINE_RE = re.compile(r"^(\S+)\s+(\S+)\s+.*$")
def parse_voice_line(line: str) -> Optional[MacVoice]:
m = VOICE_HELP_LINE_RE.match(line)
if not m:
return None
return MacVoice(name=m.group(1), lang=m.group(2))
class MacTTSPlayer(SimpleProcessPlayer):
def __init__(self, taskman: TaskManager):
self._taskman = taskman
self._terminate_flag = False
super().__init__(taskman)
self._available_voices: Optional[List[MacVoice]] = None
def play(self, tag: AVTag, on_done: OnDoneCallback) -> None:
def _play(self, tag: AVTag) -> None:
ttag = cast(TTSTag, tag)
self._taskman.run(
lambda: self._play(ttag), lambda ret: self._on_done(ret, on_done)
)
voice = self.voice_for_tag(ttag)
def _play(self, tag: TTSTag) -> None:
process = subprocess.Popen(
["say", "-v", "Alex", "-f", "-"],
self._process = subprocess.Popen(
["say", "-v", voice.name, "-f", "-"],
stdin=subprocess.PIPE,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
# write the input text to stdin
process.stdin.write(tag.text.encode("utf8"))
process.stdin.close()
# and wait for termination
while True:
try:
process.wait(0.1)
if process.returncode != 0:
print(f"player got return code: {process.returncode}")
return
except subprocess.TimeoutExpired:
pass
if self._terminate_flag:
process.terminate()
self._terminate_flag = False
raise PlayerInterrupted()
self._process.stdin.write(ttag.text.encode("utf8"))
self._process.stdin.close()
def _on_done(self, ret: Future, cb: OnDoneCallback) -> None:
try:
ret.result()
except PlayerInterrupted:
# don't fire done callback when interrupted
return
cb()
self._wait_for_termination()
def stop(self):
self._terminate_flag = True
# block until stopped
while self._terminate_flag:
time.sleep(0.1)
def rank_for_tag(self, tag: AVTag) -> Optional[int]:
if not isinstance(tag, TTSTag):
return None
# todo
return 0
def voices(self) -> List[MacVoice]:
if not self._available_voices:
cmd = subprocess.run(
["say", "-v", "?"], capture_output=True, check=True, encoding="utf8"
)
self._available_voices = []
for line in cmd.stdout.splitlines():
voice = parse_voice_line(line)
if voice:
self._available_voices.append(voice)
return self._available_voices
def voice_for_tag(self, tag: TTSTag) -> MacVoice:
args = TTSArgs.from_string(tag.args)
voices = self.voices()
# any requested voices match?
for requested_voice in args.voices:
avail_voice = next((x for x in voices if x.name == requested_voice), None)
if avail_voice:
return avail_voice
# requested language match?
avail_voice = next((x for x in voices if x.lang == args.lang), None)
if avail_voice:
return avail_voice
# fall back on first voice
return voices[0]

View File

@ -381,8 +381,8 @@ field</a>
#[test]
fn test_tts() {
assert_eq!(
tts_filter("tts lang=en_US", "foo"),
"[anki:tts][lang=en_US]foo[/anki:tts]"
tts_filter("tts en_US voices=Bob,Jane", "foo"),
"[anki:tts][en_US voices=Bob,Jane]foo[/anki:tts]"
);
}
}