mirror of
https://codeberg.org/privacy1st/de-p1st-monitor
synced 2025-01-22 02:32:43 +01:00
feat: don't warn if value decreased
This commit is contained in:
parent
c36b3e1961
commit
5146ae275e
@ -1,10 +1,11 @@
|
||||
from pathlib import Path
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from de.p1st.monitor import csv_util
|
||||
from de.p1st.monitor import csv_util, datetime_util
|
||||
from de.p1st.monitor.cfg.logging_dir import logging_dir
|
||||
from de.p1st.monitor.string_conversion import to_string, from_string
|
||||
from de.p1st.monitor.warn import WarnMessage, WarnLevel
|
||||
from de.p1st.monitor.warn_data import WarnData
|
||||
|
||||
|
||||
# https://www.geeksforgeeks.org/abstract-classes-in-python/
|
||||
@ -15,7 +16,7 @@ class Logger(ABC):
|
||||
warn_if_above: int | float = None,
|
||||
critical_if_above: int | float = None,
|
||||
):
|
||||
self.data: list[any] | None = None
|
||||
self.data: list[any] = []
|
||||
# True if the data held by this object is already appended to the logfile.
|
||||
self.logged = False
|
||||
|
||||
@ -29,7 +30,8 @@ class Logger(ABC):
|
||||
f' {critical_if_above}, {warn_if_above}')
|
||||
else:
|
||||
if warn_if_above is not None or critical_if_above is not None:
|
||||
raise Exception('Either both, warn_if_above and critical_if_above, must be given, or both must be None.')
|
||||
raise Exception(
|
||||
'Either both, warn_if_above and critical_if_above, must be given, or both must be None.')
|
||||
self.warn_if_above = warn_if_above
|
||||
self.critical_if_above = critical_if_above
|
||||
|
||||
@ -49,41 +51,113 @@ class Logger(ABC):
|
||||
|
||||
def check(self) -> WarnMessage:
|
||||
"""
|
||||
Checks the latest `self.warn_data_range` datasets for problems using `self.check_data()`.
|
||||
Checks if `self.warn_threshold` or more of the latest `self.warn_data_range` datasets
|
||||
contain values higher than `self.warn_if_above` in which case a warning (NORMAL or HIGH)
|
||||
might be returned.
|
||||
|
||||
If at least `self.warn_threshold` problems are found,
|
||||
then a WarnMessage with the highest reported WarnLevel is returned.
|
||||
In details:
|
||||
|
||||
If at least one WarnLevel is above NORMAL,
|
||||
then a WarnMessage is returned independent of the number of problems.
|
||||
If the current value is above `self.critical_if_above`,
|
||||
then a HIGH WarnMessage is returned.
|
||||
|
||||
If less than `self.warn_threshold` values are above `self.warn_if_above`,
|
||||
then no warning is returned.
|
||||
|
||||
If the current value is lower than the previously logged one
|
||||
and if during the previous check a warning was issued,
|
||||
then no warning is returned.
|
||||
|
||||
Otherwise, a NORMAL WarnMessage is returned.
|
||||
"""
|
||||
datasets = self.get_datasets(self.warn_data_range)
|
||||
warnings = [self.check_data(data) for data in datasets]
|
||||
warnings = [warning for warning in warnings
|
||||
if not warning.level == WarnLevel.NONE]
|
||||
datasets = self.get_datasets(self.warn_data_range + 1)
|
||||
warn_datas = [self.get_warn_data(data) for data in datasets]
|
||||
current_warn_data = warn_datas[-1]
|
||||
|
||||
if len(warnings) == 0:
|
||||
# If current value is critical (or current warning is HIGH), directly return HIGH WarnMessage.
|
||||
#
|
||||
# -> As we don't want to send the same warning twice,
|
||||
# we check only if the current value is critical.
|
||||
if isinstance(current_warn_data, WarnData):
|
||||
if current_warn_data.value > self.critical_if_above:
|
||||
return WarnMessage(WarnLevel.HIGH, datetime_util.now(), current_warn_data.message)
|
||||
elif isinstance(current_warn_data, WarnMessage):
|
||||
if current_warn_data.level > WarnLevel.NORMAL:
|
||||
return current_warn_data
|
||||
else:
|
||||
raise ValueError()
|
||||
|
||||
current_warn_datas = warn_datas[-self.warn_data_range:]
|
||||
curr_num_warnings, curr_highest_warning = self._get_num_warnings(current_warn_datas)
|
||||
|
||||
# Warning threshold not reached.
|
||||
if curr_num_warnings < self.warn_threshold:
|
||||
return WarnMessage(WarnLevel.NONE)
|
||||
# max() must not be called with an empty list.
|
||||
highest_warn_level = max([warning.level for warning in warnings])
|
||||
|
||||
messages: list[str] = [warning.message for warning in warnings]
|
||||
message = f'{len(warnings)} of the last {self.warn_data_range} datasets are above limits:\n\t' \
|
||||
+ '\n\t'.join(messages)
|
||||
previous_warn_datas = warn_datas[-self.warn_data_range - 1:-1]
|
||||
prev_num_warnings, prev_highest_warning = self._get_num_warnings(previous_warn_datas)
|
||||
|
||||
if highest_warn_level > WarnLevel.NORMAL:
|
||||
return WarnMessage(highest_warn_level, message)
|
||||
if len(warnings) >= self.warn_threshold:
|
||||
return WarnMessage(highest_warn_level, message)
|
||||
return WarnMessage(WarnLevel.NONE)
|
||||
# Don't send warning again if the current value decreased.
|
||||
if prev_num_warnings >= self.warn_if_above:
|
||||
previous_warn_data = warn_datas[-2]
|
||||
if isinstance(current_warn_data, WarnData) \
|
||||
and isinstance(previous_warn_data, WarnData) \
|
||||
and current_warn_data.value <= previous_warn_data.value:
|
||||
return WarnMessage(WarnLevel.NONE)
|
||||
|
||||
# Send warning.
|
||||
warn_messages = self._get_warn_messages(current_warn_datas)
|
||||
message = f'{curr_num_warnings} of the last {len(current_warn_datas)} datasets are above limits:\n\t' \
|
||||
+ '\n\t'.join(warn_messages)
|
||||
return WarnMessage(WarnLevel.NORMAL, datetime_util.now(), message)
|
||||
|
||||
def _get_num_warnings(self, warn_datas: list[WarnData | WarnMessage]) -> tuple[int, WarnLevel]:
|
||||
"""
|
||||
@return: Number of warnings and the highest WarnLevel
|
||||
"""
|
||||
num_warnings = 0
|
||||
highest_warn_level = WarnLevel.NONE
|
||||
|
||||
for warn_data in warn_datas:
|
||||
if isinstance(warn_data, WarnMessage):
|
||||
highest_warn_level = max(highest_warn_level, warn_data.level)
|
||||
elif isinstance(warn_data, WarnData):
|
||||
if warn_data.value > self.critical_if_above:
|
||||
num_warnings += 1
|
||||
highest_warn_level = max(highest_warn_level, WarnLevel.HIGH)
|
||||
elif warn_data.value > self.warn_if_above:
|
||||
num_warnings += 1
|
||||
highest_warn_level = max(highest_warn_level, WarnLevel.NORMAL)
|
||||
else:
|
||||
raise ValueError()
|
||||
|
||||
return num_warnings, highest_warn_level
|
||||
|
||||
def _get_warn_messages(self, warn_datas: list[WarnData | WarnMessage]) -> list[str]:
|
||||
messages: list[str] = []
|
||||
|
||||
for warn_data in warn_datas:
|
||||
if isinstance(warn_data, WarnMessage):
|
||||
messages.append(warn_data.formatted_message())
|
||||
elif isinstance(warn_data, WarnData):
|
||||
if warn_data.value > self.critical_if_above:
|
||||
messages.append(
|
||||
WarnMessage(WarnLevel.HIGH, warn_data.date, warn_data.message).formatted_message())
|
||||
elif warn_data.value > self.warn_if_above:
|
||||
messages.append(
|
||||
WarnMessage(WarnLevel.NORMAL, warn_data.date, warn_data.message).formatted_message())
|
||||
else:
|
||||
raise ValueError()
|
||||
|
||||
return messages
|
||||
|
||||
@abstractmethod
|
||||
def check_data(self, data: list[any]) -> WarnMessage:
|
||||
def get_warn_data(self, data: list[any]) -> WarnData | WarnMessage:
|
||||
"""
|
||||
Check the given data for problems.
|
||||
Return a WarnLevel indicating how serious the problems are.
|
||||
Calculate warn value from given data and return as part of `WarnData` object.
|
||||
|
||||
If there are no problems, return `WarnLevel.NONE`.
|
||||
If the value can't be calculated, directly return a `WarnMessage` object.
|
||||
|
||||
@return Warn value; Message for normal warning; Message for critical warning
|
||||
"""
|
||||
raise ValueError('Subclasses must implement this')
|
||||
|
||||
@ -105,6 +179,8 @@ class Logger(ABC):
|
||||
def get_datasets(self, num: int) -> list[list[any]]:
|
||||
"""
|
||||
Returns the last `num` datasets (including the current dataset).
|
||||
|
||||
The first row is the oldest, and the latest row is the current dataset.
|
||||
"""
|
||||
if not self.logged and self.has_data():
|
||||
# We will append the current data manually.
|
||||
@ -130,18 +206,15 @@ class Logger(ABC):
|
||||
|
||||
return data
|
||||
|
||||
def log(self, skip_if_no_data: bool = False) -> None:
|
||||
def log(self) -> None:
|
||||
"""
|
||||
Appends the current data (e.g. temperature of a sensor)
|
||||
to a logfile.
|
||||
|
||||
:param skip_if_no_data: Can be used to do nothing if no data is available. If one is sure to have called update() previously, this can be set to True.
|
||||
:raise Exception: If method is called but no data is available. Please do call update() first to avoid this!
|
||||
"""
|
||||
if self.logged:
|
||||
return
|
||||
if skip_if_no_data and not self.has_data():
|
||||
return
|
||||
|
||||
csv_util.write(file=self.get_log_file(), rows=[self.get_data_as_row()], header=self.data_schema())
|
||||
self.logged = True
|
||||
@ -151,12 +224,10 @@ class Logger(ABC):
|
||||
self.logged = False
|
||||
|
||||
@abstractmethod
|
||||
def read_data(self) -> list[any] | None:
|
||||
def read_data(self) -> list[any]:
|
||||
"""
|
||||
Collects current data (e.g. temperature of a sensor).
|
||||
|
||||
Might return None if sensor is detached / not available.
|
||||
|
||||
:raise LoggerReadEx:
|
||||
"""
|
||||
raise ValueError('Subclasses must implement this')
|
||||
@ -199,7 +270,7 @@ class Logger(ABC):
|
||||
]
|
||||
|
||||
def has_data(self) -> bool:
|
||||
return self.data is not None
|
||||
return len(self.data) > 0
|
||||
|
||||
def get_data(self) -> list[any]:
|
||||
"""
|
||||
@ -210,7 +281,9 @@ class Logger(ABC):
|
||||
else:
|
||||
raise ValueError(f'Data has not yet been read. {self.__str__()}')
|
||||
|
||||
def set_data(self, data: list[any] | None):
|
||||
def set_data(self, data: list[any]):
|
||||
if len(data) < 1:
|
||||
raise ValueError()
|
||||
self.data = data
|
||||
|
||||
def data_type_strs(self, data_schema: list[str] = None) -> list[str]:
|
||||
|
@ -7,7 +7,7 @@ import psutil
|
||||
from de.p1st.monitor import datetime_util
|
||||
|
||||
from de.p1st.monitor.logger import Logger
|
||||
from de.p1st.monitor.warn import WarnMessage, WarnLevel
|
||||
from de.p1st.monitor.warn_data import WarnData
|
||||
|
||||
|
||||
class CPULogger(Logger):
|
||||
@ -24,17 +24,12 @@ class CPULogger(Logger):
|
||||
critical_if_above)
|
||||
self.warn_if_above = warn_if_above
|
||||
|
||||
def check_data(self, data: list[any]) -> WarnMessage:
|
||||
def get_warn_data(self, data: list[any]) -> WarnData:
|
||||
load_avg = data[1]
|
||||
message = f'CPU load avg of last {self.get_load_timespan()} minutes is at {load_avg}'
|
||||
return WarnData(data[0], load_avg, message)
|
||||
|
||||
if load_avg > self.critical_if_above:
|
||||
return WarnMessage(WarnLevel.HIGH, message)
|
||||
if load_avg > self.warn_if_above:
|
||||
return WarnMessage(WarnLevel.NORMAL, message)
|
||||
return WarnMessage(WarnLevel.NONE)
|
||||
|
||||
def read_data(self) -> list[any] | None:
|
||||
def read_data(self) -> list[any]:
|
||||
return [
|
||||
datetime_util.now(),
|
||||
self.get_load(self.get_load_timespan())
|
||||
@ -50,14 +45,13 @@ class CPULogger(Logger):
|
||||
return self.get_log_dir() / f'cpu_{self.get_load_timespan()}min.csv'
|
||||
|
||||
@abstractmethod
|
||||
def get_load_timespan(self) -> Literal[1, 5, 15]:
|
||||
def get_load_timespan(self) -> Literal[1, 5, 15]:
|
||||
raise ValueError('Subclasses must implement this')
|
||||
|
||||
#
|
||||
# HELPERS
|
||||
#
|
||||
|
||||
|
||||
@staticmethod
|
||||
def get_load(minutes: Literal[1, 5, 15]) -> float:
|
||||
"""
|
||||
@ -79,12 +73,17 @@ class CPULogger(Logger):
|
||||
# This value has been tested to be correct on my AMD Ryzen 4800H CPU.
|
||||
return psutil.getloadavg()[idx] / psutil.cpu_count()
|
||||
|
||||
|
||||
class CPULogger1(CPULogger):
|
||||
def get_load_timespan(self) -> Literal[1, 5, 15]:
|
||||
return 1
|
||||
|
||||
|
||||
class CPULogger5(CPULogger):
|
||||
def get_load_timespan(self) -> Literal[1, 5, 15]:
|
||||
return 5
|
||||
|
||||
|
||||
class CPULogger15(CPULogger):
|
||||
def get_load_timespan(self) -> Literal[1, 5, 15]:
|
||||
return 15
|
||||
|
@ -6,11 +6,13 @@ from de.p1st.monitor import datetime_util
|
||||
from de.p1st.monitor.exec_capture import execute_capture
|
||||
from de.p1st.monitor.logger import Logger
|
||||
from de.p1st.monitor.logger_ex import LoggerArgEx, LoggerReadEx
|
||||
from de.p1st.monitor.warn import WarnLevel, WarnMessage
|
||||
from de.p1st.monitor.warn_data import WarnData
|
||||
|
||||
|
||||
class BlkidException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class DriveLogger(Logger):
|
||||
def __init__(self, uuid: str = None,
|
||||
device: Path = None,
|
||||
@ -44,17 +46,12 @@ class DriveLogger(Logger):
|
||||
|
||||
self.warn_if_above = warn_if_above
|
||||
|
||||
def check_data(self, data: list[any]) -> WarnMessage:
|
||||
def get_warn_data(self, data: list[any]) -> WarnData:
|
||||
temp = data[1]
|
||||
message = f'Temperature of {self.uuid} ist at {temp}'
|
||||
return WarnData(data[0], temp, message)
|
||||
|
||||
if temp > self.critical_if_above:
|
||||
return WarnMessage(WarnLevel.HIGH, message)
|
||||
if temp > self.warn_if_above:
|
||||
return WarnMessage(WarnLevel.NORMAL, message)
|
||||
return WarnMessage(WarnLevel.NONE)
|
||||
|
||||
def read_data(self) -> list[any]:
|
||||
def read_data(self) -> list[any]:
|
||||
return [
|
||||
datetime_util.now(),
|
||||
self.get_temp_from_device(self.device),
|
||||
|
@ -1,12 +1,14 @@
|
||||
import math
|
||||
from pathlib import Path
|
||||
|
||||
import psutil
|
||||
from de.p1st.monitor import datetime_util
|
||||
from de.p1st.monitor import datetime_util, csv_util
|
||||
from de.p1st.monitor.exec_capture import execute_capture
|
||||
|
||||
from de.p1st.monitor.logger import Logger
|
||||
from de.p1st.monitor.logger_ex import LoggerArgEx, LoggerReadEx
|
||||
from de.p1st.monitor.warn import WarnLevel, WarnMessage
|
||||
from de.p1st.monitor.warn_data import WarnData
|
||||
|
||||
|
||||
class NotMounted(Exception):
|
||||
@ -81,22 +83,37 @@ class FilesystemLogger(Logger):
|
||||
else:
|
||||
raise LoggerReadEx(getattr(e, 'message', e))
|
||||
|
||||
def check_data(self, data: list[any]) -> WarnMessage:
|
||||
if not self.mounted:
|
||||
return WarnMessage(WarnLevel.NONE)
|
||||
def export_data(self) -> Path:
|
||||
data = self.get_all_datasets()
|
||||
|
||||
export_schema = self.data_schema()
|
||||
|
||||
# Filter rows where `Disk usage` is NAN.
|
||||
export_data = [row for row in data
|
||||
if not math.isnan(row[1])]
|
||||
|
||||
export_file = self.get_log_file().parent.joinpath(self.get_log_file().name + '.exported.csv')
|
||||
rows = [self.as_row(export_row, export_schema) for export_row in export_data]
|
||||
csv_util.write(file=export_file, rows=rows, header=export_schema, recreate_file=True)
|
||||
return export_file
|
||||
|
||||
def get_warn_data(self, data: list[any]) -> WarnData | WarnMessage:
|
||||
disk_usage = data[1]
|
||||
if math.isnan(disk_usage):
|
||||
if self.unmounted_ok:
|
||||
return WarnMessage(WarnLevel.NONE)
|
||||
else:
|
||||
return WarnMessage(WarnLevel.HIGH, data[0], 'Disk is not mounted')
|
||||
|
||||
message = f'Disk usage of {self.uuid} ist at {disk_usage}'
|
||||
return WarnData(data[0], disk_usage, message)
|
||||
|
||||
if disk_usage > self.critical_if_above:
|
||||
return WarnMessage(WarnLevel.HIGH, message)
|
||||
if disk_usage > self.warn_if_above:
|
||||
return WarnMessage(WarnLevel.NORMAL, message)
|
||||
return WarnMessage(WarnLevel.NONE)
|
||||
|
||||
def read_data(self) -> list[any] | None:
|
||||
def read_data(self) -> list[any]:
|
||||
if not self.mounted:
|
||||
return None
|
||||
return [
|
||||
datetime_util.now(),
|
||||
float('nan')
|
||||
]
|
||||
|
||||
disk_usage: float = self.get_disk_usage(self.mountpoint)
|
||||
return [
|
||||
@ -105,6 +122,10 @@ class FilesystemLogger(Logger):
|
||||
]
|
||||
|
||||
def data_schema(self) -> list[str]:
|
||||
"""
|
||||
If no disk usage value could be read (if a disk was unmounted),
|
||||
NAN is stored as `Disk usage`.
|
||||
"""
|
||||
return ['datetime#Date', 'float#Disk usage']
|
||||
|
||||
def get_log_file(self) -> Path:
|
||||
|
@ -4,7 +4,7 @@ import psutil
|
||||
from de.p1st.monitor import datetime_util
|
||||
|
||||
from de.p1st.monitor.logger import Logger
|
||||
from de.p1st.monitor.warn import WarnMessage, WarnLevel
|
||||
from de.p1st.monitor.warn_data import WarnData
|
||||
|
||||
|
||||
class MemoryLogger(Logger):
|
||||
@ -13,7 +13,6 @@ class MemoryLogger(Logger):
|
||||
warn_threshold: int = 1,
|
||||
warn_data_range: int = 1,
|
||||
):
|
||||
|
||||
# The space between memory is at `self.warn_if_above` and memory is full at `1.0`.
|
||||
buffer = 1 - warn_if_above
|
||||
critical_if_above = warn_if_above + 0.5 * buffer
|
||||
@ -23,20 +22,15 @@ class MemoryLogger(Logger):
|
||||
critical_if_above)
|
||||
self.warn_if_above = warn_if_above
|
||||
|
||||
def check_data(self, data: list[any]) -> WarnMessage:
|
||||
def get_warn_data(self, data: list[any]) -> WarnData:
|
||||
used_mb = data[1]
|
||||
total_available_mb = data[3]
|
||||
message = f'Memory usage ist at {used_mb} MB of {total_available_mb} MB'
|
||||
|
||||
used = used_mb / total_available_mb
|
||||
return WarnData(data[0], used, message)
|
||||
|
||||
if used > self.critical_if_above:
|
||||
return WarnMessage(WarnLevel.HIGH, message)
|
||||
if used > self.warn_if_above:
|
||||
return WarnMessage(WarnLevel.NORMAL, message)
|
||||
return WarnMessage(WarnLevel.NONE)
|
||||
|
||||
def read_data(self) -> list[any]:
|
||||
def read_data(self) -> list[any]:
|
||||
used_mb, free_mb, available_mb, total_mb = self.get_memory()
|
||||
used_and_cached_mb = total_mb - free_mb
|
||||
total_available_mb = used_mb + available_mb
|
||||
@ -48,7 +42,8 @@ class MemoryLogger(Logger):
|
||||
]
|
||||
|
||||
def data_schema(self) -> list[str]:
|
||||
return ['datetime#Date', 'int#Used memory in MB', 'int#Used and cached in MB', 'int#Total available memory in MB']
|
||||
return ['datetime#Date', 'int#Used memory in MB', 'int#Used and cached in MB',
|
||||
'int#Total available memory in MB']
|
||||
|
||||
def get_log_file(self) -> Path:
|
||||
return self.get_log_dir() / f'memory.csv'
|
||||
|
@ -54,7 +54,7 @@ class NetworkLogger(Logger):
|
||||
csv_util.write(file=export_file, rows=rows, header=export_schema, recreate_file=True)
|
||||
return export_file
|
||||
|
||||
def check_data(self, data: list[any]) -> WarnMessage:
|
||||
def get_warn_data(self, data: list[any]) -> WarnMessage:
|
||||
return WarnMessage(WarnLevel.NONE)
|
||||
|
||||
def data_schema(self) -> list[str]:
|
||||
@ -65,7 +65,7 @@ class NetworkLogger(Logger):
|
||||
'datetime#Boot date',
|
||||
]
|
||||
|
||||
def read_data(self) -> list[any]:
|
||||
def read_data(self) -> list[any]:
|
||||
sent, received = self.get_net_usage()
|
||||
return [
|
||||
datetime_util.now(),
|
||||
@ -101,6 +101,7 @@ class NetworkLogger(Logger):
|
||||
epoch_seconds = psutil.boot_time()
|
||||
return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
|
||||
|
||||
|
||||
def test():
|
||||
from de.p1st.monitor.cfg.singleton import init_cfg
|
||||
init_cfg()
|
||||
@ -109,5 +110,6 @@ def test():
|
||||
logger.update()
|
||||
logger.log()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
test()
|
||||
|
@ -5,6 +5,7 @@ from de.p1st.monitor import datetime_util
|
||||
|
||||
from de.p1st.monitor.logger import Logger
|
||||
from de.p1st.monitor.warn import WarnMessage, WarnLevel
|
||||
from de.p1st.monitor.warn_data import WarnData
|
||||
|
||||
|
||||
class SwapLogger(Logger):
|
||||
@ -13,7 +14,6 @@ class SwapLogger(Logger):
|
||||
warn_threshold: int = 1,
|
||||
warn_data_range: int = 1,
|
||||
):
|
||||
|
||||
# The space between swap is at `self.warn_if_above` and swap is full at `1.0`.
|
||||
buffer = 1 - warn_if_above
|
||||
critical_if_above = warn_if_above + 0.5 * buffer
|
||||
@ -23,21 +23,16 @@ class SwapLogger(Logger):
|
||||
critical_if_above)
|
||||
self.warn_if_above = warn_if_above
|
||||
|
||||
def check_data(self, data: list[any]) -> WarnMessage:
|
||||
def get_warn_data(self, data: list[any]) -> WarnData | WarnMessage:
|
||||
used_mb = data[1]
|
||||
total_mb = data[2]
|
||||
message = f'Swap usage ist at {used_mb} MB of {total_mb} MB'
|
||||
|
||||
if used_mb == 0 and total_mb == 0:
|
||||
return WarnMessage(WarnLevel.HIGH, 'Swap is not enabled')
|
||||
return WarnMessage(WarnLevel.HIGH, data[0], 'Swap is not enabled')
|
||||
|
||||
usage = used_mb / total_mb
|
||||
|
||||
if usage > self.critical_if_above:
|
||||
return WarnMessage(WarnLevel.HIGH, message)
|
||||
if usage > self.warn_if_above:
|
||||
return WarnMessage(WarnLevel.NORMAL, message)
|
||||
return WarnMessage(WarnLevel.NONE)
|
||||
return WarnData(data[0], usage, message)
|
||||
|
||||
def read_data(self) -> list[any]:
|
||||
used_mb, total_mb = self.get_swap()
|
||||
|
@ -5,7 +5,7 @@ import psutil
|
||||
from de.p1st.monitor import datetime_util
|
||||
from de.p1st.monitor.logger import Logger
|
||||
from de.p1st.monitor.logger_ex import LoggerReadEx
|
||||
from de.p1st.monitor.warn import WarnMessage, WarnLevel
|
||||
from de.p1st.monitor.warn_data import WarnData
|
||||
|
||||
|
||||
class TempLogger(Logger):
|
||||
@ -26,15 +26,10 @@ class TempLogger(Logger):
|
||||
|
||||
self.warn_if_above = warn_if_above
|
||||
|
||||
def check_data(self, data: list[any]) -> WarnMessage:
|
||||
def get_warn_data(self, data: list[any]) -> WarnData:
|
||||
temp = data[1]
|
||||
message = f'Temperature of {self.name} {self.label} ist at {temp}'
|
||||
|
||||
if temp > self.critical_if_above:
|
||||
return WarnMessage(WarnLevel.HIGH, message)
|
||||
if temp > self.warn_if_above:
|
||||
return WarnMessage(WarnLevel.NORMAL, message)
|
||||
return WarnMessage(WarnLevel.NONE)
|
||||
return WarnData(data[0], temp, message)
|
||||
|
||||
def read_data(self) -> list[any]:
|
||||
return [
|
||||
@ -60,7 +55,7 @@ class TempLogger(Logger):
|
||||
:return: Temperature in celsius
|
||||
"""
|
||||
data = psutil.sensors_temperatures(fahrenheit=False)
|
||||
if not self.name in data:
|
||||
if self.name not in data:
|
||||
raise LoggerReadEx(f'Sensor {self.name} not found')
|
||||
for i in data[self.name]:
|
||||
if i.label == self.label:
|
||||
|
@ -54,9 +54,7 @@ def log():
|
||||
except LoggerReadEx as e:
|
||||
logger_read_exs.append(e)
|
||||
continue
|
||||
# After logger.update() there might still be no data
|
||||
# Example: FilesystemLogger if partition is not mounted (and unmounted_ok is True)
|
||||
logger.log(skip_if_no_data=True)
|
||||
logger.log()
|
||||
if logger.check().print().is_warning():
|
||||
logger_warnings += 1
|
||||
|
||||
|
@ -17,8 +17,10 @@ def data_types() -> dict[str, dict[str, Callable[[any], any]]]:
|
||||
'datetime': {'to': datetime_util.to_str, 'from': datetime_util.from_str},
|
||||
}
|
||||
|
||||
|
||||
def to_string(v: any, type_str: str) -> str:
|
||||
return data_types()[type_str]['to'](v)
|
||||
|
||||
|
||||
def from_string(v: str, type_str: str) -> any:
|
||||
return data_types()[type_str]['from'](v)
|
||||
|
@ -1,7 +1,9 @@
|
||||
from __future__ import annotations
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from functools import total_ordering
|
||||
from typing import TextIO
|
||||
|
||||
|
||||
# https://docs.python.org/3/library/functools.html#functools.total_ordering
|
||||
@ -24,26 +26,54 @@ class WarnLevel(Enum):
|
||||
|
||||
|
||||
class WarnMessage:
|
||||
def __init__(self, level: WarnLevel, message: str = None):
|
||||
def __init__(self, level: WarnLevel, date: datetime = None, message: str = None):
|
||||
"""
|
||||
@param level:
|
||||
@param date: Required, except if `WarnLevel.NONE` given.
|
||||
@param message: Required, except if `WarnLevel.NONE` given.
|
||||
"""
|
||||
self.level = level
|
||||
self.date = date
|
||||
self.message = message
|
||||
|
||||
if self.level != WarnLevel.NONE:
|
||||
if self.date is None:
|
||||
raise ValueError()
|
||||
if self.message is None:
|
||||
raise ValueError()
|
||||
if self.level == WarnLevel.NONE:
|
||||
if self.date is not None:
|
||||
raise ValueError()
|
||||
if self.message is not None:
|
||||
raise ValueError()
|
||||
|
||||
def is_warning(self) -> bool:
|
||||
return self.level > WarnLevel.NONE
|
||||
|
||||
def print(self, default_message: str = 'Warning!') -> WarnMessage:
|
||||
def print(self) -> WarnMessage:
|
||||
"""
|
||||
return: self
|
||||
"""
|
||||
message = default_message if self.message is None else self.message
|
||||
|
||||
if self.level == WarnLevel.NONE:
|
||||
pass
|
||||
elif self.level == WarnLevel.LOW:
|
||||
print(message)
|
||||
elif self.level == WarnLevel.NORMAL:
|
||||
print(message, file=sys.stderr)
|
||||
elif self.level == WarnLevel.HIGH:
|
||||
print(f'[CRITICAL] {message}', file=sys.stderr)
|
||||
else:
|
||||
print(self.formatted_message(), file=self.file())
|
||||
|
||||
return self
|
||||
|
||||
def formatted_message(self) -> str:
|
||||
return self.prefix() + self.message
|
||||
|
||||
def prefix(self) -> str:
|
||||
if self.level == WarnLevel.NONE:
|
||||
raise ValueError()
|
||||
if self.level > WarnLevel.HIGH:
|
||||
return f'[CRITICAL] {self.date}: '
|
||||
return f'{self.date}:'
|
||||
|
||||
def file(self) -> TextIO:
|
||||
if self.level == WarnLevel.NONE:
|
||||
raise ValueError()
|
||||
if self.level >= WarnLevel.NORMAL:
|
||||
return sys.stderr
|
||||
return sys.stdout
|
||||
|
8
src/de/p1st/monitor/warn_data.py
Normal file
8
src/de/p1st/monitor/warn_data.py
Normal file
@ -0,0 +1,8 @@
|
||||
from typing import NamedTuple
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class WarnData(NamedTuple):
|
||||
date: datetime
|
||||
value: int | float
|
||||
message: str
|
Loading…
x
Reference in New Issue
Block a user