mirror of
https://github.com/unixtensor/proxmox-ntfy.git
synced 2025-06-27 23:28:05 +00:00
Everything is split into modules and put into classes where inheritance is needed
This commit is contained in:
10
src/cli.py
Normal file
10
src/cli.py
Normal file
@ -0,0 +1,10 @@
|
||||
import argparse
|
||||
|
||||
def interface():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--server", required=True, help="")
|
||||
parser.add_argument("--cpu-temp-critical", type=int, default=80, help="cpu tempature crtitical. default = 80")
|
||||
parser.add_argument("--cpu-temp-warning", type=int, default=70, help="cpu tempature warning. default = 70")
|
||||
parser.add_argument("--update-rate", type=int, default=1, help="how often updates happen in seconds. default = 1")
|
||||
|
||||
return parser.parse_args()
|
45
src/cpu.py
45
src/cpu.py
@ -1,26 +1,35 @@
|
||||
import subprocess
|
||||
import main
|
||||
import ntfy
|
||||
import time
|
||||
import re
|
||||
|
||||
from typing import Optional
|
||||
from ntfy import Ntfy
|
||||
|
||||
last_cpu_check: float = time.time()
|
||||
last_cpu_check_unix: float = time.time()
|
||||
last_cpu_check: int = 60 # Seconds
|
||||
|
||||
def temperature() -> Optional[float]:
|
||||
sensors_out = subprocess.check_output(["sensors"]).decode()
|
||||
for line in sensors_out.splitlines():
|
||||
if "Tctl" in line:
|
||||
match = re.search(r"(\d+\.\d+)°C", line)
|
||||
if match:
|
||||
return float(match.group(1))
|
||||
return None
|
||||
class Tempature:
|
||||
cpu_temp_crtitical_message: str = "🔥 CPU tempature is at critical tempatures!"
|
||||
cpu_temp_warning_message: str = "🌡️ CPU tempature is at a high tempature."
|
||||
|
||||
def temperature_check():
|
||||
cpu_temp = temperature()
|
||||
if cpu_temp and (time.time() - last_cpu_check) > main.last_check_debounce * 1000:
|
||||
if cpu_temp >= main.cpu_critical_temp:
|
||||
ntfy.send(f"🔥 CPU tempature is at critical tempatures! {cpu_temp}")
|
||||
elif cpu_temp >= main.cpu_warning_temp:
|
||||
ntfy.send(f"🌡️ CPU tempature is at a high tempature. {cpu_temp}")
|
||||
def __init__(self, ntfy_instance: Ntfy, cpu_critical_temp: int, cpu_warning_temp: int):
|
||||
self.cpu_critical_temp = cpu_critical_temp
|
||||
self.cpu_warning_temp = cpu_warning_temp
|
||||
self.ntfy = ntfy_instance
|
||||
|
||||
def get(self) -> Optional[float]:
|
||||
sensors_out = subprocess.check_output(["sensors"]).decode()
|
||||
for line in sensors_out.splitlines():
|
||||
if "Tctl" in line:
|
||||
match = re.search(r"(\d+\.\d+)°C", line)
|
||||
if match:
|
||||
return float(match.group(1))
|
||||
return None
|
||||
|
||||
def ntfy_check(self):
|
||||
cpu_temp = self.get()
|
||||
if cpu_temp and (time.time() - last_cpu_check_unix) > last_cpu_check * 1000:
|
||||
if cpu_temp >= self.cpu_critical_temp:
|
||||
self.ntfy.send(f"{Tempature.cpu_temp_crtitical_message} {cpu_temp}")
|
||||
elif cpu_temp >= self.cpu_warning_temp:
|
||||
self.ntfy.send(f"{Tempature.cpu_temp_warning_message} {cpu_temp}")
|
||||
|
43
src/main.py
43
src/main.py
@ -1,34 +1,33 @@
|
||||
import argparse
|
||||
import time
|
||||
|
||||
import package
|
||||
import cli
|
||||
import cpu
|
||||
|
||||
from typing import Optional
|
||||
from ntfy import Ntfy
|
||||
|
||||
clock_interval_secs: int = 1
|
||||
last_check_debounce: int = 60 # Seconds
|
||||
cpu_critical_temp: int = 80
|
||||
cpu_warning_temp: int = 70
|
||||
def start(
|
||||
cpu_critical_temp: int,
|
||||
cpu_warning_temp: int,
|
||||
ntfy_server: str,
|
||||
interval: int
|
||||
):
|
||||
ntfy = Ntfy(ntfy_server)
|
||||
ntfy_cpu_temp_monitor = cpu.Tempature(ntfy, cpu_critical_temp, cpu_warning_temp)
|
||||
|
||||
ntfy_url: Optional[str] = None
|
||||
_ntfy_configure_prompt = """Please configure an ntfy url before starting.
|
||||
\033[4mExamples:\033[0m
|
||||
\033[32mpython3 main.py --url=10.0.13.37:42069
|
||||
python3 main.py --url=ntfy.domain.com\033[0m"""
|
||||
print(f"Started. {time.time()}")
|
||||
print("Ntfy monitoring software is now listening.")
|
||||
|
||||
def cli_interface() -> bool:
|
||||
...
|
||||
|
||||
def start():
|
||||
print(f"Working! {time.time()}")
|
||||
while True:
|
||||
cpu.temperature_check()
|
||||
time.sleep(clock_interval_secs)
|
||||
ntfy_cpu_temp_monitor.ntfy_check()
|
||||
time.sleep(interval)
|
||||
|
||||
if __name__ == "__main__":
|
||||
if package.installed_list(["lm-sensors", "ntfy", "curl"]):
|
||||
if cli_interface():
|
||||
start()
|
||||
else:
|
||||
print(_ntfy_configure_prompt)
|
||||
cli_args = cli.interface()
|
||||
start(
|
||||
cli_args.cpu_temp_critical,
|
||||
cli_args.cpu_temp_warning,
|
||||
cli_args.server,
|
||||
cli_args.update_rate,
|
||||
)
|
21
src/ntfy.py
21
src/ntfy.py
@ -1,16 +1,11 @@
|
||||
import subprocess
|
||||
import main
|
||||
import time
|
||||
|
||||
last_ntfy_send: float = time.time()
|
||||
class Ntfy:
|
||||
def __init__(self, server: str):
|
||||
self.server = server
|
||||
|
||||
def send(message: str):
|
||||
try:
|
||||
if main.ntfy_url:
|
||||
global last_ntfy_send
|
||||
last_ntfy_send = time.time()
|
||||
subprocess.run(["curl", "-d", f"\"{message}\"", main.ntfy_url], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
else:
|
||||
print("Ntfy send: url is not configured.")
|
||||
except Exception as err:
|
||||
print(f"Ntfy failed. {err}")
|
||||
def send(self, message: str):
|
||||
try:
|
||||
subprocess.run(["curl", "-d", f"\"{message}\"", self.server], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
except Exception as err:
|
||||
print(f"Ntfy failed. {err}")
|
||||
|
Reference in New Issue
Block a user