52642d693b
I do not recall anyone reporting that it worked better than the Qt implementation for them, and the lack of recent wheels on PyPI is a pain. We can always add it back in the future if enough people come out of the woodwork to report they were using it.
99 lines
3.2 KiB
Python
99 lines
3.2 KiB
Python
# Copyright: Ankitects Pty Ltd and contributors
|
|
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
|
|
|
|
# pylint: skip-file
|
|
|
|
"""
|
|
PyQt5-only code
|
|
"""
|
|
|
|
import wave
|
|
from concurrent.futures import Future
|
|
from typing import cast
|
|
|
|
import aqt
|
|
|
|
from .qt import *
|
|
from .sound import Recorder
|
|
from .utils import showWarning
|
|
|
|
|
|
class QtAudioInputRecorder(Recorder):
    """Recorder that captures from the default input device via QAudioInput.

    Raw PCM data is accumulated in memory while recording; when the
    recording is stopped, the leading ``STARTUP_DELAY`` portion is discarded
    and the remainder is written to ``output_path`` as a WAV file.
    """

    def __init__(self, output_path: str, mw: aqt.AnkiQt, parent: QWidget) -> None:
        """Negotiate an audio format and create the audio input.

        Args:
            output_path: Path the WAV file is written to when stopping.
            mw: Main window; its ``taskman`` is used to write the file in
                the background.
            parent: Qt parent for the audio input and the stop timer.
        """
        super().__init__(output_path)

        self.mw = mw
        self._parent = parent

        # Deferred until instantiation, so importing this module does not
        # itself import QtMultimedia.
        from PyQt5.QtMultimedia import (  # type: ignore
            QAudioDeviceInfo,
            QAudioFormat,
            QAudioInput,
        )

        # Preferred format: 16-bit signed little-endian mono PCM at 44.1kHz.
        fmt = QAudioFormat()
        fmt.setChannelCount(1)
        fmt.setSampleRate(44100)
        fmt.setSampleSize(16)
        fmt.setCodec("audio/pcm")
        fmt.setByteOrder(QAudioFormat.LittleEndian)
        fmt.setSampleType(QAudioFormat.SignedInt)

        device = QAudioDeviceInfo.defaultInputDevice()
        if not device.isFormatSupported(fmt):
            # Fall back to whatever the device offers; everything that
            # later depends on the format must read it from self._format.
            fmt = device.nearestFormat(fmt)
            print("format changed")
            print("channels", fmt.channelCount())
            print("rate", fmt.sampleRate())
            print("size", fmt.sampleSize())
        self._format = fmt

        self._audio_input = QAudioInput(device, fmt, parent)

    def start(self, on_done: Callable[[], None]) -> None:
        """Begin capturing audio into a fresh in-memory buffer.

        Args:
            on_done: Passed through unchanged to ``Recorder.start``.
        """
        self._iodevice = self._audio_input.start()
        self._buffer = bytearray()
        qconnect(self._iodevice.readyRead, self._on_read_ready)
        super().start(on_done)

    def _on_read_ready(self) -> None:
        """Append everything currently available on the device to the buffer."""
        self._buffer.extend(cast(bytes, self._iodevice.readAll()))

    def _startup_skip_bytes(self) -> int:
        """Return the number of leading bytes to discard from the capture.

        The result covers ``STARTUP_DELAY`` worth of audio in the negotiated
        format and is always a whole number of frames, so trimming never
        splits a sample or shifts the channel order.
        """
        frame_bytes = self._format.channelCount() * (self._format.sampleSize() // 8)
        skip_frames = int(self._format.sampleRate() * self.STARTUP_DELAY)
        return skip_frames * frame_bytes

    def stop(self, on_done: Callable[[str], None]) -> None:
        """Stop recording after a short grace period and write the WAV file.

        Args:
            on_done: Passed to ``Recorder.stop`` once the file has been
                written.

        NOTE(review): if the audio input reports an error, a warning is
        shown and ``on_done`` is never invoked - confirm callers cope.
        """

        def on_stop_timer() -> None:
            # read anything remaining in buffer & stop
            self._on_read_ready()
            self._audio_input.stop()

            # a falsy error code means no error occurred
            if err := self._audio_input.error():
                showWarning(f"recording failed: {err}")
                return

            def write_file() -> None:
                # swallow the first 300ms to allow audio device to quiesce
                skip = self._startup_skip_bytes()
                if len(self._buffer) <= skip:
                    # nothing left after trimming; no file is written
                    return
                del self._buffer[:skip]

                # write out the wave file; the context manager closes it
                # even if writing fails part-way through
                with wave.open(self.output_path, "wb") as wf:
                    wf.setnchannels(self._format.channelCount())
                    wf.setsampwidth(self._format.sampleSize() // 8)
                    wf.setframerate(self._format.sampleRate())
                    wf.writeframes(self._buffer)

            def and_then(fut: Future) -> None:
                # re-raise any exception that occurred in write_file()
                fut.result()
                Recorder.stop(self, on_done)

            self.mw.taskman.run_in_background(write_file, and_then)

        # schedule the stop for half a second in the future,
        # to avoid truncating the end of the recording
        self._stop_timer = t = QTimer(self._parent)
        qconnect(t.timeout, on_stop_timer)
        t.setSingleShot(True)
        t.start(500)
|