mirror of
https://codeberg.org/privacy1st/de-p1st-monitor
synced 2024-11-23 19:49:32 +01:00
feat: sensor_script logger
This commit is contained in:
parent
ce027d2a85
commit
36d6011d9f
16
README.md
16
README.md
@ -32,17 +32,7 @@ See [cfg/yodaTux.ini](cfg/yodaTux.ini) for a configuration file covering all con
|
||||
|
||||
## Installation
|
||||
|
||||
Install dependencies:
|
||||
|
||||
- on Arch Linux
|
||||
|
||||
```shell
|
||||
# TODO
|
||||
# Optional: 1-wire temperature sensor.
|
||||
sudo pacman -S digitemp # TODO: configure your sensor
|
||||
```
|
||||
|
||||
- on Ubuntu
|
||||
Install dependencies on Ubuntu
|
||||
|
||||
```shell
|
||||
sudo apt-get install python3-pip
|
||||
@ -62,13 +52,13 @@ sudo python3 -m pip install psutil --upgrade
|
||||
|
||||
Install:
|
||||
|
||||
- on Arch Linux
|
||||
- On Arch Linux
|
||||
|
||||
```shell
|
||||
make
|
||||
```
|
||||
|
||||
- on Ubuntu
|
||||
- On Ubuntu
|
||||
|
||||
```shell
|
||||
make install-pip
|
||||
|
8
TODO.md
8
TODO.md
@ -1,10 +1,14 @@
|
||||
# TODOs
|
||||
|
||||
## Public IP address
|
||||
## ~~digitemp temperature logging~~
|
||||
|
||||
~~Done through generic sensor_script logger.~~
|
||||
|
||||
## Public IP address logging
|
||||
|
||||
Logg the public IP address. Reuse `netcup-dns` Python functions.
|
||||
|
||||
## Rewrite
|
||||
## ~~Rewrite~~
|
||||
|
||||
* ~~easier configuration~~
|
||||
* ~~easier read/write from/to csv~~
|
||||
|
@ -72,8 +72,7 @@ warn_if_above = 40
|
||||
uuid = 4651c3f1-e4b8-45aa-a823-df762530a307
|
||||
warn_if_above = 40
|
||||
|
||||
; TODO digitemp sensor
|
||||
;[digitemp_DS9097.1]
|
||||
;cfg = /root/.digitemprc
|
||||
;sensor_num = 0
|
||||
;name = room-temp
|
||||
[sensor_script.1]
|
||||
cmd = ["digitemp_DS9097", "-q", "-t", "0"]
|
||||
name = room-temp
|
||||
warn_if_above = 32
|
||||
|
@ -77,3 +77,11 @@ device = /dev/nvme0n1p3
|
||||
; Warn if temperature is above this value.
|
||||
; Unit: °C
|
||||
warn_if_above = 25
|
||||
|
||||
[sensor_script.1]
|
||||
; The command will be executed.
|
||||
; It has to return a float (or int) and exit code 0 on success.
|
||||
; This value is then logged.
|
||||
cmd = ["digitemp_DS9097", "-q", "-t", "0"]
|
||||
name = room-temp
|
||||
warn_if_above = 32
|
||||
|
@ -1,4 +1,5 @@
|
||||
import configparser
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
from de.p1st.monitor.cfg.singleton import get_cfg
|
||||
@ -8,6 +9,7 @@ from de.p1st.monitor.loggers.drive import DriveLogger
|
||||
from de.p1st.monitor.loggers.filesystem import FilesystemLogger
|
||||
from de.p1st.monitor.loggers.memory import MemoryLogger
|
||||
from de.p1st.monitor.loggers.network import NetworkLogger
|
||||
from de.p1st.monitor.loggers.script import ScriptLogger
|
||||
from de.p1st.monitor.loggers.swap import SwapLogger
|
||||
from de.p1st.monitor.loggers.temp import TempLogger
|
||||
from de.p1st.monitor.logger import Logger
|
||||
@ -29,6 +31,19 @@ def get_loggers() -> tuple[list[Logger], list[LoggerArgEx]]:
|
||||
warn_data_range = int(cfg_.get('warn_data_range', '1'))
|
||||
return TempLogger(sensor, label, warn_if_above, warn_threshold, warn_data_range)
|
||||
|
||||
def sensor_script(cfg_: configparser.SectionProxy) -> Logger:
|
||||
cmd_json_str = get_or_raise(cfg_, 'cmd')
|
||||
cmd_json: list[str] = json.loads(cmd_json_str)
|
||||
assert isinstance(cmd_json, list)
|
||||
for arg in cmd_json:
|
||||
assert isinstance(arg, str)
|
||||
|
||||
name = get_or_raise(cfg_, 'name')
|
||||
warn_if_above = float(cfg_['warn_if_above']) if 'warn_if_above' in cfg_ else None
|
||||
warn_threshold = int(cfg_.get('warn_threshold', '1'))
|
||||
warn_data_range = int(cfg_.get('warn_data_range', '1'))
|
||||
return ScriptLogger(cmd_json, name, warn_if_above, warn_threshold, warn_data_range)
|
||||
|
||||
def cpu1(cfg_: configparser.SectionProxy) -> Logger:
|
||||
warn_if_above = float(cfg_['warn_if_above']) if 'warn_if_above' in cfg_ else None
|
||||
warn_threshold = int(cfg_.get('warn_threshold', '1'))
|
||||
@ -82,6 +97,7 @@ def get_loggers() -> tuple[list[Logger], list[LoggerArgEx]]:
|
||||
|
||||
mapping = {
|
||||
'temp': temp,
|
||||
'sensor_script': sensor_script,
|
||||
'cpu1': cpu1,
|
||||
'cpu5': cpu5,
|
||||
'cpu15': cpu15,
|
||||
|
@ -69,6 +69,10 @@ class Logger(ABC):
|
||||
|
||||
Otherwise, a NORMAL WarnMessage is returned.
|
||||
"""
|
||||
if self.warn_if_above is None:
|
||||
# self.critical_if_above is also None
|
||||
return WarnMessage(WarnLevel.NONE)
|
||||
|
||||
datasets = self.get_datasets(self.warn_data_range + 1)
|
||||
warn_datas = [self.get_warn_data(data) for data in datasets]
|
||||
current_warn_data = warn_datas[-1]
|
||||
@ -112,6 +116,7 @@ class Logger(ABC):
|
||||
|
||||
def _get_num_warnings(self, warn_datas: list[WarnData | WarnMessage]) -> tuple[int, WarnLevel]:
|
||||
"""
|
||||
@precondition: self.warn_if_above and self.critical_if_above are not None
|
||||
@return: Number of warnings and the highest WarnLevel
|
||||
"""
|
||||
num_warnings = 0
|
||||
@ -133,6 +138,9 @@ class Logger(ABC):
|
||||
return num_warnings, highest_warn_level
|
||||
|
||||
def _get_warn_messages(self, warn_datas: list[WarnData | WarnMessage]) -> list[str]:
|
||||
"""
|
||||
@precondition: self.warn_if_above and self.critical_if_above are not None
|
||||
"""
|
||||
messages: list[str] = []
|
||||
|
||||
for warn_data in warn_datas:
|
||||
|
83
src/de/p1st/monitor/loggers/script.py
Normal file
83
src/de/p1st/monitor/loggers/script.py
Normal file
@ -0,0 +1,83 @@
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
from de.p1st.monitor import datetime_util
|
||||
from de.p1st.monitor.logger import Logger
|
||||
from de.p1st.monitor.logger_ex import LoggerReadEx
|
||||
from de.p1st.monitor.warn_data import WarnData
|
||||
|
||||
|
||||
class ScriptLogger(Logger):
|
||||
def __init__(self, command: list[str],
|
||||
sensor_name: str,
|
||||
warn_if_above: float = None,
|
||||
warn_threshold: int = 1,
|
||||
warn_data_range: int = 1,
|
||||
):
|
||||
if warn_if_above is None:
|
||||
critical_if_above = None
|
||||
else:
|
||||
critical_if_above = warn_if_above + 10
|
||||
|
||||
super().__init__(warn_threshold,
|
||||
warn_data_range,
|
||||
warn_if_above,
|
||||
critical_if_above)
|
||||
|
||||
self.name = sensor_name
|
||||
self.command = command
|
||||
self.warn_if_above = warn_if_above
|
||||
|
||||
def get_warn_data(self, data: list[any]) -> WarnData:
|
||||
value = data[1]
|
||||
message = f'Value of {self.name} ist at {value}'
|
||||
return WarnData(data[0], value, message)
|
||||
|
||||
def read_data(self) -> list[any]:
|
||||
return [
|
||||
datetime_util.now(),
|
||||
self.get_value()
|
||||
]
|
||||
|
||||
def data_schema(self) -> list[str]:
|
||||
return [
|
||||
'datetime#Date',
|
||||
'float#Value'
|
||||
]
|
||||
|
||||
def get_log_file(self) -> Path:
|
||||
return self.get_log_dir() / f'sensor_script_{self.name}.csv'
|
||||
|
||||
#
|
||||
# HELPERS
|
||||
#
|
||||
|
||||
def get_value(self) -> float:
|
||||
"""
|
||||
:return: Value of sensor
|
||||
"""
|
||||
completed: subprocess.CompletedProcess = subprocess.run(
|
||||
self.command,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
if completed.returncode != 0:
|
||||
raise LoggerReadEx(f'Script to read value of {self.name} failed with exit code {completed.returncode}.\n'
|
||||
f'stderr: {completed.stderr}\n'
|
||||
f'stdout: {completed.stdout}')
|
||||
value: str = completed.stdout.strip()
|
||||
return float(value)
|
||||
|
||||
|
||||
def test():
|
||||
from de.p1st.monitor.cfg import singleton
|
||||
singleton.init_cfg()
|
||||
|
||||
logger = ScriptLogger(["echo", "1.0"], 'test-script')
|
||||
logger.update()
|
||||
logger.log()
|
||||
logger.check().print()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
test()
|
Loading…
Reference in New Issue
Block a user