#!/usr/bin/env python3 import sys import subprocess import time from datetime import datetime from collections import deque class PingMonitor: def __init__(self, host): self.host = host self.current_outage = None self.outages = deque(maxlen=100) # Ограничиваем количество хранимых серий self.last_success_time = datetime.now() def ping_host(self): try: # -c 1: один пакет, -W 1: таймаут 1 секунда (для Linux) # Для Windows нужно использовать "-n 1 -w 1000" result = subprocess.run(['ping', '-c', '1', '-W', '1', self.host], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) return result.returncode == 0 except Exception as e: print(f"Error pinging host: {e}") return False def monitor(self): while True: success = self.ping_host() now = datetime.now() if not success and self.current_outage is None: # Начало новой серии разрывов self.current_outage = { 'start': now, 'packets_lost': 1, 'end': None } elif not success and self.current_outage is not None: # Продолжение текущей серии разрывов self.current_outage['packets_lost'] += 1 elif success and self.current_outage is not None: # Серия разрывов закончилась self.current_outage['end'] = now self.outages.append(self.current_outage) self.current_outage = None self.last_success_time = now elif success: # Успешный пинг, обновляем время последнего успеха self.last_success_time = now self.print_status() time.sleep(1) # Пауза между пингами def print_status(self): # Очищаем экран (работает в терминалах, поддерживающих ANSI коды) print("\033c", end="") print(f"Ping monitor for {self.host}\n") # Всегда показываем текущий статус if self.current_outage is None: print("Status: pinging... (connection stable)") current_duration = (datetime.now() - self.last_success_time).total_seconds() print(f"Current series: packets_lost=0 | duration={current_duration:.1f} sec | {datetime.now()}") else: current_duration = (datetime.now() - self.current_outage['start']).total_seconds() print(f"Status: CONNECTION LOST!") print(f"Current series: packets_lost={self.current_outage['packets_lost']} | " f"duration={current_duration:.1f} sec | started at {self.current_outage['start']}") # Выводим историю разрывов, если они есть if self.outages: print("\nOutage history (sorted by packets lost):") print("-" * 60) # Сортируем серии по количеству потерянных пакетов (по убыванию) sorted_outages = sorted(self.outages, key=lambda x: x['packets_lost'], reverse=True) for outage in sorted_outages: duration = (outage['end'] - outage['start']).total_seconds() print(f"Start: {outage['start']} | " f"Packets lost: {outage['packets_lost']} | " f"Duration: {duration:.1f} sec") print("\nPress Ctrl+C to exit...") def main(): if len(sys.argv) != 2: print("Usage: ./ping_monitor.py ") sys.exit(1) host = sys.argv[1] monitor = PingMonitor(host) try: monitor.monitor() except KeyboardInterrupt: print("\nMonitoring stopped.") if __name__ == "__main__": main()