From e0a3dd42f4655749244175f3ebcdfde2a6af8fea Mon Sep 17 00:00:00 2001 From: Alpinux Date: Wed, 6 May 2026 14:19:01 +0200 Subject: [PATCH] ASN lookup : fallback ipinfo.io + pas de cache pour AS inconnus MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Si ip-api.com ne retourne pas d'AS, on retente sur ipinfo.io (champ org). Les entrées sans ASN ne sont plus mises en cache (AND asn != '' sur les lectures), donc les IP inconnues sont automatiquement re-tentées au prochain chargement du tab Bannis. Co-Authored-By: Claude Sonnet 4.6 --- app/app.py | 81 +++++++++++++++++++++++++++++++++++------------------- 1 file changed, 53 insertions(+), 28 deletions(-) diff --git a/app/app.py b/app/app.py index 38bbb0b..8071d15 100644 --- a/app/app.py +++ b/app/app.py @@ -679,15 +679,25 @@ def _pg(): try: _pg_pool_obj.putconn(conn) except: pass +def _parse_as_field(as_field: str, country: str = "") -> dict: + """Parse 'AS12345 Name' string into {asn, name, country}.""" + asn, name = "", as_field + if as_field.startswith("AS"): + parts = as_field.split(" ", 1) + asn = parts[0][2:] + name = parts[1] if len(parts) > 1 else "" + return {"asn": asn, "name": name, "country": country} + def _lookup_ip_asn(ip: str) -> dict: - """Returns {asn, name, country} — PostgreSQL cache (30 j) puis ip-api.com.""" + """Returns {asn, name, country} — PostgreSQL cache (30 j, asn non vide), ip-api.com, puis ipinfo.io.""" + # Cache : on ignore les entrées avec asn vide pour forcer un re-essai with _pg() as conn: if conn: try: with conn.cursor() as cur: cur.execute( "SELECT asn, name, country FROM ip_asn_cache " - "WHERE ip = %s AND fetched_at > now() - interval '30 days'", + "WHERE ip = %s AND asn != '' AND fetched_at > now() - interval '30 days'", (ip,) ) row = cur.fetchone() @@ -697,37 +707,52 @@ def _lookup_ip_asn(ip: str) -> dict: try: conn.rollback() except: pass - url = f"http://ip-api.com/json/{ip}?fields=as,countryCode" result: dict = {"asn": "", "name": "", "country": ""} + + # Première tentative : ip-api.com try: - req = urllib.request.Request(url, headers={"User-Agent": "alpinux-static/1.0"}) + req = urllib.request.Request( + f"http://ip-api.com/json/{ip}?fields=as,countryCode", + headers={"User-Agent": "alpinux-static/1.0"}) with urllib.request.urlopen(req, timeout=5) as resp: d = json.loads(resp.read()) as_field = d.get("as", "") - asn, name = "", as_field - if as_field.startswith("AS"): - parts = as_field.split(" ", 1) - asn = parts[0][2:] - name = parts[1] if len(parts) > 1 else "" - result = {"asn": asn, "name": name, "country": d.get("countryCode", "")} - except Exception as e: - result["error"] = str(e) + if as_field: + result = _parse_as_field(as_field, d.get("countryCode", "")) + except Exception: + pass - with _pg() as conn: - if conn: - try: - with conn.cursor() as cur: - cur.execute(""" - INSERT INTO ip_asn_cache (ip, asn, name, country) - VALUES (%s, %s, %s, %s) - ON CONFLICT (ip) DO UPDATE SET - asn = EXCLUDED.asn, name = EXCLUDED.name, - country = EXCLUDED.country, fetched_at = now() - """, (ip, result["asn"], result["name"], result["country"])) - conn.commit() - except Exception: - try: conn.rollback() - except: pass + # Fallback : ipinfo.io si toujours pas d'AS + if not result.get("asn"): + try: + req = urllib.request.Request( + f"https://ipinfo.io/{ip}/json", + headers={"User-Agent": "alpinux-static/1.0", "Accept": "application/json"}) + with urllib.request.urlopen(req, timeout=5) as resp: + d = json.loads(resp.read()) + org = d.get("org", "") + if org: + result = _parse_as_field(org, d.get("country", "")) + except Exception: + pass + + # Mise en cache seulement si on a trouvé un AS + if result.get("asn"): + with _pg() as conn: + if conn: + try: + with conn.cursor() as cur: + cur.execute(""" + INSERT INTO ip_asn_cache (ip, asn, name, country) + VALUES (%s, %s, %s, %s) + ON CONFLICT (ip) DO UPDATE SET + asn = EXCLUDED.asn, name = EXCLUDED.name, + country = EXCLUDED.country, fetched_at = now() + """, (ip, result["asn"], result["name"], result["country"])) + conn.commit() + except Exception: + try: conn.rollback() + except: pass return result @@ -744,7 +769,7 @@ def _batch_lookup_ip_asn(ips: list) -> dict: with conn.cursor() as cur: cur.execute( "SELECT ip, asn, name, country FROM ip_asn_cache " - "WHERE ip = ANY(%s) AND fetched_at > now() - interval '30 days'", + "WHERE ip = ANY(%s) AND asn != '' AND fetched_at > now() - interval '30 days'", (unique,) ) for row in cur.fetchall():