ASN lookup : fallback ipinfo.io + pas de cache pour AS inconnus

Si ip-api.com ne retourne pas d'AS, on retente sur ipinfo.io (champ org).
Les entrées sans ASN ne sont plus mises en cache (AND asn != '' sur les
lectures), donc les IP inconnues sont automatiquement re-tentées au prochain
chargement du tab Bannis.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Alpinux 2026-05-06 14:19:01 +02:00
parent a01976cbe8
commit e0a3dd42f4

View file

@ -679,15 +679,25 @@ def _pg():
try: _pg_pool_obj.putconn(conn)
except: pass
def _parse_as_field(as_field: str, country: str = "") -> dict:
"""Parse 'AS12345 Name' string into {asn, name, country}."""
asn, name = "", as_field
if as_field.startswith("AS"):
parts = as_field.split(" ", 1)
asn = parts[0][2:]
name = parts[1] if len(parts) > 1 else ""
return {"asn": asn, "name": name, "country": country}
def _lookup_ip_asn(ip: str) -> dict:
"""Returns {asn, name, country} — PostgreSQL cache (30 j) puis ip-api.com."""
"""Returns {asn, name, country} — PostgreSQL cache (30 j, asn non vide), ip-api.com, puis ipinfo.io."""
# Cache : on ignore les entrées avec asn vide pour forcer un re-essai
with _pg() as conn:
if conn:
try:
with conn.cursor() as cur:
cur.execute(
"SELECT asn, name, country FROM ip_asn_cache "
"WHERE ip = %s AND fetched_at > now() - interval '30 days'",
"WHERE ip = %s AND asn != '' AND fetched_at > now() - interval '30 days'",
(ip,)
)
row = cur.fetchone()
@ -697,37 +707,52 @@ def _lookup_ip_asn(ip: str) -> dict:
try: conn.rollback()
except: pass
url = f"http://ip-api.com/json/{ip}?fields=as,countryCode"
result: dict = {"asn": "", "name": "", "country": ""}
# Première tentative : ip-api.com
try:
req = urllib.request.Request(url, headers={"User-Agent": "alpinux-static/1.0"})
req = urllib.request.Request(
f"http://ip-api.com/json/{ip}?fields=as,countryCode",
headers={"User-Agent": "alpinux-static/1.0"})
with urllib.request.urlopen(req, timeout=5) as resp:
d = json.loads(resp.read())
as_field = d.get("as", "")
asn, name = "", as_field
if as_field.startswith("AS"):
parts = as_field.split(" ", 1)
asn = parts[0][2:]
name = parts[1] if len(parts) > 1 else ""
result = {"asn": asn, "name": name, "country": d.get("countryCode", "")}
except Exception as e:
result["error"] = str(e)
if as_field:
result = _parse_as_field(as_field, d.get("countryCode", ""))
except Exception:
pass
with _pg() as conn:
if conn:
try:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO ip_asn_cache (ip, asn, name, country)
VALUES (%s, %s, %s, %s)
ON CONFLICT (ip) DO UPDATE SET
asn = EXCLUDED.asn, name = EXCLUDED.name,
country = EXCLUDED.country, fetched_at = now()
""", (ip, result["asn"], result["name"], result["country"]))
conn.commit()
except Exception:
try: conn.rollback()
except: pass
# Fallback : ipinfo.io si toujours pas d'AS
if not result.get("asn"):
try:
req = urllib.request.Request(
f"https://ipinfo.io/{ip}/json",
headers={"User-Agent": "alpinux-static/1.0", "Accept": "application/json"})
with urllib.request.urlopen(req, timeout=5) as resp:
d = json.loads(resp.read())
org = d.get("org", "")
if org:
result = _parse_as_field(org, d.get("country", ""))
except Exception:
pass
# Mise en cache seulement si on a trouvé un AS
if result.get("asn"):
with _pg() as conn:
if conn:
try:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO ip_asn_cache (ip, asn, name, country)
VALUES (%s, %s, %s, %s)
ON CONFLICT (ip) DO UPDATE SET
asn = EXCLUDED.asn, name = EXCLUDED.name,
country = EXCLUDED.country, fetched_at = now()
""", (ip, result["asn"], result["name"], result["country"]))
conn.commit()
except Exception:
try: conn.rollback()
except: pass
return result
@ -744,7 +769,7 @@ def _batch_lookup_ip_asn(ips: list) -> dict:
with conn.cursor() as cur:
cur.execute(
"SELECT ip, asn, name, country FROM ip_asn_cache "
"WHERE ip = ANY(%s) AND fetched_at > now() - interval '30 days'",
"WHERE ip = ANY(%s) AND asn != '' AND fetched_at > now() - interval '30 days'",
(unique,)
)
for row in cur.fetchall():