From 5146ae275ee4faa3ed2acdee33cd04a9c6035d05 Mon Sep 17 00:00:00 2001 From: Daniel Langbein Date: Wed, 21 Jun 2023 19:16:25 +0200 Subject: [PATCH] feat: don't warn if value decreased --- src/de/p1st/monitor/logger.py | 145 ++++++++++++++++------ src/de/p1st/monitor/loggers/cpu.py | 21 ++-- src/de/p1st/monitor/loggers/drive.py | 15 +-- src/de/p1st/monitor/loggers/filesystem.py | 45 +++++-- src/de/p1st/monitor/loggers/memory.py | 17 +-- src/de/p1st/monitor/loggers/network.py | 6 +- src/de/p1st/monitor/loggers/swap.py | 13 +- src/de/p1st/monitor/loggers/temp.py | 13 +- src/de/p1st/monitor/main.py | 4 +- src/de/p1st/monitor/string_conversion.py | 2 + src/de/p1st/monitor/warn.py | 50 ++++++-- src/de/p1st/monitor/warn_data.py | 8 ++ 12 files changed, 227 insertions(+), 112 deletions(-) create mode 100644 src/de/p1st/monitor/warn_data.py diff --git a/src/de/p1st/monitor/logger.py b/src/de/p1st/monitor/logger.py index 9bedf5f..892ad0c 100644 --- a/src/de/p1st/monitor/logger.py +++ b/src/de/p1st/monitor/logger.py @@ -1,10 +1,11 @@ from pathlib import Path from abc import ABC, abstractmethod -from de.p1st.monitor import csv_util +from de.p1st.monitor import csv_util, datetime_util from de.p1st.monitor.cfg.logging_dir import logging_dir from de.p1st.monitor.string_conversion import to_string, from_string from de.p1st.monitor.warn import WarnMessage, WarnLevel +from de.p1st.monitor.warn_data import WarnData # https://www.geeksforgeeks.org/abstract-classes-in-python/ @@ -15,7 +16,7 @@ class Logger(ABC): warn_if_above: int | float = None, critical_if_above: int | float = None, ): - self.data: list[any] | None = None + self.data: list[any] = [] # True if the data held by this object is already appended to the logfile. self.logged = False @@ -29,7 +30,8 @@ class Logger(ABC): f' {critical_if_above}, {warn_if_above}') else: if warn_if_above is not None or critical_if_above is not None: - raise Exception('Either both, warn_if_above and critical_if_above, must be given, or both must be None.') + raise Exception( + 'Either both, warn_if_above and critical_if_above, must be given, or both must be None.') self.warn_if_above = warn_if_above self.critical_if_above = critical_if_above @@ -49,41 +51,113 @@ class Logger(ABC): def check(self) -> WarnMessage: """ - Checks the latest `self.warn_data_range` datasets for problems using `self.check_data()`. + Checks if `self.warn_threshold` or more of the latest `self.warn_data_range` datasets + contain values higher than `self.warn_if_above` in which case a warning (NORMAL or HIGH) + might be returned. - If at least `self.warn_threshold` problems are found, - then a WarnMessage with the highest reported WarnLevel is returned. + In details: - If at least one WarnLevel is above NORMAL, - then a WarnMessage is returned independent of the number of problems. + If the current value is above `self.critical_if_above`, + then a HIGH WarnMessage is returned. + + If less than `self.warn_threshold` values are above `self.warn_if_above`, + then no warning is returned. + + If the current value is lower than the previously logged one + and if during the previous check a warning was issued, + then no warning is returned. + + Otherwise, a NORMAL WarnMessage is returned. """ - datasets = self.get_datasets(self.warn_data_range) - warnings = [self.check_data(data) for data in datasets] - warnings = [warning for warning in warnings - if not warning.level == WarnLevel.NONE] + datasets = self.get_datasets(self.warn_data_range + 1) + warn_datas = [self.get_warn_data(data) for data in datasets] + current_warn_data = warn_datas[-1] - if len(warnings) == 0: + # If current value is critical (or current warning is HIGH), directly return HIGH WarnMessage. + # + # -> As we don't want to send the same warning twice, + # we check only if the current value is critical. + if isinstance(current_warn_data, WarnData): + if current_warn_data.value > self.critical_if_above: + return WarnMessage(WarnLevel.HIGH, datetime_util.now(), current_warn_data.message) + elif isinstance(current_warn_data, WarnMessage): + if current_warn_data.level > WarnLevel.NORMAL: + return current_warn_data + else: + raise ValueError() + + current_warn_datas = warn_datas[-self.warn_data_range:] + curr_num_warnings, curr_highest_warning = self._get_num_warnings(current_warn_datas) + + # Warning threshold not reached. + if curr_num_warnings < self.warn_threshold: return WarnMessage(WarnLevel.NONE) - # max() must not be called with an empty list. - highest_warn_level = max([warning.level for warning in warnings]) - messages: list[str] = [warning.message for warning in warnings] - message = f'{len(warnings)} of the last {self.warn_data_range} datasets are above limits:\n\t' \ - + '\n\t'.join(messages) + previous_warn_datas = warn_datas[-self.warn_data_range - 1:-1] + prev_num_warnings, prev_highest_warning = self._get_num_warnings(previous_warn_datas) - if highest_warn_level > WarnLevel.NORMAL: - return WarnMessage(highest_warn_level, message) - if len(warnings) >= self.warn_threshold: - return WarnMessage(highest_warn_level, message) - return WarnMessage(WarnLevel.NONE) + # Don't send warning again if the current value decreased. + if prev_num_warnings >= self.warn_if_above: + previous_warn_data = warn_datas[-2] + if isinstance(current_warn_data, WarnData) \ + and isinstance(previous_warn_data, WarnData) \ + and current_warn_data.value <= previous_warn_data.value: + return WarnMessage(WarnLevel.NONE) + + # Send warning. + warn_messages = self._get_warn_messages(current_warn_datas) + message = f'{curr_num_warnings} of the last {len(current_warn_datas)} datasets are above limits:\n\t' \ + + '\n\t'.join(warn_messages) + return WarnMessage(WarnLevel.NORMAL, datetime_util.now(), message) + + def _get_num_warnings(self, warn_datas: list[WarnData | WarnMessage]) -> tuple[int, WarnLevel]: + """ + @return: Number of warnings and the highest WarnLevel + """ + num_warnings = 0 + highest_warn_level = WarnLevel.NONE + + for warn_data in warn_datas: + if isinstance(warn_data, WarnMessage): + highest_warn_level = max(highest_warn_level, warn_data.level) + elif isinstance(warn_data, WarnData): + if warn_data.value > self.critical_if_above: + num_warnings += 1 + highest_warn_level = max(highest_warn_level, WarnLevel.HIGH) + elif warn_data.value > self.warn_if_above: + num_warnings += 1 + highest_warn_level = max(highest_warn_level, WarnLevel.NORMAL) + else: + raise ValueError() + + return num_warnings, highest_warn_level + + def _get_warn_messages(self, warn_datas: list[WarnData | WarnMessage]) -> list[str]: + messages: list[str] = [] + + for warn_data in warn_datas: + if isinstance(warn_data, WarnMessage): + messages.append(warn_data.formatted_message()) + elif isinstance(warn_data, WarnData): + if warn_data.value > self.critical_if_above: + messages.append( + WarnMessage(WarnLevel.HIGH, warn_data.date, warn_data.message).formatted_message()) + elif warn_data.value > self.warn_if_above: + messages.append( + WarnMessage(WarnLevel.NORMAL, warn_data.date, warn_data.message).formatted_message()) + else: + raise ValueError() + + return messages @abstractmethod - def check_data(self, data: list[any]) -> WarnMessage: + def get_warn_data(self, data: list[any]) -> WarnData | WarnMessage: """ - Check the given data for problems. - Return a WarnLevel indicating how serious the problems are. + Calculate warn value from given data and return as part of `WarnData` object. - If there are no problems, return `WarnLevel.NONE`. + If the value can't be calculated, directly return a `WarnMessage` object. + + @return Warn value; Message for normal warning; Message for critical warning """ raise ValueError('Subclasses must implement this') @@ -105,6 +179,8 @@ class Logger(ABC): def get_datasets(self, num: int) -> list[list[any]]: """ Returns the last `num` datasets (including the current dataset). + + The first row is the oldest, and the latest row is the current dataset. """ if not self.logged and self.has_data(): # We will append the current data manually. @@ -130,18 +206,15 @@ class Logger(ABC): return data - def log(self, skip_if_no_data: bool = False) -> None: + def log(self) -> None: """ Appends the current data (e.g. temperature of a sensor) to a logfile. - :param skip_if_no_data: Can be used to do nothing if no data is available. If one is sure to have called update() previously, this can be set to True. :raise Exception: If method is called but no data is available. Please do call update() first to avoid this! """ if self.logged: return - if skip_if_no_data and not self.has_data(): - return csv_util.write(file=self.get_log_file(), rows=[self.get_data_as_row()], header=self.data_schema()) self.logged = True @@ -151,12 +224,10 @@ class Logger(ABC): self.logged = False @abstractmethod - def read_data(self) -> list[any] | None: + def read_data(self) -> list[any]: """ Collects current data (e.g. temperature of a sensor). - Might return None if sensor is detached / not available. - :raise LoggerReadEx: """ raise ValueError('Subclasses must implement this') @@ -199,7 +270,7 @@ class Logger(ABC): ] def has_data(self) -> bool: - return self.data is not None + return len(self.data) > 0 def get_data(self) -> list[any]: """ @@ -210,7 +281,9 @@ class Logger(ABC): else: raise ValueError(f'Data has not yet been read. {self.__str__()}') - def set_data(self, data: list[any] | None): + def set_data(self, data: list[any]): + if len(data) < 1: + raise ValueError() self.data = data def data_type_strs(self, data_schema: list[str] = None) -> list[str]: diff --git a/src/de/p1st/monitor/loggers/cpu.py b/src/de/p1st/monitor/loggers/cpu.py index fc5ec71..88a3d7b 100644 --- a/src/de/p1st/monitor/loggers/cpu.py +++ b/src/de/p1st/monitor/loggers/cpu.py @@ -7,7 +7,7 @@ import psutil from de.p1st.monitor import datetime_util from de.p1st.monitor.logger import Logger -from de.p1st.monitor.warn import WarnMessage, WarnLevel +from de.p1st.monitor.warn_data import WarnData class CPULogger(Logger): @@ -24,17 +24,12 @@ class CPULogger(Logger): critical_if_above) self.warn_if_above = warn_if_above - def check_data(self, data: list[any]) -> WarnMessage: + def get_warn_data(self, data: list[any]) -> WarnData: load_avg = data[1] message = f'CPU load avg of last {self.get_load_timespan()} minutes is at {load_avg}' + return WarnData(data[0], load_avg, message) - if load_avg > self.critical_if_above: - return WarnMessage(WarnLevel.HIGH, message) - if load_avg > self.warn_if_above: - return WarnMessage(WarnLevel.NORMAL, message) - return WarnMessage(WarnLevel.NONE) - - def read_data(self) -> list[any] | None: + def read_data(self) -> list[any]: return [ datetime_util.now(), self.get_load(self.get_load_timespan()) @@ -50,14 +45,13 @@ class CPULogger(Logger): return self.get_log_dir() / f'cpu_{self.get_load_timespan()}min.csv' @abstractmethod - def get_load_timespan(self) -> Literal[1, 5, 15]: + def get_load_timespan(self) -> Literal[1, 5, 15]: raise ValueError('Subclasses must implement this') # # HELPERS # - @staticmethod def get_load(minutes: Literal[1, 5, 15]) -> float: """ @@ -79,12 +73,17 @@ class CPULogger(Logger): # This value has been tested to be correct on my AMD Ryzen 4800H CPU. return psutil.getloadavg()[idx] / psutil.cpu_count() + class CPULogger1(CPULogger): def get_load_timespan(self) -> Literal[1, 5, 15]: return 1 + + class CPULogger5(CPULogger): def get_load_timespan(self) -> Literal[1, 5, 15]: return 5 + + class CPULogger15(CPULogger): def get_load_timespan(self) -> Literal[1, 5, 15]: return 15 diff --git a/src/de/p1st/monitor/loggers/drive.py b/src/de/p1st/monitor/loggers/drive.py index 4627869..9ad24ea 100644 --- a/src/de/p1st/monitor/loggers/drive.py +++ b/src/de/p1st/monitor/loggers/drive.py @@ -6,11 +6,13 @@ from de.p1st.monitor import datetime_util from de.p1st.monitor.exec_capture import execute_capture from de.p1st.monitor.logger import Logger from de.p1st.monitor.logger_ex import LoggerArgEx, LoggerReadEx -from de.p1st.monitor.warn import WarnLevel, WarnMessage +from de.p1st.monitor.warn_data import WarnData + class BlkidException(Exception): pass + class DriveLogger(Logger): def __init__(self, uuid: str = None, device: Path = None, @@ -44,17 +46,12 @@ class DriveLogger(Logger): self.warn_if_above = warn_if_above - def check_data(self, data: list[any]) -> WarnMessage: + def get_warn_data(self, data: list[any]) -> WarnData: temp = data[1] message = f'Temperature of {self.uuid} ist at {temp}' + return WarnData(data[0], temp, message) - if temp > self.critical_if_above: - return WarnMessage(WarnLevel.HIGH, message) - if temp > self.warn_if_above: - return WarnMessage(WarnLevel.NORMAL, message) - return WarnMessage(WarnLevel.NONE) - - def read_data(self) -> list[any]: + def read_data(self) -> list[any]: return [ datetime_util.now(), self.get_temp_from_device(self.device), diff --git a/src/de/p1st/monitor/loggers/filesystem.py b/src/de/p1st/monitor/loggers/filesystem.py index 28eed57..76a4263 100644 --- a/src/de/p1st/monitor/loggers/filesystem.py +++ b/src/de/p1st/monitor/loggers/filesystem.py @@ -1,12 +1,14 @@ +import math from pathlib import Path import psutil -from de.p1st.monitor import datetime_util +from de.p1st.monitor import datetime_util, csv_util from de.p1st.monitor.exec_capture import execute_capture from de.p1st.monitor.logger import Logger from de.p1st.monitor.logger_ex import LoggerArgEx, LoggerReadEx from de.p1st.monitor.warn import WarnLevel, WarnMessage +from de.p1st.monitor.warn_data import WarnData class NotMounted(Exception): @@ -81,22 +83,37 @@ class FilesystemLogger(Logger): else: raise LoggerReadEx(getattr(e, 'message', e)) - def check_data(self, data: list[any]) -> WarnMessage: - if not self.mounted: - return WarnMessage(WarnLevel.NONE) + def export_data(self) -> Path: + data = self.get_all_datasets() + export_schema = self.data_schema() + + # Filter rows where `Disk usage` is NAN. + export_data = [row for row in data + if not math.isnan(row[1])] + + export_file = self.get_log_file().parent.joinpath(self.get_log_file().name + '.exported.csv') + rows = [self.as_row(export_row, export_schema) for export_row in export_data] + csv_util.write(file=export_file, rows=rows, header=export_schema, recreate_file=True) + return export_file + + def get_warn_data(self, data: list[any]) -> WarnData | WarnMessage: disk_usage = data[1] + if math.isnan(disk_usage): + if self.unmounted_ok: + return WarnMessage(WarnLevel.NONE) + else: + return WarnMessage(WarnLevel.HIGH, data[0], 'Disk is not mounted') + message = f'Disk usage of {self.uuid} ist at {disk_usage}' + return WarnData(data[0], disk_usage, message) - if disk_usage > self.critical_if_above: - return WarnMessage(WarnLevel.HIGH, message) - if disk_usage > self.warn_if_above: - return WarnMessage(WarnLevel.NORMAL, message) - return WarnMessage(WarnLevel.NONE) - - def read_data(self) -> list[any] | None: + def read_data(self) -> list[any]: if not self.mounted: - return None + return [ + datetime_util.now(), + float('nan') + ] disk_usage: float = self.get_disk_usage(self.mountpoint) return [ @@ -105,6 +122,10 @@ class FilesystemLogger(Logger): ] def data_schema(self) -> list[str]: + """ + If no disk usage value could be read (if a disk was unmounted), + NAN is stored as `Disk usage`. + """ return ['datetime#Date', 'float#Disk usage'] def get_log_file(self) -> Path: diff --git a/src/de/p1st/monitor/loggers/memory.py b/src/de/p1st/monitor/loggers/memory.py index 9135438..dfeb8cf 100644 --- a/src/de/p1st/monitor/loggers/memory.py +++ b/src/de/p1st/monitor/loggers/memory.py @@ -4,7 +4,7 @@ import psutil from de.p1st.monitor import datetime_util from de.p1st.monitor.logger import Logger -from de.p1st.monitor.warn import WarnMessage, WarnLevel +from de.p1st.monitor.warn_data import WarnData class MemoryLogger(Logger): @@ -13,7 +13,6 @@ class MemoryLogger(Logger): warn_threshold: int = 1, warn_data_range: int = 1, ): - # The space between memory is at `self.warn_if_above` and memory is full at `1.0`. buffer = 1 - warn_if_above critical_if_above = warn_if_above + 0.5 * buffer @@ -23,20 +22,15 @@ class MemoryLogger(Logger): critical_if_above) self.warn_if_above = warn_if_above - def check_data(self, data: list[any]) -> WarnMessage: + def get_warn_data(self, data: list[any]) -> WarnData: used_mb = data[1] total_available_mb = data[3] message = f'Memory usage ist at {used_mb} MB of {total_available_mb} MB' used = used_mb / total_available_mb + return WarnData(data[0], used, message) - if used > self.critical_if_above: - return WarnMessage(WarnLevel.HIGH, message) - if used > self.warn_if_above: - return WarnMessage(WarnLevel.NORMAL, message) - return WarnMessage(WarnLevel.NONE) - - def read_data(self) -> list[any]: + def read_data(self) -> list[any]: used_mb, free_mb, available_mb, total_mb = self.get_memory() used_and_cached_mb = total_mb - free_mb total_available_mb = used_mb + available_mb @@ -48,7 +42,8 @@ class MemoryLogger(Logger): ] def data_schema(self) -> list[str]: - return ['datetime#Date', 'int#Used memory in MB', 'int#Used and cached in MB', 'int#Total available memory in MB'] + return ['datetime#Date', 'int#Used memory in MB', 'int#Used and cached in MB', + 'int#Total available memory in MB'] def get_log_file(self) -> Path: return self.get_log_dir() / f'memory.csv' diff --git a/src/de/p1st/monitor/loggers/network.py b/src/de/p1st/monitor/loggers/network.py index 2d6c860..90dc598 100644 --- a/src/de/p1st/monitor/loggers/network.py +++ b/src/de/p1st/monitor/loggers/network.py @@ -54,7 +54,7 @@ class NetworkLogger(Logger): csv_util.write(file=export_file, rows=rows, header=export_schema, recreate_file=True) return export_file - def check_data(self, data: list[any]) -> WarnMessage: + def get_warn_data(self, data: list[any]) -> WarnMessage: return WarnMessage(WarnLevel.NONE) def data_schema(self) -> list[str]: @@ -65,7 +65,7 @@ class NetworkLogger(Logger): 'datetime#Boot date', ] - def read_data(self) -> list[any]: + def read_data(self) -> list[any]: sent, received = self.get_net_usage() return [ datetime_util.now(), @@ -101,6 +101,7 @@ class NetworkLogger(Logger): epoch_seconds = psutil.boot_time() return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc) + def test(): from de.p1st.monitor.cfg.singleton import init_cfg init_cfg() @@ -109,5 +110,6 @@ def test(): logger.update() logger.log() + if __name__ == '__main__': test() diff --git a/src/de/p1st/monitor/loggers/swap.py b/src/de/p1st/monitor/loggers/swap.py index 610c241..4fe7333 100644 --- a/src/de/p1st/monitor/loggers/swap.py +++ b/src/de/p1st/monitor/loggers/swap.py @@ -5,6 +5,7 @@ from de.p1st.monitor import datetime_util from de.p1st.monitor.logger import Logger from de.p1st.monitor.warn import WarnMessage, WarnLevel +from de.p1st.monitor.warn_data import WarnData class SwapLogger(Logger): @@ -13,7 +14,6 @@ class SwapLogger(Logger): warn_threshold: int = 1, warn_data_range: int = 1, ): - # The space between swap is at `self.warn_if_above` and swap is full at `1.0`. buffer = 1 - warn_if_above critical_if_above = warn_if_above + 0.5 * buffer @@ -23,21 +23,16 @@ class SwapLogger(Logger): critical_if_above) self.warn_if_above = warn_if_above - def check_data(self, data: list[any]) -> WarnMessage: + def get_warn_data(self, data: list[any]) -> WarnData | WarnMessage: used_mb = data[1] total_mb = data[2] message = f'Swap usage ist at {used_mb} MB of {total_mb} MB' if used_mb == 0 and total_mb == 0: - return WarnMessage(WarnLevel.HIGH, 'Swap is not enabled') + return WarnMessage(WarnLevel.HIGH, data[0], 'Swap is not enabled') usage = used_mb / total_mb - - if usage > self.critical_if_above: - return WarnMessage(WarnLevel.HIGH, message) - if usage > self.warn_if_above: - return WarnMessage(WarnLevel.NORMAL, message) - return WarnMessage(WarnLevel.NONE) + return WarnData(data[0], usage, message) def read_data(self) -> list[any]: used_mb, total_mb = self.get_swap() diff --git a/src/de/p1st/monitor/loggers/temp.py b/src/de/p1st/monitor/loggers/temp.py index de4ff59..0efa033 100644 --- a/src/de/p1st/monitor/loggers/temp.py +++ b/src/de/p1st/monitor/loggers/temp.py @@ -5,7 +5,7 @@ import psutil from de.p1st.monitor import datetime_util from de.p1st.monitor.logger import Logger from de.p1st.monitor.logger_ex import LoggerReadEx -from de.p1st.monitor.warn import WarnMessage, WarnLevel +from de.p1st.monitor.warn_data import WarnData class TempLogger(Logger): @@ -26,15 +26,10 @@ class TempLogger(Logger): self.warn_if_above = warn_if_above - def check_data(self, data: list[any]) -> WarnMessage: + def get_warn_data(self, data: list[any]) -> WarnData: temp = data[1] message = f'Temperature of {self.name} {self.label} ist at {temp}' - - if temp > self.critical_if_above: - return WarnMessage(WarnLevel.HIGH, message) - if temp > self.warn_if_above: - return WarnMessage(WarnLevel.NORMAL, message) - return WarnMessage(WarnLevel.NONE) + return WarnData(data[0], temp, message) def read_data(self) -> list[any]: return [ @@ -60,7 +55,7 @@ class TempLogger(Logger): :return: Temperature in celsius """ data = psutil.sensors_temperatures(fahrenheit=False) - if not self.name in data: + if self.name not in data: raise LoggerReadEx(f'Sensor {self.name} not found') for i in data[self.name]: if i.label == self.label: diff --git a/src/de/p1st/monitor/main.py b/src/de/p1st/monitor/main.py index df05697..e130577 100755 --- a/src/de/p1st/monitor/main.py +++ b/src/de/p1st/monitor/main.py @@ -54,9 +54,7 @@ def log(): except LoggerReadEx as e: logger_read_exs.append(e) continue - # After logger.update() there might still be no data - # Example: FilesystemLogger if partition is not mounted (and unmounted_ok is True) - logger.log(skip_if_no_data=True) + logger.log() if logger.check().print().is_warning(): logger_warnings += 1 diff --git a/src/de/p1st/monitor/string_conversion.py b/src/de/p1st/monitor/string_conversion.py index 88edb6a..4bae2ca 100644 --- a/src/de/p1st/monitor/string_conversion.py +++ b/src/de/p1st/monitor/string_conversion.py @@ -17,8 +17,10 @@ def data_types() -> dict[str, dict[str, Callable[[any], any]]]: 'datetime': {'to': datetime_util.to_str, 'from': datetime_util.from_str}, } + def to_string(v: any, type_str: str) -> str: return data_types()[type_str]['to'](v) + def from_string(v: str, type_str: str) -> any: return data_types()[type_str]['from'](v) diff --git a/src/de/p1st/monitor/warn.py b/src/de/p1st/monitor/warn.py index 0a8648e..dda402a 100644 --- a/src/de/p1st/monitor/warn.py +++ b/src/de/p1st/monitor/warn.py @@ -1,7 +1,9 @@ from __future__ import annotations import sys +from datetime import datetime from enum import Enum from functools import total_ordering +from typing import TextIO # https://docs.python.org/3/library/functools.html#functools.total_ordering @@ -24,26 +26,54 @@ class WarnLevel(Enum): class WarnMessage: - def __init__(self, level: WarnLevel, message: str = None): + def __init__(self, level: WarnLevel, date: datetime = None, message: str = None): + """ + @param level: + @param date: Required, except if `WarnLevel.NONE` given. + @param message: Required, except if `WarnLevel.NONE` given. + """ self.level = level + self.date = date self.message = message + if self.level != WarnLevel.NONE: + if self.date is None: + raise ValueError() + if self.message is None: + raise ValueError() + if self.level == WarnLevel.NONE: + if self.date is not None: + raise ValueError() + if self.message is not None: + raise ValueError() + def is_warning(self) -> bool: return self.level > WarnLevel.NONE - def print(self, default_message: str = 'Warning!') -> WarnMessage: + def print(self) -> WarnMessage: """ return: self """ - message = default_message if self.message is None else self.message - if self.level == WarnLevel.NONE: pass - elif self.level == WarnLevel.LOW: - print(message) - elif self.level == WarnLevel.NORMAL: - print(message, file=sys.stderr) - elif self.level == WarnLevel.HIGH: - print(f'[CRITICAL] {message}', file=sys.stderr) + else: + print(self.formatted_message(), file=self.file()) return self + + def formatted_message(self) -> str: + return self.prefix() + self.message + + def prefix(self) -> str: + if self.level == WarnLevel.NONE: + raise ValueError() + if self.level > WarnLevel.HIGH: + return f'[CRITICAL] {self.date}: ' + return f'{self.date}:' + + def file(self) -> TextIO: + if self.level == WarnLevel.NONE: + raise ValueError() + if self.level >= WarnLevel.NORMAL: + return sys.stderr + return sys.stdout diff --git a/src/de/p1st/monitor/warn_data.py b/src/de/p1st/monitor/warn_data.py new file mode 100644 index 0000000..8fd8a1a --- /dev/null +++ b/src/de/p1st/monitor/warn_data.py @@ -0,0 +1,8 @@ +from typing import NamedTuple +from datetime import datetime + + +class WarnData(NamedTuple): + date: datetime + value: int | float + message: str