4d0a915610
New API required for PyQt6.
99 lines
3.2 KiB
Python
99 lines
3.2 KiB
Python
# Copyright: Ankitects Pty Ltd and contributors
|
|
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
|
|
|
|
"""
|
|
PyQt5-only code
|
|
"""
|
|
|
|
import wave
|
|
from concurrent.futures import Future
|
|
from typing import cast
|
|
|
|
import aqt
|
|
|
|
from .qt import *
|
|
from .sound import Recorder
|
|
from .utils import showWarning
|
|
|
|
|
|
class QtAudioInputRecorder(Recorder):
    """Recorder that captures PCM audio with PyQt5's QAudioInput.

    Audio data is accumulated in memory while recording. When the
    recording is stopped, the buffered data is written to ``output_path``
    as a WAV file on a background task.
    """

    def __init__(self, output_path: str, mw: aqt.AnkiQt, parent: QWidget) -> None:
        """Create the audio input on the default input device.

        16-bit signed little-endian mono PCM at 44100Hz is requested; if the
        device reports that it is unsupported, the nearest format offered by
        the device is used instead.

        Args:
            output_path: path the WAV file is written to on stop.
            mw: main window; its ``taskman`` runs the file write in the
                background.
            parent: Qt parent for the audio input and the stop timer.
        """
        super().__init__(output_path)

        self.mw = mw
        self._parent = parent

        from PyQt5.QtMultimedia import QAudioDeviceInfo, QAudioFormat, QAudioInput

        fmt = QAudioFormat()
        fmt.setChannelCount(1)
        fmt.setSampleRate(44100)
        fmt.setSampleSize(16)
        fmt.setCodec("audio/pcm")
        fmt.setByteOrder(QAudioFormat.LittleEndian)
        fmt.setSampleType(QAudioFormat.SignedInt)

        device = QAudioDeviceInfo.defaultInputDevice()
        if not device.isFormatSupported(fmt):
            fmt = device.nearestFormat(fmt)
            print("format changed")
            print("channels", fmt.channelCount())
            print("rate", fmt.sampleRate())
            print("size", fmt.sampleSize())
        # the format actually in use; needed later for the WAV header
        self._format = fmt

        self._audio_input = QAudioInput(device, fmt, parent)

    def start(self, on_done: Callable[[], None]) -> None:
        """Begin capturing audio, then defer to ``Recorder.start``.

        Args:
            on_done: passed through unchanged to the base class's ``start``.
        """
        self._iodevice = self._audio_input.start()
        self._buffer = b""
        self._iodevice.readyRead.connect(self._on_read_ready)  # type: ignore
        super().start(on_done)

    def _on_read_ready(self) -> None:
        """Append everything currently readable from the device to the buffer."""
        self._buffer += cast(bytes, self._iodevice.readAll())

    def stop(self, on_done: Callable[[str], None]) -> None:
        """Stop recording after a short delay and write the WAV file.

        The audio input is stopped 500ms after this call. If the input then
        reports an error, a warning is shown and ``on_done`` is not invoked.
        Otherwise the file is written on a background task and
        ``Recorder.stop`` is called with ``on_done`` afterwards.

        Args:
            on_done: passed through unchanged to ``Recorder.stop``.
        """

        def on_stop_timer() -> None:
            # read anything remaining in buffer & stop
            self._on_read_ready()
            self._audio_input.stop()

            if err := self._audio_input.error():
                showWarning(f"recording failed: {err}")
                return

            def write_file() -> None:
                # swallow the first STARTUP_DELAY seconds to allow the audio
                # device to quiesce. The buffer holds raw bytes, so the amount
                # to drop is derived from the format actually in use and kept
                # on a whole-frame boundary; a byte offset that lands inside a
                # frame would misalign every sample written afterwards.
                frame_bytes = self._format.channelCount() * (
                    self._format.sampleSize() // 8
                )
                skip_frames = int(self._format.sampleRate() * self.STARTUP_DELAY)
                skip = skip_frames * frame_bytes
                if len(self._buffer) <= skip:
                    # nothing left after trimming; no file is written
                    return
                self._buffer = self._buffer[skip:]

                # write out the wave file; the context manager closes it
                # even if writing fails part-way through
                with wave.open(self.output_path, "wb") as wf:
                    wf.setnchannels(self._format.channelCount())
                    wf.setsampwidth(self._format.sampleSize() // 8)
                    wf.setframerate(self._format.sampleRate())
                    wf.writeframes(self._buffer)

            def and_then(fut: Future) -> None:
                # re-raise any exception from the background write
                fut.result()
                Recorder.stop(self, on_done)

            self.mw.taskman.run_in_background(write_file, and_then)

        # schedule the stop for half a second in the future,
        # to avoid truncating the end of the recording
        self._stop_timer = t = QTimer(self._parent)
        t.timeout.connect(on_stop_timer)  # type: ignore
        t.setSingleShot(True)
        t.start(500)
|
|
|
|
|
|
def prompt_for_mic_permission() -> None:
    """Look up the default audio input device and discard the result.

    NOTE(review): presumably the lookup alone is what makes the OS show its
    microphone permission prompt - confirm against the caller.
    """
    from PyQt5 import QtMultimedia

    QtMultimedia.QAudioDeviceInfo.defaultInputDevice()
|