import re import time import sqlite3 from geoip2.database import Reader #LOG_FILE = "/var/log/dnsmasq.log" #DB_FILE = "dnsmasq.db" #GEOIP_DB = "/usr/share/GeoIP/GeoLite2-Country.mmdb" LOG_FILE = "/var/log/dnsmasq_q.log" DB_FILE = "/root/dnsmasq.db" GEOIP_DB = "/usr/share/GeoIP/GeoLite2-Country.mmdb" EXCLUDE_DOMAINS = ( "kapka.ru", "sviridovs.ru", ) def is_excluded(domain: str) -> bool: for d in EXCLUDE_DOMAINS: if domain == d or domain.endswith("." + d): return True return False regex = re.compile( r'(?P\w+\s+\d+\s+\d+:\d+:\d+).*reply\s+(?P\S+)\s+is\s+(?P\d+\.\d+\.\d+\.\d+)' ) # --- DB --- conn = sqlite3.connect(DB_FILE) cur = conn.cursor() cur.execute(""" CREATE TABLE IF NOT EXISTS dns_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, ts TEXT, domain TEXT, ip TEXT UNIQUE, country TEXT ) """) conn.commit() # --- GeoIP --- geo = Reader(GEOIP_DB) geo_cache = {} def get_country(ip): if ip in geo_cache: return geo_cache[ip] try: resp = geo.country(ip) country = resp.country.name or "Unknown" except Exception: country = "Unknown" geo_cache[ip] = country return country # --- Tail -F --- with open(LOG_FILE, "r") as f: f.seek(0, 2) print("▶ dnsmasq tail started...\n") while True: line = f.readline() if not line: time.sleep(0.2) continue m = regex.search(line) if not m: continue ts = m.group("ts") domain = m.group("domain") ip = m.group("ip") if is_excluded(domain): continue country = get_country(ip) try: cur.execute(""" INSERT INTO dns_log (ts, domain, ip, country) VALUES (?, ?, ?, ?) """, (ts, domain, ip, country)) conn.commit() status = "NEW" except sqlite3.IntegrityError: status = "SKIP" # защита от None ts = ts or "-" country = country or "Unknown" domain = domain or "-" print(f"[{status}] {ts:<15} {ip:<15} {country:<15} {domain}")