This commit is contained in:
Daniel Langbein 2023-11-09 14:21:44 +01:00
commit 444b527dc4
Signed by: langfingaz
GPG Key ID: 6C47C753F0823002
23 changed files with 1479 additions and 0 deletions

12
.gitignore vendored Normal file
View File

@ -0,0 +1,12 @@
/__pycache__/
/.flatpak-builder
/fp_build
/dist/
/.venv/
/.idea/
/.git/
/data
/src/nextcast/data/
/*.zip

View File

@ -0,0 +1,35 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="app_play_episode" type="PythonConfigurationType" factoryName="Python" nameIsGenerated="true">
<module name="youplay" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="$PROJECT_DIR$/venv/bin/python3" />
<option name="SDK_NAME" value="Python 3.10 (nextcast)" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/src/nextcast" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<EXTENSION ID="net.ashald.envfile">
<option name="IS_ENABLED" value="false" />
<option name="IS_SUBST" value="false" />
<option name="IS_PATH_MACRO_SUPPORTED" value="false" />
<option name="IS_IGNORE_MISSING_FILES" value="false" />
<option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
<ENTRIES>
<ENTRY IS_ENABLED="true" PARSER="runconfig" IS_EXECUTABLE="false" />
</ENTRIES>
</EXTENSION>
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/src/nextcast/app_play_episode.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>

View File

@ -0,0 +1,35 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="app_resume_playback" type="PythonConfigurationType" factoryName="Python" nameIsGenerated="true">
<module name="youplay" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="$PROJECT_DIR$/venv/bin/python3" />
<option name="SDK_NAME" value="Python 3.10 (nextcast)" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/src/nextcast" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<EXTENSION ID="net.ashald.envfile">
<option name="IS_ENABLED" value="false" />
<option name="IS_SUBST" value="false" />
<option name="IS_PATH_MACRO_SUPPORTED" value="false" />
<option name="IS_IGNORE_MISSING_FILES" value="false" />
<option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
<ENTRIES>
<ENTRY IS_ENABLED="true" PARSER="runconfig" IS_EXECUTABLE="false" />
</ENTRIES>
</EXTENSION>
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/src/nextcast/app_resume_playback.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>

35
.run/nextcloud.run.xml Normal file
View File

@ -0,0 +1,35 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="nextcloud" type="PythonConfigurationType" factoryName="Python" nameIsGenerated="true">
<module name="youplay" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="$PROJECT_DIR$/venv/bin/python3" />
<option name="SDK_NAME" value="Python 3.10 (nextcast)" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/src/nextcast" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<EXTENSION ID="net.ashald.envfile">
<option name="IS_ENABLED" value="false" />
<option name="IS_SUBST" value="false" />
<option name="IS_PATH_MACRO_SUPPORTED" value="false" />
<option name="IS_IGNORE_MISSING_FILES" value="false" />
<option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
<ENTRIES>
<ENTRY IS_ENABLED="true" PARSER="runconfig" IS_EXECUTABLE="false" />
</ENTRIES>
</EXTENSION>
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/src/nextcast/nextcloud.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>

3
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,3 @@
{
"nixEnvSelector.suggestion": false
}

26
README.md Normal file
View File

@ -0,0 +1,26 @@
# NextCast
With `nix-shell`:
```shell
export PYTHONPATH="$${PYTHONPATH:-}:src"
nix-shell --run './src/nextcast/app_resume_playback.py'
nix-shell --run './src/nextcast/app_play_episode.py'
```
With `apk` (on Alpine or PostmarketOS):
```shell
sudo apk add mpv libmediainfo ncurses
sudo apk add py3-pip
python3 -m venv venv
. venv/bin/activate
pip install -r requirements.txt
```
```shell
. venv/bin/activate
export PYTHONPATH="$${PYTHONPATH:-}:src"
./src/nextcast/app_resume_playback.py
./src/nextcast/app_play_episode.py
```

9
requirements.txt Normal file
View File

@ -0,0 +1,9 @@
requests
simple-term-menu
pymediainfo
feedparser
# System packages:
# mpv
# libmediainfo
# ncurses (provides tput binary required by simple-term-menu)

39
shell.nix Normal file
View File

@ -0,0 +1,39 @@
{
# nixpkgs stable branch
pkgs ? import <nixpkgs> {},
# nixpkgs unstable branch, 2023-10-31
unstable ? import (fetchTarball "https://github.com/NixOS/nixpkgs/archive/90e85bc7c1a6fc0760a94ace129d3a1c61c3d035.tar.gz") {}
}:
let
my_python_pkgs = ps: with ps; [
ps.requests
ps.pymediainfo
ps.feedparser
(
buildPythonPackage rec {
pname = "simple-term-menu";
# Latest version as of 2023-11-02
version = "1.6.1";
src = fetchPypi {
inherit pname version;
sha256 = "sha256-NotBWNF0m4aFUvtsBUuDAXhQhscaclPayEBMw8stMOg=";
};
propagatedBuildInputs = [];
}
)
# Build dependencies
ps.build
ps.twine
# IDE
# https://github.com/NixOS/nix-idea/issues/1#issuecomment-922555354
ps.pip
];
in
pkgs.mkShell {
nativeBuildInputs = [
pkgs.buildPackages.mpv
];
packages = [
(pkgs.python3.withPackages my_python_pkgs)
];
}

0
src/nextcast/__init__.py Normal file
View File

105
src/nextcast/app_play_episode.py Executable file
View File

@ -0,0 +1,105 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import Callable
from nextcast import mpv
from nextcast.cli_gui import list_selection, CancelButton, TopButton
from nextcast.log import Log
from nextcast.podcast import User, Episode, Podcast
from nextcast.user_manager import UserManager
def main():
# The terminal GUI hides possible error warnings.
# Pause to make them visible
Log.config(pause_on_error=True)
login_loop()
def login_loop() -> None:
# Select user account.
user = UserManager.from_interaction()
podcast_loop(login_loop, user)
def podcast_loop(prev: Callable, user: User) -> None:
# Select one podcast.
back_str = '← BACK'
try:
podcast = list_selection(
elements=user.get_podcasts(),
display_function=lambda x: x.title,
title='Select podcast',
top_buttons=[back_str],
)
except CancelButton as _e:
exit()
except TopButton as e:
if e.value == back_str:
return prev()
else:
raise NotImplementedError()
episode_loop(lambda: podcast_loop(prev, user), podcast)
def episode_loop(prev: Callable, podcast: Podcast) -> None:
def display_function(ep: Episode) -> str:
s = ''
# Download icon.
if ep.downloaded():
s += '💾'
else:
s += ' '
# Listening progress.
if ep.played():
s += ''
else:
position = ep.get_position()
if position > 0:
progress = ep.get_position() / ep.get_duration()
s += f'{progress:.0%} '
else:
s += ' '
# Title.
s += ep.title
return s
# Select one episode.
back_str = '← BACK'
try:
episode = list_selection(
elements=podcast.episodes,
display_function=display_function,
title='Select podcast',
top_buttons=[back_str],
)
except CancelButton as _e:
exit()
except TopButton as e:
if e.value == back_str:
return prev()
else:
raise NotImplementedError()
# Play and exit.
# play_loop(exit, episode)
# Play and return to menu.
play_loop(lambda: episode_loop(prev, podcast), episode)
def play_loop(prev: Callable, episode: Episode) -> None:
mpv.play_episode(episode)
prev()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,42 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from nextcast import mpv
from nextcast.cli_gui import list_selection, CancelButton
from nextcast.log import Log
from nextcast.user_manager import UserManager
def main():
# The terminal GUI hides possible error warnings.
# Pause to make them visible
Log.config(pause_on_error=True)
user = UserManager.from_interaction()
episodes = user.get_episodes()
# Paused episodes.
pauseds = [episode for episode in episodes
if episode.paused()]
if len(pauseds) == 0:
Log.info(f'There are no paused episodes.')
return
# Sort paused episodes by seconds played.
pauseds = sorted(pauseds, key=lambda paused: paused.get_position(), reverse=True)
# Select paused episode.
try:
episode = list_selection(
elements=pauseds,
display_function=lambda x: x.title,
title='Select episode to resume',
)
except CancelButton as _e:
exit()
mpv.play_episode(episode)
if __name__ == '__main__':
main()

47
src/nextcast/cli_gui.py Normal file
View File

@ -0,0 +1,47 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import TypeVar, Callable
from simple_term_menu import TerminalMenu
F = TypeVar('F')
class CancelButton(Exception):
pass
class TopButton(Exception):
def __init__(self, value: str):
self.value = value
def list_selection(elements: list[F], display_function: Callable[[F], str],
title: str = None,
top_buttons: list[str] = None) -> F:
"""
:raises CancelButton:
:raises TopButton:
"""
if top_buttons is None:
top_buttons = []
list_options = [display_function(element) for element in elements]
# Add top_buttons first:
for top_button in top_buttons:
assert top_button not in list_options
list_options = top_buttons + list_options
# Show list and wait for user selection.
terminal_menu = TerminalMenu(list_options, title=title)
index = terminal_menu.show()
if index is None:
raise CancelButton()
if index < len(top_buttons):
selected_top_button = top_buttons[index]
raise TopButton(selected_top_button)
else:
index -= len(top_buttons)
selected = elements[index]
return selected

31
src/nextcast/data_dir.py Normal file
View File

@ -0,0 +1,31 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from pathlib import Path
def get_audio_dir() -> Path:
return Path('data') / 'audio'
def get_feed_dir() -> Path:
return Path('data') / 'feed'
def get_podcast_dir() -> Path:
return Path('data') / 'podcast'
def get_mpv_watch_later_dir() -> Path:
return Path('data') / 'watch_later'
def get_timestamp_file():
return Path('data') / 'timestamps.json'
def get_subscription_file():
return Path('data') / 'subscriptions.json'
def get_credential_file():
return Path('data') / 'nc_logins.json'

36
src/nextcast/datetime_util.py Executable file
View File

@ -0,0 +1,36 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from datetime import datetime, timezone
def now() -> datetime:
return datetime.now(timezone.utc)
def now_str() -> str:
return to_str(now())
def now_timestamp() -> int:
return to_timestamp(now())
def to_timestamp(dt: datetime) -> int:
return round(datetime.timestamp(dt)) * 1000
def from_timestamp(timestamp: int) -> datetime:
return datetime.fromtimestamp(timestamp, tz=timezone.utc)
def to_str(dt: datetime) -> str:
return dt.strftime(fmt())
def from_str(dt_str: str) -> datetime:
dt = datetime.strptime(dt_str, fmt())
return dt.replace(tzinfo=timezone.utc)
def fmt() -> str:
return '%Y-%m-%dT%H:%M:%S'

68
src/nextcast/feed.py Normal file
View File

@ -0,0 +1,68 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import feedparser
from feedparser.util import FeedParserDict
from nextcast.file_cache import get_feed_file
from nextcast.log import Log
def test():
feed_url = 'https://logbuch-netzpolitik.de/feed/opus'
feed = get_feed(feed_url)
title = get_feed_title(feed)
episodes = get_feed_episodes(feed)
Log.info(f'The podcast "{title}" has {len(episodes)} episodes.')
def get_episode_media_url(episode: FeedParserDict) -> str:
# audio/*, video/* MIME types: https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types
media_urls = [link['href'] for link in episode['links']
if link['type'].split('/')[0] in ['audio', 'video']]
assert len(media_urls) == 1, f'Expected one audio or video url, but got: {media_urls}'
return media_urls[0]
def get_feed_episodes(feed: FeedParserDict) -> list[FeedParserDict]:
episodes = feed['entries']
# Example of one episode from episodes:
#
# episode['title'] = 'LNP474 Autobahnkreuzigung'
# episode['itunes_duration'] = '2:01:07'
# episode['itunes_episode'] = '474'
# episode['psc_chapters']['chapters'] = [
# {'start_parsed': ..., 'title': ...}
# ]
# episode['id'] = 'podlove-2023-11-03t15:44:34+00:00-b33bff874b85052'
# episode['links'] = [
# {
# 'href': 'https://logbuch-netzpolitik.de/podlove/file/8623/s/feed/c/opus/lnp474-autobahnkreuzigung.opus',
# 'type': 'audio/opus'
# }
# ]
return episodes
def get_feed_title(feed: FeedParserDict) -> str:
title = feed['feed']['title']
# Example of other fields:
#
# subtitle = feed['feed']['subtitle']
# summary = feed['feed']['summary']
# image = feed['feed']['image']
return title
def get_feed(url: str) -> FeedParserDict:
file = get_feed_file(url)
Log.info(f'Parsing feed {url} ...')
feed = feedparser.parse(file)
return feed
if __name__ == '__main__':
test()

View File

@ -0,0 +1,80 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from hashlib import sha256
from pathlib import Path
import requests
from nextcast.data_dir import get_audio_dir, get_feed_dir, get_podcast_dir
from nextcast.log import Log
def test():
url = 'https://ukw.fm/podlove/file/634/s/feed/c/opus/ukw114-ukraine-abnutzungskrieg.opus'
audio = CachedAudio(file_url=url)
Log.info(audio.get())
class CachedFile:
def __init__(self, file_url: str, cache_dir: Path):
self.file_url = file_url
self.cache_dir = cache_dir
def get(self) -> Path:
"""
:return: Path to downloaded file.
"""
return cache(file_url=self.file_url, cache_dir=self.cache_dir)
def get_path(self) -> Path:
"""
:return: Path to local file without downloading it.
"""
return hashed_location(self.file_url, self.cache_dir)
def downloaded(self) -> bool:
return self.get_path().is_file()
class CachedAudio(CachedFile):
def __init__(self, file_url: str):
super().__init__(file_url, get_audio_dir())
def get_podcast_file(file_url: str) -> Path:
return hashed_location(file_url, get_podcast_dir(), suffix='.json')
def get_feed_file(file_url: str) -> Path:
return cache(file_url, get_feed_dir())
def cache(file_url: str, cache_dir: Path) -> Path:
"""
Downloads the file described by `url` if it does not exist locally.
Returns path to local file.
"""
file = hashed_location(file_url, cache_dir)
# If the file is missing locally, it is downloaded.
if not file.is_file():
Log.info(f'Downloading {file_url} ...')
r = requests.get(file_url)
file.parent.mkdir(parents=True, exist_ok=True)
Log.info(f'Saving {file_url} as file {file} ...')
with file.open(mode='wb') as f:
f.write(r.content)
# Return path to local file.
return file
def hashed_location(url: str, data_dir: Path, suffix: str = ''):
sha256_hash = sha256(url.encode())
hash_str = sha256_hash.hexdigest().upper()
file = data_dir / f'{hash_str}{suffix}'
return file
if __name__ == '__main__':
test()

81
src/nextcast/json_util.py Normal file
View File

@ -0,0 +1,81 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
from pathlib import Path
from typing import Any
from nextcast.data_dir import get_subscription_file, get_credential_file, get_timestamp_file
from nextcast.file_cache import get_podcast_file
def test():
file = get_podcast_file('https://logbuch-netzpolitik.de/feed/opus')
# Write once.
file.write_text('{}')
# Overwrite with new data.
file.write_text('{}')
def read_timestamp() -> dict:
file = get_timestamp_file()
return read_json_file(file)
def write_timestamp(content: dict) -> None:
file = get_timestamp_file()
write_json_file(file, content)
def read_podcast(url: str) -> dict:
file = get_podcast_file(url)
return read_json_file(file)
def write_podcast(content: dict, url: str) -> None:
file = get_podcast_file(url)
write_json_file(file, content)
def read_subscriptions() -> dict:
"""
:return: Content of subscriptions file.
"""
file = get_subscription_file()
return read_json_file(file)
def write_subscriptions(content: dict) -> None:
file = get_subscription_file()
write_json_file(file, content)
def read_credentials() -> dict[str, Any]:
"""
:return: Content of credentials file.
"""
file = get_credential_file()
return read_json_file(file)
def write_credentials(content: dict[str, Any]) -> None:
file = get_credential_file()
write_json_file(file, content)
def read_json_file(file: Path) -> dict:
if not file.is_file():
content = {}
else:
content = json.loads(file.read_text())
return content
def write_json_file(file: Path, content: dict) -> None:
if not file.is_file():
file.parent.mkdir(parents=True, exist_ok=True)
file.touch(mode=0o600)
file.write_text(json.dumps(content))
if __name__ == '__main__':
test()

19
src/nextcast/log.py Normal file
View File

@ -0,0 +1,19 @@
import sys
class Log:
_pause_on_error = False
@classmethod
def config(cls, pause_on_error: bool = False):
cls._pause_on_error = pause_on_error
@classmethod
def info(cls, message: object):
print(message)
@classmethod
def error(cls, message: object):
print(message, file=sys.stderr)
if cls._pause_on_error:
input('Press ENTER to continue.')

28
src/nextcast/metadata.py Normal file
View File

@ -0,0 +1,28 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from pathlib import Path
from pymediainfo import MediaInfo
from nextcast.log import Log
def test():
audio = Path('data/0D544A3E2F3E21E92979539FAD78901B1376D64C631DB41B6D71FAF8EE9CE88B')
duration_ms = get_duration_ms(audio)
Log.info(duration_ms)
def get_duration_ms(audio: Path) -> int:
mi: MediaInfo = MediaInfo.parse(audio)
audio_tracks = [t for t in mi.tracks if t.track_type == 'Audio']
if len(audio_tracks) != 1:
raise ValueError(f'Expected one audio track, but got {len(audio_tracks)}')
audio_track = audio_tracks[0]
duration_ms = audio_track.to_data()['duration']
return duration_ms
if __name__ == '__main__':
test()

127
src/nextcast/mpv.py Normal file
View File

@ -0,0 +1,127 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import hashlib
import math
import subprocess
from pathlib import Path
from nextcast.data_dir import get_mpv_watch_later_dir
from nextcast.file_cache import CachedAudio
from nextcast.log import Log
from nextcast.podcast import Episode
def test():
song = Path('/home/yoda/Music/youplay/Eminem - Without Me (Official Music Video).mp3')
position = play(song, position=10)
Log.info(f'Played until {position}s')
url = 'https://ukw.fm/podlove/file/634/s/feed/c/opus/ukw114-ukraine-abnutzungskrieg.opus'
position = play_url(url, position=240)
if position is None:
Log.info('Played until end')
else:
Log.info(f'Played until {position}s')
def play_episode(episode: Episode, audio_display: bool = False):
"""
Play media file and upload new position.
"""
# First get audio file.
# This might fix the duration and playback position.
audio = episode.get_file()
if episode.played():
# Start over from the beginning.
position = 0
else:
position = episode.get_position()
# Play podcast.
new_position = play(audio=audio, position=position, audio_display=audio_display)
if new_position is None:
Log.info(f'Finished podcast.')
new_position = episode.get_duration()
else:
Log.info(f'Listened until {new_position}')
# Save and upload.
episode.update_position(new_position)
def play_url(url: str, position: int | float = 0, audio_display: bool = False) -> int | None:
"""
:param audio_display:
:param url:
:param position: In seconds.
:return: Position in seconds.
"""
audio = CachedAudio(file_url=url)
return play(audio.get(), position, audio_display)
def play(audio: Path, position: int | float = 0, audio_display: bool = False) -> int | None:
"""
:param audio_display:
:param audio:
:param position: In seconds.
:return: Position in seconds.
"""
command = ['mpv']
if not audio_display:
command.append('--no-audio-display')
command += [f'--watch-later-directory={get_mpv_watch_later_dir().absolute()}',
'--save-position-on-quit',
f'--start={position}',
'--',
str(audio.absolute())]
subprocess.run(
args=command,
text=True,
check=True,
)
return get_mpv_saved_position(audio)
def get_mpv_saved_position(audio: Path) -> int | None:
"""
If a file has been listened until its end, this function returns `None`.
:return: Playback position of audio files that have been started but not finished.
"""
watch_later_file = get_mpv_watch_later_file(audio)
# If the audio file has not yet been played with mpv,
# then the corresponding watch_later file does not exist.
# In this case, we return the start of the audio file: `0`
if not watch_later_file.is_file():
return None
# Example value of `content`:
# start=43.21
# pause=yes
content = watch_later_file.read_text()
for line in content.splitlines():
prefix = 'start='
if line.startswith(prefix):
position_str = line[len(prefix):]
break
else:
raise ValueError()
# Convert to int while rounding floor.
return math.floor(float(position_str))
def get_mpv_watch_later_file(audio: Path) -> Path:
md5_hash = hashlib.md5(str(audio.absolute()).encode())
hash_str = md5_hash.hexdigest().upper()
return get_mpv_watch_later_dir() / hash_str
if __name__ == '__main__':
test()

220
src/nextcast/nextcloud.py Normal file
View File

@ -0,0 +1,220 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import time
import webbrowser
from datetime import datetime, timedelta
from enum import Enum
import requests
from nextcast import datetime_util
from nextcast.json_util import read_timestamp, write_timestamp
from nextcast.log import Log
def test():
domain = 'c.p1st.de'
NextcloudApi.login(domain)
class EpisodeActionType(Enum):
DOWNLOAD = 'download'
DELETE = 'delete'
PLAY = 'play'
NEW = 'new'
FLATTR = 'flattr'
class EpisodeAction:
@classmethod
def from_json(cls, j: dict):
return cls(j['podcast'],
j['episode'],
j['guid'],
EpisodeActionType(j['action'].lower()),
datetime_util.from_str(j['timestamp']),
j['started'],
j['position'],
j['total'])
def to_json(self) -> dict[str, str]:
return {
'podcast': self.podcast,
'episode': self.episode,
'guid': self.guid,
'action': self.action.value.upper(),
'timestamp': datetime_util.to_str(self.timestamp),
'started': self.started,
'position': self.position,
'total': self.total,
}
def __init__(self,
podcast: str,
episode: str,
guid: str,
action: EpisodeActionType,
timestamp: datetime,
started: int,
position: int,
total: int):
# Example: 'https://ukw.fm/feed/opus/'
self.podcast = podcast
# Example: 'https://ukw.fm/podlove/file/634/s/feed/c/opus/ukw114-ukraine-abnutzungskrieg.opus'
self.episode = episode
# Example: 'podlove-2023-10-06t22:31:54+00:00-5a5ef55ba0e6cd5'
self.guid = guid
# Example: play
self.action = action
# Example: '2023-11-01T12:44:04'
self.timestamp = timestamp
# Only valid for "play".
# The position (in seconds) at which the client started playback.
# Requires position and total to be set.
# Example: 11692
self.started = started
# Only valid for “play”.
# The position (in seconds) at which the client stopped playback.
# Example: 13379
self._position = position
# Example: 13420
self.total = total
assert total >= position, f'total {total}s ({timedelta(seconds=total)}) !>=' \
f' position {position}s ({timedelta(seconds=position)})'
def played(self) -> bool:
return 0 < self.position == self.total
def paused(self, min_position: int = 0) -> bool:
"""
:param min_position: Only consider paused if listening progress is greater than `min_position` seconds.
"""
return min_position < self.position < self.total
@property
def episode_id(self) -> str:
return f'{self.guid}@{self.podcast}'
@property
def position(self) -> int:
assert self.action == EpisodeActionType.PLAY
return self._position
@position.setter
def position(self, value: int):
self._position = value
class NextcloudApi:
@classmethod
def login(cls, domain: str) -> tuple[str, str, str]:
"""
Login flow documentation:
https://docs.nextcloud.com/server/latest/developer_manual/client_apis/LoginFlow/index.html
"""
r = requests.post(f'https://{domain}/index.php/login/v2')
assert r.status_code == 200
content = json.loads(r.text)
webbrowser.open(content['login'], new=0, autoraise=True)
headers = {'Content-type': 'application/x-www-form-urlencoded'}
while True:
endpoint = content['poll']['endpoint']
token = content['poll']['token']
r = requests.post(endpoint, headers=headers, data=f'token={token}')
if r.status_code == 200:
break
elif r.status_code == 404:
time.sleep(1)
continue
else:
Log.info(f'curl -X POST {endpoint} -d "token={token}"')
raise ValueError(f'Invalid status code: {r.status_code}')
content = json.loads(r.text)
server = content['server']
login_name = content['loginName']
app_password = content['appPassword']
return server, login_name, app_password
@classmethod
def fetch_subscriptions(cls, server: str, login_name: str, app_password: str) -> list[str]:
"""
https://github.com/thrillfall/nextcloud-gpodder/blob/main/README.md#subscription
:return: The URLs of the subscribed podcast feeds. Example: 'https://logbuch-netzpolitik.de/feed/opus'
"""
url = f'{server}/apps/gpoddersync/subscriptions'
Log.info(f'Downloading {url} ...')
# auth parameter: https://requests.readthedocs.io/en/latest/user/authentication/#basic-authentication
r = requests.get(url, auth=(login_name, app_password))
assert r.status_code == 200
content = json.loads(r.text)
return content['add']
@classmethod
def fetch_play_actions(cls, server: str, login_name: str, app_password: str, user_id: str):
"""
:return: Return new episode actions since last function call where EpisodeActionType is PLAY.
"""
timestamp_dict = read_timestamp()
timestamp = timestamp_dict.get(user_id, 0)
episode_actions = NextcloudApi.fetch_episode_actions(
server=server,
login_name=login_name,
app_password=app_password,
types=[EpisodeActionType.PLAY],
since=timestamp)
timestamp = datetime_util.now_timestamp()
timestamp_dict[user_id] = timestamp
write_timestamp(timestamp_dict)
return episode_actions
@classmethod
def fetch_episode_actions(cls,
server: str,
login_name: str,
app_password: str,
types: list[EpisodeActionType] = None,
since: int = 0) -> list[EpisodeAction]:
"""
https://github.com/thrillfall/nextcloud-gpodder/blob/main/README.md#episode-action
"""
if types is None:
types = [ea_type for ea_type in EpisodeActionType]
url = f'{server}/apps/gpoddersync/episode_action?since={since}'
Log.info(f'Downloading {url} ...')
# auth parameter: https://requests.readthedocs.io/en/latest/user/authentication/#basic-authentication
r = requests.get(url, auth=(login_name, app_password))
assert r.status_code == 200
content = json.loads(r.text)
_timestamp = datetime_util.from_timestamp(content['timestamp'])
episode_actions = [EpisodeAction.from_json(action) for action in content['actions']]
return [episode_action for episode_action in episode_actions
if episode_action.action in types]
@classmethod
def push_episode_actions(cls, server: str, login_name: str, app_password: str,
actions: list[EpisodeAction]) -> None:
data = [action.to_json() for action in actions]
url = f'{server}/apps/gpoddersync/episode_action/create'
Log.info(f'Uploading {url} ...')
data_str = json.dumps(data)
headers = {'Content-type': 'application/json'}
r = requests.post(url, auth=(login_name, app_password),
headers=headers, data=data_str)
assert r.status_code == 200
if __name__ == '__main__':
test()

315
src/nextcast/podcast.py Normal file
View File

@ -0,0 +1,315 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import math
from datetime import datetime
from pathlib import Path
import requests
from feedparser.util import FeedParserDict
from nextcast import feed, datetime_util, json_util, metadata
from nextcast.file_cache import CachedAudio
from nextcast.log import Log
from nextcast.nextcloud import NextcloudApi, EpisodeActionType, EpisodeAction
class PlaybackActivity:
@classmethod
def from_json(cls, content: dict) -> PlaybackActivity:
return PlaybackActivity(
user_id=content['user_id'],
timestamp=datetime_util.from_str(content['timestamp']),
position=content['position'],
)
def __init__(self, user_id: str, position: int = 0, timestamp: datetime = None):
if timestamp is None:
timestamp = datetime_util.now()
self.user_id = user_id
self.timestamp = timestamp
self.position = position
def to_json(self) -> dict:
return {
'user_id': self.user_id,
'timestamp': datetime_util.to_str(self.timestamp),
'position': self.position,
}
class User:
"""
A logged in Nextcloud user.
"""
@classmethod
def from_id(cls, user_id: str, app_password: str):
last_index = user_id.rfind('@')
login_name = user_id[0:last_index]
server = user_id[last_index + 1:]
return User(server, login_name, app_password)
def __init__(self, server: str, login_name: str, app_password: str):
self.server = server
self.login_name = login_name
self.app_password = app_password
self._podcasts: list[Podcast] | None = None
@property
def user_id(self) -> str:
return f'{self.login_name}@{self.server}'
def get_podcasts(self) -> list[Podcast]:
if self._podcasts is None:
self._podcasts = [Podcast(url, self) for url in self.get_subscriptions()]
try:
episode_actions = NextcloudApi.fetch_play_actions(
self.server, self.login_name, self.app_password, self.user_id)
if len(episode_actions) > 0:
print(f'Applying {len(episode_actions)} new actions ...')
episodes_by_id: dict[str, Episode] = {e.episode_id: e
for p in self._podcasts
for e in p.episodes}
for episode_action in episode_actions:
if episode_action.episode_id in episodes_by_id:
episode = episodes_by_id[episode_action.episode_id]
activity = PlaybackActivity(self.user_id, episode_action.position, episode_action.timestamp)
episode.set_activity(activity)
episode.set_duration(episode_action.total)
episode.write()
except requests.exceptions.ConnectionError as _e:
Log.error(f'Could not fetch new actions.')
return self._podcasts
def get_episodes(self) -> list[Episode]:
return [episode
for podcast in self.get_podcasts()
for episode in podcast.episodes]
def get_subscriptions(self, update: bool = False) -> list[str]:
"""
:param update: If true, the local data is updated. TODO
:return: The URLs of the subscribed podcast feeds. Example: 'https://logbuch-netzpolitik.de/feed/opus'
"""
subscriptions = json_util.read_subscriptions()
if update or self.user_id not in subscriptions:
user_subscriptions = NextcloudApi.fetch_subscriptions(self.server, self.login_name, self.app_password)
subscriptions[self.user_id] = user_subscriptions
json_util.write_subscriptions(subscriptions)
else:
user_subscriptions = subscriptions[self.user_id]
return user_subscriptions
class Podcast:
def __init__(self, url: str, user: User):
self.user = user
self.url = url
self._feed: None | FeedParserDict = None
content = json_util.read_podcast(url)
if len(content) > 0:
# Read from JSON.
self.title = content['title']
self.episodes = [Episode.from_json(self, e) for e in content['episodes']]
else:
# Parse data.
self.title = feed.get_feed_title(self.get_feed())
self.episodes = [Episode.from_feed_parser(self, episode_parser)
for episode_parser in feed.get_feed_episodes(self.get_feed())]
# And save to JSON.
self.write()
def write(self) -> None:
json_util.write_podcast(self.to_json(), self.url)
def to_json(self) -> dict:
return {
'title': self.title,
'episodes': [episode.to_json() for episode in self.episodes]
}
def get_feed(self) -> FeedParserDict:
if self._feed is None:
self._feed = feed.get_feed(self.url)
return self._feed
class Episode:
@classmethod
def from_feed_parser(cls, podcast: Podcast, episode_parser: FeedParserDict) -> Episode:
return Episode(
podcast=podcast,
url=feed.get_episode_media_url(episode_parser),
guid=episode_parser['id'],
title=episode_parser['title'],
activities=[], # PlaybackActivities are assigned by User.get_podcasts().
reported_duration=None,
actual_duration=None,
)
@classmethod
def from_json(cls, podcast: Podcast, content: dict) -> Episode:
return Episode(
podcast=podcast,
url=content['url'],
guid=content['guid'],
title=content['title'],
activities=[PlaybackActivity.from_json(a) for a in content.get('activities', [])],
reported_duration=content.get('reported_duration', None),
actual_duration=content.get('actual_duration', None),
)
def __init__(self, podcast: Podcast, url: str, guid: str, title: str,
activities: list[PlaybackActivity], reported_duration: int | None, actual_duration: int | None):
self.podcast = podcast
self.url = url
self.guid = guid
self.title = title
self._activities = activities
self._reported_duration: int | None = reported_duration
self._actual_duration: int | None = actual_duration
self._file = CachedAudio(file_url=self.url)
def to_json(self) -> dict:
d = {
'url': self.url,
'guid': self.guid,
'title': self.title,
}
if len(self._activities) > 0:
d['activities'] = [activity.to_json() for activity in self._activities]
if self._reported_duration is not None:
d['reported_duration'] = self._reported_duration
if self._actual_duration is not None:
d['actual_duration'] = self._actual_duration
return d
@property
def episode_id(self) -> str:
return f'{self.guid}@{self.podcast.url}'
def downloaded(self) -> bool:
"""
:return: Weather the audio file has been downloaded.
"""
return self._file.downloaded()
def played(self) -> bool:
# This check does not require knowing the episode length.
if self.get_position() == 0:
return False
epsilon = 1
return self.get_duration() - epsilon <= self.get_position() <= self.get_duration() + epsilon
def paused(self, min_position: int = 1) -> bool:
"""
:param min_position: Only consider paused if listening progress is greater than `min_position` seconds.
"""
# This check does not require knowing the episode length.
if self.get_position() == 0:
return False
epsilon = 1
return min_position < self.get_position() < (self.get_duration() - epsilon)
def get_file(self) -> Path:
self._get_actual_duration()
return self._file.get()
def get_duration(self) -> int:
if self._reported_duration is not None:
return self._reported_duration
return self._get_actual_duration()
def _get_actual_duration(self) -> int:
"""
Get actual audio duration and fix reported duration and position.
"""
if self._actual_duration is None:
self._actual_duration = math.ceil(metadata.get_duration_ms(self._file.get()) / 1000)
if self._reported_duration is not None and self._reported_duration != self._actual_duration:
# Fix reported_duration and reported_position.
Log.error(f'Duration mismatch: Reported {self._reported_duration} vs actual {self._actual_duration}')
# reported_duration is only set if an activity was downloaded.
# Such an activity contains a "reported" position and a "reported" duration.
reported_position = self.get_position()
actual_position = math.floor(reported_position * self._actual_duration / self._reported_duration)
Log.error(f'Calculated positions: Reported {reported_position} vs actual {actual_position}')
# First, correct reported_duration.
self._reported_duration = self._actual_duration
# Then, correct position.
# This uploads an activity with reported_duration and position.
self.update_position(actual_position)
return self._actual_duration
def set_duration(self, duration: int) -> None:
self._reported_duration = duration
def get_activity(self) -> PlaybackActivity:
for activity in self._activities:
if self.podcast.user.user_id == activity.user_id:
return activity
activity = PlaybackActivity(self.podcast.user.user_id, timestamp=datetime_util.from_timestamp(0))
self._activities.append(activity)
return activity
def set_activity(self, new_activity: PlaybackActivity):
prev_activity = self.get_activity()
if new_activity.timestamp <= prev_activity.timestamp:
raise ValueError(f'Expected timestamp of new activity to be greater.'
f' Previous: {prev_activity.timestamp}.'
f' New: {new_activity.timestamp}')
self._activities.remove(prev_activity)
self._activities.append(new_activity)
def get_position(self) -> int:
return self.get_activity().position
def update_position(self, position: int, timestamp: datetime = None):
if timestamp is None:
timestamp = datetime_util.now()
self.set_activity(PlaybackActivity(
self.podcast.user.user_id,
position,
timestamp
))
# Write changes.
self.write()
# Upload changes.
try:
NextcloudApi.push_episode_actions(
server=self.podcast.user.server,
login_name=self.podcast.user.login_name,
app_password=self.podcast.user.app_password,
actions=[self._get_episode_action()])
except requests.exceptions.ConnectionError as _e:
Log.error(f'Could not upload new actions.')
def write(self) -> None:
# Write changes to JSON file.
self.podcast.write()
def _get_episode_action(self) -> EpisodeAction:
action = self.get_activity()
return EpisodeAction(
podcast=self.podcast.url,
episode=self.url,
guid=self.guid,
action=EpisodeActionType.PLAY,
timestamp=action.timestamp,
started=0,
position=action.position,
total=self.get_duration()
)

View File

@ -0,0 +1,86 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Any
from nextcast import json_util
from nextcast.cli_gui import list_selection, CancelButton, TopButton
from nextcast.nextcloud import NextcloudApi
from nextcast.podcast import User
class UserManager:
_content = None
@classmethod
def _get_content(cls) -> dict[str, Any]:
if cls._content is None:
cls._content = json_util.read_credentials()
return cls._content
@classmethod
def _set_content(cls, content: dict[str, Any]):
cls._content = content
@classmethod
def user_ids(cls) -> list[str]:
return list(cls._get_content().keys())
@classmethod
def from_disk(cls, user_id: str) -> User:
app_password = cls._content[user_id]
return User.from_id(user_id, app_password)
@classmethod
def add_to_disk(cls, user: User) -> None:
cls._get_content()[user.user_id] = user.app_password
json_util.write_credentials(cls._get_content())
@classmethod
def from_interaction(cls) -> User:
"""
Interactively select or add a Nextcloud account.
:return: Logged in Nextcloud user.
"""
users_ids = cls.user_ids()
if len(users_ids) > 0:
# Select one of the logged-in users
# or select `Add account`.
exit_str = '← EXIT'
add_account_str = 'Add account'
try:
selected = list_selection(
elements=users_ids,
display_function=lambda x: x,
title='Choose your account',
top_buttons=[exit_str, add_account_str]
)
return cls.from_disk(user_id=selected)
except CancelButton as _e:
exit()
except TopButton as e:
if e.value == exit_str:
exit()
elif e.value == add_account_str:
pass # `Add account` was selected.
else:
raise NotImplementedError()
# Login new user.
try:
domain = input("Enter Nextcloud server URL: https://")
except KeyboardInterrupt as _e:
exit()
# Remove tailing '/'.
if domain.endswith('/'):
domain = domain[0:-1]
new_user = cls.login_and_save(domain)
return new_user
@classmethod
def login_and_save(cls, domain: str) -> User:
server, login_name, app_password = NextcloudApi.login(domain=domain)
user = User(server=server, login_name=login_name, app_password=app_password)
cls.add_to_disk(user)
return user