Files
dns_bird/dnsmasq_tail.py

101 lines
2.1 KiB
Python

import os
import re
import sqlite3
import time

from geoip2.database import Reader
# Filesystem locations used by this script.
# Alternative values, kept commented out for reference:
#LOG_FILE = "/var/log/dnsmasq.log"
#DB_FILE = "dnsmasq.db"
#GEOIP_DB = "/usr/share/GeoIP/GeoLite2-Country.mmdb"
# Log file that is opened and followed for newly appended lines.
LOG_FILE = "/var/log/dnsmasq_q.log"
# SQLite database file that matched replies are written to.
DB_FILE = "/etc/bird/dnsmasq.db"
# Database file handed to geoip2's Reader for country lookups.
GEOIP_DB = "/usr/share/GeoIP/GeoLite2-Country.mmdb"
# Domains (and all of their subdomains) that must never be recorded.
# Entries are expected in lowercase and without a trailing dot.
EXCLUDE_DOMAINS = (
    "kapka.ru",
    "sviridovs.ru",
)


def is_excluded(domain: str) -> bool:
    """Return True if *domain* is an excluded domain or a subdomain of one.

    DNS names are case-insensitive and may be written with a trailing root
    dot, so the name is lower-cased and stripped of trailing dots before it
    is compared. A match requires either the whole name or a suffix that
    starts at a label boundary ("." + entry), so "notkapka.ru" is not treated
    as a subdomain of "kapka.ru".

    Args:
        domain: Host name as it appears in the log.

    Returns:
        True when the name should be skipped, False otherwise.
    """
    name = domain.lower().rstrip(".")
    return any(
        name == excluded or name.endswith("." + excluded)
        for excluded in EXCLUDE_DOMAINS
    )
# Pattern for the log lines this script records. It looks for a timestamp
# shaped like "Jan  5 12:34:56" and, later on the same line, the text
# "reply <domain> is <a.b.c.d>". Named groups: ts, domain, ip.
# Only dotted-quad (IPv4-looking) answers are captured; the octets are not
# range-checked. Lines whose answer is anything else do not match.
regex = re.compile(
r'(?P<ts>\w+\s+\d+\s+\d+:\d+:\d+).*reply\s+(?P<domain>\S+)\s+is\s+(?P<ip>\d+\.\d+\.\d+\.\d+)'
)
# --- DB ---
# Open (or create) the SQLite file and make sure the dns_log table exists.
# The ip column is UNIQUE, so a second row for an already stored address is
# rejected by the database.
conn = sqlite3.connect(DB_FILE)
cur = conn.cursor()
# The connection's context manager commits once the statement has succeeded.
with conn:
    cur.execute(
        # Adjacent literals spell out the exact statement text, line by line.
        "\n"
        "CREATE TABLE IF NOT EXISTS dns_log (\n"
        "id INTEGER PRIMARY KEY AUTOINCREMENT,\n"
        "ts TEXT,\n"
        "domain TEXT,\n"
        "ip TEXT UNIQUE,\n"
        "country TEXT\n"
        ")\n"
    )
# --- GeoIP ---
# One reader for the whole run, plus a dict that remembers every answer.
geo = Reader(GEOIP_DB)
geo_cache = {}


def get_country(ip):
    """Return the country name for *ip*, remembering the answer in ``geo_cache``.

    The reader is consulted only on a cache miss. If the lookup raises, or
    the record carries no country name, the result is "Unknown"; that
    fallback is cached like any other answer.
    """
    try:
        return geo_cache[ip]
    except KeyError:
        pass  # not seen yet, fall through to the lookup

    try:
        found = geo.country(ip).country.name
    except Exception:
        # Best effort: any lookup failure is reported as "Unknown".
        found = None

    label = found or "Unknown"
    geo_cache[ip] = label
    return label
# --- Tail -F ---
# Statement executed for every accepted log line (same text as before, spelled
# out with adjacent literals so it can live at module level).
_INSERT_SQL = (
    "\n"
    "INSERT INTO dns_log (ts, domain, ip, country)\n"
    "VALUES (?, ?, ?, ?)\n"
)


def _iter_appended_lines(handle, path, poll_interval):
    """Yield complete lines appended to *handle*, reopening *path* when needed.

    Args:
        handle: Binary file object already positioned where reading starts.
        path: Path the handle was opened from; re-checked whenever the
            handle is at end-of-file.
        poll_interval: Seconds to sleep when there is nothing new to read.

    Yields:
        Each newline-terminated line, decoded as UTF-8 with undecodable
        bytes replaced so that odd bytes in a logged name cannot stop the loop.
    """
    pending = b""
    try:
        while True:
            chunk = handle.readline()
            if chunk:
                # readline() may return a line the writer has not finished
                # yet; hold it back until the terminating newline arrives.
                pending += chunk
                if pending.endswith(b"\n"):
                    yield pending.decode("utf-8", errors="replace")
                    pending = b""
                continue

            # End of file: check whether *path* still is the file we hold.
            try:
                on_disk = os.stat(path)
            except OSError:
                on_disk = None  # mid-rotation, the new file is not there yet

            if on_disk is not None:
                opened = os.fstat(handle.fileno())
                if (on_disk.st_dev, on_disk.st_ino) != (opened.st_dev, opened.st_ino):
                    # The log was rotated: switch to the new file and read it
                    # from its beginning.
                    try:
                        fresh = open(path, "rb")
                    except OSError:
                        fresh = None
                    if fresh is not None:
                        handle.close()
                        handle = fresh
                        pending = b""
                        continue
                elif on_disk.st_size < handle.tell():
                    # The file was truncated in place: start over from the top.
                    handle.seek(0)
                    pending = b""
                    continue

            time.sleep(poll_interval)
    finally:
        handle.close()


def _follow(path, poll_interval=0.2):
    """Open *path*, skip its existing content and return an iterator of new lines.

    The file is opened immediately (not lazily) so that a missing log file
    fails here, before anything else is printed.
    """
    handle = open(path, "rb")
    handle.seek(0, os.SEEK_END)
    return _iter_appended_lines(handle, path, poll_interval)


log_lines = _follow(LOG_FILE)
print("▶ dnsmasq tail started...\n")

for line in log_lines:
    m = regex.search(line)
    if not m:
        continue

    # The pattern guarantees non-empty ts, domain and ip on a match.
    ts = m.group("ts")
    domain = m.group("domain")
    ip = m.group("ip")

    if is_excluded(domain):
        continue

    country = get_country(ip)

    try:
        cur.execute(_INSERT_SQL, (ts, domain, ip, country))
        conn.commit()
        status = "NEW"
    except sqlite3.IntegrityError:
        # The ip is already stored (UNIQUE column): report it, do not fail.
        status = "SKIP"

    # Guard against an empty country value when formatting the output.
    country = country or "Unknown"

    print(f"[{status}] {ts:<15} {ip:<15} {country:<15} {domain}")