import gzip
import io
import ipaddress
import json
import os
import re
import shutil
import subprocess
import threading
import time
import urllib.request
from contextlib import contextmanager
from pathlib import Path
from datetime import datetime, timedelta
from urllib.parse import urlencode

# PostgreSQL support is optional: _HAS_PG records whether psycopg2 imported.
try:
    import psycopg2
    import psycopg2.pool as _pg_pool_lib
    _HAS_PG = True
except ImportError:
    _HAS_PG = False

from PIL import Image
from flask import (Flask, redirect, url_for, session, request,
                   render_template, abort, send_from_directory, jsonify)
from authlib.integrations.flask_client import OAuth
from werkzeug.middleware.proxy_fix import ProxyFix
from werkzeug.utils import secure_filename

app = Flask(__name__)
app.secret_key = os.environ["SECRET_KEY"]
app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1)

oauth = OAuth(app)
oauth.register(
    name="alpid",
    server_metadata_url=os.environ["ALPID_DISCOVERY_URL"],
    client_id=os.environ["ALPID_CLIENT_ID"],
    client_secret=os.environ["ALPID_CLIENT_SECRET"],
    client_kwargs={"scope": "openid profile email"},
)

# Comma-separated lists from the environment. Items are stripped and empty
# items dropped in both, so "admins, staff" yields {"admins", "staff"}.
ADMIN_GROUPS = {
    g.strip()
    for g in os.environ.get("ADMIN_GROUPS", "admins").split(",")
    if g.strip()
}
ADMIN_EMAILS = {
    e.strip()
    for e in os.environ.get("ADMIN_EMAILS", "").split(",")
    if e.strip()
}

ASSETS_ROOT = Path(os.environ.get("ASSETS_ROOT", ".")).resolve()
TRASH_ROOT = ASSETS_ROOT / ".trash"

# The logout URL is built from the part of the discovery URL that precedes
# "/.well-known/".
_alpid_base = os.environ["ALPID_DISCOVERY_URL"].split("/.well-known/")[0]
ALPID_LOGOUT_URL = _alpid_base + "/protocol/openid-connect/logout"

STATS_FILE = Path(os.environ.get("STATS_FILE", "/opt/static-cdn/goaccess.html"))
STATS_JSON = Path(os.environ.get("STATS_JSON", "/opt/static-cdn/goaccess.json"))
STATS_LOG_FILE = os.environ.get("STATS_LOG_FILE", "")
STATS_GENERATE_CMD = os.environ.get("STATS_GENERATE_CMD", "")

_gen_lock = threading.Lock()
_gen_state = {"generating": False}

# Version string read from the VERSION file next to this module; falls back
# to a dash when the file cannot be read.
try:
    _APP_VERSION = (Path(__file__).parent / "VERSION").read_text().strip()
except Exception:
    _APP_VERSION = "—"

_CHANGELOG_FILE = Path(__file__).parent / "CHANGELOG.md"
_IGNORED_IPS_FILE = Path(__file__).parent / "ignored_ips.json"

# Access-log line pattern. Capture groups, in order: client IP, timestamp,
# request path, 3-digit status, referer.
_LOG_RE = re.compile(
    r'(\S+) \S+ \S+ \[(\d{2}/\w+/\d{4}:\d{2}:\d{2}:\d{2} [+-]\d{4})\] '
    r'"[A-Z-]+ ([^\s"]+)[^"]*" (\d{3}) \d+ "([^"]*)"'
)

_404_CACHE: dict = {}
_404_CACHE_TS: float = 0
_404_CACHE_TTL = 300  # 5 min

_AS_CACHE_DIR = Path(__file__).parent / "as_cache"
_AS_CACHE_TTL = 30 * 24 * 3600  # 30 days

_BANNED_CACHE: tuple = (set(), [])
_BANNED_CACHE_TS: float = 0
_BANNED_CACHE_TTL = 60  # 1 min

_pg_dsn = os.environ.get("DATABASE_URL", "")
_pg_pool_obj = None
_pg_schema_ok = False

# Entry names that the helpers below treat as hidden.
_HIDDEN = frozenset({
    ".git", "scripts", "app", ".env", ".env.example", ".gitignore",
    ".htaccess", ".htpasswd_stats", "standard_index.html",
})

# cairosvg is optional: HAS_CAIROSVG records whether it imported.
try:
    import cairosvg as _cairosvg
    HAS_CAIROSVG = True
except ImportError:
    HAS_CAIROSVG = False

IMAGE_EXT = frozenset({".png", ".jpg", ".jpeg", ".gif", ".ico", ".svg", ".webp"})
RESIZE_SIZES = frozenset({32, 64, 100, 128, 200, 300, 500, 600, 1024})
RESIZE_FORMATS = frozenset({"png", "jpg", "ico", "svg"})
TEXT_EXT = frozenset({".txt", ".html", ".htm", ".md", ".css", ".js", ".json",
                      ".xml", ".conf", ".sh", ".robots", ".php"})
PDF_EXT = frozenset({".pdf"})


# ── Helpers ───────────────────────────────────────────────────────────

def _user():
    """Return the "user" entry of the Flask session, or None if absent."""
    return session.get("user")


def _require_admin():
    """Gate a view on an authenticated admin session.

    Returns:
        A redirect to the login view when no user is in the session (the
        requested URL is stored in ``session["next_url"]`` first), otherwise
        None.

    Aborts with 403 when a user is present but its "is_admin" flag is falsy.
    """
    u = _user()
    if not u:
        session["next_url"] = request.url
        return redirect(url_for("login"))
    if not u.get("is_admin"):
        abort(403)
    return None


def _safe_path(subpath: str) -> Path:
    """Resolve *subpath* under ASSETS_ROOT, aborting with 400 if it escapes.

    The path is resolved before the containment check, so ".." segments and
    symlinks are taken into account.
    """
    target = (ASSETS_ROOT / subpath).resolve()
    if not target.is_relative_to(ASSETS_ROOT):
        abort(400)
    return target


def _humansize(n: int) -> str:
    """Format a byte count with the units o / Ko / Mo / Go / To.

    The value is divided by 1024 until it is below 1024; values up to "Go"
    are printed without decimals, "To" with one decimal.
    """
    for unit in ("o", "Ko", "Mo", "Go"):
        if n < 1024:
            return f"{n:.0f} {unit}"
        n /= 1024
    return f"{n:.1f} To"


# EXIF tag names kept by _image_meta. "GPSInfo" is listed here but handled
# separately through the GPS IFD.
_EXIF_WANTED = frozenset({
    "Make", "Model", "Software", "DateTime", "DateTimeOriginal",
    "DateTimeDigitized", "ExposureTime", "FNumber", "ISOSpeedRatings",
    "FocalLength", "Flash", "WhiteBalance", "ExposureProgram",
    "MeteringMode", "Orientation", "XResolution", "YResolution",
    "ResolutionUnit", "ColorSpace", "PixelXDimension", "PixelYDimension",
    "Artist", "Copyright", "GPSInfo",
})


def _image_meta(path: Path) -> dict:
    """Return basic image properties and selected EXIF data for *path*.

    Returns:
        A dict with "width", "height", "format", "mode" and "dpi", plus an
        "exif" dict (tag name -> stringified value, and "GPS" when a GPS IFD
        is present) if any wanted tag was found. Returns an empty dict if
        anything fails (best effort: a preview must not break on a bad file).
    """
    try:
        from PIL import ExifTags

        # The context manager closes the underlying file handle; EXIF IFDs
        # are read while the image is still open.
        with Image.open(path) as img:
            meta = {
                "width": img.width,
                "height": img.height,
                "format": img.format or path.suffix.lstrip(".").upper(),
                "mode": img.mode,
                "dpi": img.info.get("dpi"),
            }
            exif = img.getexif()
            if exif:
                rows = {}

                def _keep_wanted(ifd):
                    # Copy wanted tags as strings; later calls overwrite
                    # earlier values for the same tag name.
                    for tag_id, val in ifd.items():
                        tag = ExifTags.TAGS.get(tag_id, str(tag_id))
                        if tag in _EXIF_WANTED and tag != "GPSInfo":
                            rows[tag] = str(val)

                _keep_wanted(exif)                   # root IFD
                _keep_wanted(exif.get_ifd(0x8769))   # Exif sub-IFD
                gps_ifd = exif.get_ifd(0x8825)       # GPS IFD
                if gps_ifd:
                    rows["GPS"] = _parse_gps(gps_ifd)
                if rows:
                    meta["exif"] = rows
            return meta
    except Exception:
        return {}


def _parse_gps(gps_ifd: dict):
    """Convert an EXIF GPS IFD to a "lat, lon" string in decimal degrees.

    Reads tags 1-4 (latitude ref, latitude, longitude ref, longitude). The
    refs default to "N" / "E"; "S" and "W" negate the value.

    Returns:
        The formatted coordinates with six decimals, or None when latitude
        or longitude is missing or the values cannot be converted.
    """
    # Without both coordinates there is no position to report; substituting
    # zeros would produce a misleading "0.000000, 0.000000".
    if 2 not in gps_ifd or 4 not in gps_ifd:
        return None
    try:
        def _dms(coords):
            d, m, s = coords
            return float(d) + float(m) / 60 + float(s) / 3600

        lat = _dms(gps_ifd[2])
        lon = _dms(gps_ifd[4])
        if gps_ifd.get(1, "N") == "S":
            lat = -lat
        if gps_ifd.get(3, "E") == "W":
            lon = -lon
        return f"{lat:.6f}, {lon:.6f}"
    except Exception:
        return None


def _backup_path(p: Path) -> Path:
    """Return a sibling of *p* named "<stem>_bak_<YYYYmmddHHMMSS><suffix>"."""
    ts = datetime.now().strftime("%Y%m%d%H%M%S")
    return p.parent / f"{p.stem}_bak_{ts}{p.suffix}"


def _auto_rename(p: Path) -> Path:
    """Return the first non-existing sibling "<stem>_<i><suffix>", i = 1, 2, …"""
    i = 1
    while True:
        candidate = p.parent / f"{p.stem}_{i}{p.suffix}"
        if not candidate.exists():
            return candidate
        i += 1


def _trash_move(target: Path, subpath: str) -> None:
    """Move *target* into TRASH_ROOT and write its ".trashinfo" sidecar.

    Args:
        target: Path of the item to move.
        subpath: Path used both as the location under TRASH_ROOT and as the
            recorded "original_path".

    If the destination already exists, a timestamp is appended to the stem.
    The sidecar is a JSON file named "<destination>.trashinfo" holding
    "original_path" and "deleted_at" (ISO format, second precision).
    """
    rel = Path(subpath)
    trash_dest = TRASH_ROOT / rel
    trash_dest.parent.mkdir(parents=True, exist_ok=True)
    if trash_dest.exists():
        ts = datetime.now().strftime("%Y%m%d%H%M%S")
        trash_dest = trash_dest.parent / f"{trash_dest.stem}_{ts}{trash_dest.suffix}"
    shutil.move(str(target), trash_dest)
    Path(str(trash_dest) + ".trashinfo").write_text(json.dumps({
        "original_path": str(rel),
        "deleted_at": datetime.now().isoformat(timespec="seconds"),
    }))
def _trash_restore(trash_rel: str, conflict: str = "") -> tuple: target = (TRASH_ROOT / trash_rel).resolve() if not str(target).startswith(str(TRASH_ROOT)): return False, "forbidden" if not target.is_file(): return False, "not_found" info_path = Path(str(target) + ".trashinfo") try: original = json.loads(info_path.read_text())["original_path"] except Exception: original = trash_rel dest = (ASSETS_ROOT / original).resolve() if not str(dest).startswith(str(ASSETS_ROOT)): return False, "forbidden" if dest.exists(): if not conflict: return False, "conflict" if conflict == "rename": dest = _auto_rename(dest) dest.parent.mkdir(parents=True, exist_ok=True) shutil.move(str(target), str(dest)) if info_path.exists(): info_path.unlink() return True, str(dest.relative_to(ASSETS_ROOT)) def _trash_purge(days: int = 30) -> int: if not TRASH_ROOT.exists(): return 0 cutoff = datetime.now() - timedelta(days=days) count = 0 for info_file in list(TRASH_ROOT.rglob("*.trashinfo")): try: deleted_at = datetime.fromisoformat( json.loads(info_file.read_text())["deleted_at"] ) if deleted_at < cutoff: f = Path(str(info_file)[:-len(".trashinfo")]) if f.exists(): f.unlink() info_file.unlink() count += 1 except Exception: pass for d in sorted(TRASH_ROOT.rglob("*"), reverse=True): if d.is_dir() and d != TRASH_ROOT: try: d.rmdir() except OSError: pass return count def _trash_list() -> list: if not TRASH_ROOT.exists(): return [] entries = [] for info_file in TRASH_ROOT.rglob("*.trashinfo"): try: data = json.loads(info_file.read_text()) fpath = Path(str(info_file)[:-len(".trashinfo")]) if not fpath.exists(): info_file.unlink() continue rel = fpath.relative_to(TRASH_ROOT) ext = fpath.suffix.lower() stat = fpath.stat() entries.append({ "name": fpath.name, "path": str(rel), "original_path": data.get("original_path", str(rel)), "deleted_at": datetime.fromisoformat(data["deleted_at"]), "size": stat.st_size, "is_dir": False, "is_image": ext in IMAGE_EXT, "is_text": ext in TEXT_EXT, "is_pdf": ext in 
PDF_EXT, "ext": ext, }) except Exception: pass entries.sort(key=lambda e: e["deleted_at"], reverse=True) return entries def _trash_count() -> int: if not TRASH_ROOT.exists(): return 0 return sum(1 for f in TRASH_ROOT.rglob("*") if f.is_file() and not f.name.endswith(".trashinfo")) def _trash_stats() -> dict: if not TRASH_ROOT.exists(): return {"files": 0, "size": 0, "oldest": None} files, size, oldest = 0, 0, None for f in TRASH_ROOT.rglob("*"): if not f.is_file() or f.name.endswith(".trashinfo"): continue files += 1 size += f.stat().st_size info = Path(str(f) + ".trashinfo") try: dt = datetime.fromisoformat(json.loads(info.read_text())["deleted_at"]) if oldest is None or dt < oldest: oldest = dt except Exception: pass return {"files": files, "size": size, "oldest": oldest} def _folder_stats(path: Path) -> dict: files, size = 0, 0 for f in path.rglob("*"): if not f.is_file(): continue if any(p in _HIDDEN or p.startswith(".") for p in f.relative_to(path).parts): continue files += 1 size += f.stat().st_size return {"files": files, "size": size} def _parse_date(s: str): if not s: return None try: return datetime.strptime(s, "%Y-%m-%d") except ValueError: return None def _load_hits() -> dict: """Parse GoAccess JSON report → {relative_path: hit_count}.""" if not STATS_JSON.is_file(): return {} try: data = json.loads(STATS_JSON.read_text()) hits = {} for item in data.get("requests", {}).get("data", []): url = item.get("data", "").lstrip("/") count = item.get("hits", {}).get("count", 0) if url and count: hits[url] = count return hits except Exception: return {} def _entry(item: Path) -> dict: stat = item.stat() ext = item.suffix.lower() return { "name": item.name, "path": str(item.relative_to(ASSETS_ROOT)), "is_dir": item.is_dir(), "size": stat.st_size if item.is_file() else None, "mtime": datetime.fromtimestamp(stat.st_mtime), "ext": ext, "is_image": ext in IMAGE_EXT, "is_text": ext in TEXT_EXT, "is_pdf": ext in PDF_EXT, } # ── Context processor 
──────────────────────────────────────────────── @app.context_processor def _inject_globals(): u = _user() return { "user": u, "trash_count": _trash_count() if u else 0, "humansize": _humansize, "app_version": _APP_VERSION, } # ── Auth ────────────────────────────────────────────────────────────── @app.route("/auth/login") def login(): return oauth.alpid.authorize_redirect(url_for("callback", _external=True)) @app.route("/auth/callback") def callback(): token = oauth.alpid.authorize_access_token() info = token.get("userinfo") or oauth.alpid.userinfo(token=token) email = info.get("email", "") groups = set(info.get("groups", [])) if groups: is_admin = bool(groups & ADMIN_GROUPS) elif ADMIN_EMAILS: is_admin = email in ADMIN_EMAILS else: is_admin = True session["user"] = { "sub": info["sub"], "name": info.get("name") or info.get("preferred_username", ""), "email": email, "is_admin": is_admin, } session["id_token"] = token.get("id_token", "") return redirect(session.pop("next_url", url_for("dashboard"))) @app.route("/auth/logout") def logout(): id_token = session.get("id_token") session.clear() params = {"post_logout_redirect_uri": url_for("dashboard", _external=True)} if id_token: params["id_token_hint"] = id_token return redirect(ALPID_LOGOUT_URL + "?" 
+ urlencode(params)) # ── Dashboard ───────────────────────────────────────────────────────── @app.route("/") def dashboard(): redir = _require_admin() if redir: return redir folders = {} for item in sorted(ASSETS_ROOT.iterdir()): if item.name in _HIDDEN or item.name.startswith("."): continue if item.is_dir(): folders[item.name] = _folder_stats(item) return render_template("dashboard.html", folders=folders, total_files=sum(v["files"] for v in folders.values()), total_size=sum(v["size"] for v in folders.values()), trash=_trash_stats(), ) @app.route("/changelog") def changelog(): redir = _require_admin() if redir: return redir sections = _parse_changelog() return render_template("changelog.html", sections=sections) def _parse_changelog(): """Parse CHANGELOG.md into a list of version dicts.""" try: text = _CHANGELOG_FILE.read_text() except Exception: return [] sections = [] current = None current_group = None for line in text.splitlines(): if line.startswith("## "): if current: if current_group: current["groups"].append(current_group) sections.append(current) m = re.match(r"## \[(.+?)\] — (.+)", line) current = {"version": m.group(1) if m else line[3:], "date": m.group(2) if m else "", "groups": []} current_group = None elif line.startswith("### ") and current is not None: if current_group: current["groups"].append(current_group) current_group = {"title": line[4:], "entries": []} elif line.startswith("- ") and current_group is not None: current_group["entries"].append(line[2:]) if current: if current_group: current["groups"].append(current_group) sections.append(current) return sections # ── Erreurs 404 — helpers ───────────────────────────────────────────── def _load_ignored_ips() -> set: try: return set(json.loads(_IGNORED_IPS_FILE.read_text()).get("ips", [])) except Exception: return set() def _save_ignored_ips(ips: set) -> None: _IGNORED_IPS_FILE.write_text(json.dumps({"ips": sorted(ips)}, indent=2)) def _log_files_404(days: int = 7) -> list: if not 
STATS_LOG_FILE: return [] log_dir = Path(STATS_LOG_FILE).parent files = [] for f in sorted(log_dir.glob("*-access.log*"), reverse=True)[:days]: files.append((f, f.name.endswith(".gz"))) return files def _parse_404s(days: int = 7) -> dict: global _404_CACHE, _404_CACHE_TS now = time.time() ign_mtime = _IGNORED_IPS_FILE.stat().st_mtime if _IGNORED_IPS_FILE.exists() else 0 if now - _404_CACHE_TS < _404_CACHE_TTL and _404_CACHE and ign_mtime <= _404_CACHE_TS: return _404_CACHE ignored = _load_ignored_ips() results: dict = {} for fpath, is_gz in _log_files_404(days): try: opener = gzip.open(fpath, "rt", errors="replace") if is_gz \ else open(fpath, errors="replace") with opener as f: for line in f: m = _LOG_RE.match(line) if not m: continue ip, dt_str, path, status, referer = m.groups() if status != "404": continue if ip in ignored: continue path = path.split("?")[0] try: dt = datetime.strptime(dt_str, "%d/%b/%Y:%H:%M:%S %z") except ValueError: continue if path not in results: results[path] = {"count": 0, "last_seen": dt, "ips": {}} r = results[path] r["count"] += 1 if dt > r["last_seen"]: r["last_seen"] = dt if ip not in r["ips"]: r["ips"][ip] = [] r["ips"][ip].append({"dt": dt, "referer": referer if referer != "-" else ""}) except Exception: continue _404_CACHE = results _404_CACHE_TS = now return results def _is_still_404(path: str) -> bool: clean = path.lstrip("/") return not (ASSETS_ROOT / clean).exists() def _invalidate_404_cache(): global _404_CACHE_TS _404_CACHE_TS = 0 # ── Bannissement fail2ban — helpers ─────────────────────────────────── def _get_banned_ips() -> tuple: """Returns (set_of_ips, list_of_networks) from fail2ban global-blacklist.""" global _BANNED_CACHE, _BANNED_CACHE_TS now = time.time() if now - _BANNED_CACHE_TS < _BANNED_CACHE_TTL: return _BANNED_CACHE try: r = subprocess.run( ["/usr/bin/sudo", "/usr/bin/fail2ban-client", "status", "global-blacklist"], capture_output=True, text=True, timeout=5 ) raw: set = set() for line in 
r.stdout.splitlines(): if "Banned IP list:" in line: raw = set(line.split("Banned IP list:")[1].split()) break ips, nets = set(), [] for entry in raw: if "/" in entry: try: nets.append(ipaddress.ip_network(entry, strict=False)) except ValueError: pass else: ips.add(entry) _BANNED_CACHE = (ips, nets) except Exception: _BANNED_CACHE = (set(), []) _BANNED_CACHE_TS = now return _BANNED_CACHE def _ip_is_banned(ip: str) -> bool: banned_ips, banned_nets = _get_banned_ips() if ip in banned_ips: return True try: addr = ipaddress.ip_address(ip) return any(addr in net for net in banned_nets) except ValueError: return False def _pg_init_pool() -> bool: global _pg_pool_obj if _pg_pool_obj is not None: return True if not _HAS_PG or not _pg_dsn: return False try: _pg_pool_obj = _pg_pool_lib.ThreadedConnectionPool(1, 3, _pg_dsn) return True except Exception: return False def _pg_ensure_schema(conn) -> None: global _pg_schema_ok if _pg_schema_ok: return with conn.cursor() as cur: cur.execute(""" CREATE TABLE IF NOT EXISTS ip_asn_cache ( ip TEXT PRIMARY KEY, asn TEXT NOT NULL DEFAULT '', name TEXT NOT NULL DEFAULT '', country TEXT NOT NULL DEFAULT '', fetched_at TIMESTAMPTZ NOT NULL DEFAULT now() ) """) conn.commit() _pg_schema_ok = True @contextmanager def _pg(): """Yields a psycopg2 connection from the pool, or None if unavailable.""" if not _pg_init_pool(): yield None return conn = None try: conn = _pg_pool_obj.getconn() _pg_ensure_schema(conn) except Exception: if conn: try: _pg_pool_obj.putconn(conn) except: pass yield None return try: yield conn finally: try: _pg_pool_obj.putconn(conn) except: pass def _parse_as_field(as_field: str, country: str = "") -> dict: """Parse 'AS12345 Name' string into {asn, name, country}.""" asn, name = "", as_field if as_field.startswith("AS"): parts = as_field.split(" ", 1) asn = parts[0][2:] name = parts[1] if len(parts) > 1 else "" return {"asn": asn, "name": name, "country": country} def _lookup_ip_asn(ip: str) -> dict: """Returns {asn, name, 
country} — PostgreSQL cache (30 j, asn non vide), ip-api.com, puis ipinfo.io.""" # Cache : on ignore les entrées avec asn vide pour forcer un re-essai with _pg() as conn: if conn: try: with conn.cursor() as cur: cur.execute( "SELECT asn, name, country FROM ip_asn_cache " "WHERE ip = %s AND asn != '' AND fetched_at > now() - interval '30 days'", (ip,) ) row = cur.fetchone() if row: return {"asn": row[0], "name": row[1], "country": row[2]} except Exception: try: conn.rollback() except: pass result: dict = {"asn": "", "name": "", "country": ""} # Première tentative : ip-api.com try: req = urllib.request.Request( f"http://ip-api.com/json/{ip}?fields=as,countryCode", headers={"User-Agent": "alpinux-static/1.0"}) with urllib.request.urlopen(req, timeout=5) as resp: d = json.loads(resp.read()) as_field = d.get("as", "") if as_field: result = _parse_as_field(as_field, d.get("countryCode", "")) except Exception: pass # Fallback 1 : Team Cymru whois (TCP port 43 — fonctionne même si HTTPS sortant bloqué) if not result.get("asn"): try: import socket as _socket s = _socket.create_connection(("whois.cymru.com", 43), timeout=8) s.sendall(f"verbose\n{ip}\n".encode()) data = b"" while True: chunk = s.recv(4096) if not chunk: break data += chunk s.close() for line in data.decode(errors="replace").splitlines(): if "|" not in line or line.startswith("AS"): continue parts = [p.strip() for p in line.split("|")] if len(parts) >= 7 and parts[0] not in ("", "NA"): result = {"asn": parts[0], "name": parts[6], "country": parts[3]} break except Exception: pass # Fallback 2 : ipinfo.io (HTTPS — peut être bloqué côté serveur, timeout court) if not result.get("asn"): try: req = urllib.request.Request( f"https://ipinfo.io/{ip}/json", headers={"User-Agent": "alpinux-static/1.0", "Accept": "application/json"}) with urllib.request.urlopen(req, timeout=2) as resp: d = json.loads(resp.read()) org = d.get("org", "") if org: result = _parse_as_field(org, d.get("country", "")) except Exception: pass # 
Mise en cache seulement si on a trouvé un AS if result.get("asn"): with _pg() as conn: if conn: try: with conn.cursor() as cur: cur.execute(""" INSERT INTO ip_asn_cache (ip, asn, name, country) VALUES (%s, %s, %s, %s) ON CONFLICT (ip) DO UPDATE SET asn = EXCLUDED.asn, name = EXCLUDED.name, country = EXCLUDED.country, fetched_at = now() """, (ip, result["asn"], result["name"], result["country"])) conn.commit() except Exception: try: conn.rollback() except: pass return result def _batch_lookup_ip_asn(ips: list) -> dict: """Single SQL query for all IPs in cache. Falls back to individual lookup for misses (capped at 20).""" if not ips: return {} unique = list(dict.fromkeys(ips)) result: dict = {} with _pg() as conn: if conn: try: with conn.cursor() as cur: cur.execute( "SELECT ip, asn, name, country FROM ip_asn_cache " "WHERE ip = ANY(%s) AND asn != '' AND fetched_at > now() - interval '30 days'", (unique,) ) for row in cur.fetchall(): result[row[0]] = {"asn": row[1], "name": row[2], "country": row[3]} except Exception: try: conn.rollback() except: pass misses = [ip for ip in unique if ip not in result] for ip in misses[:20]: result[ip] = _lookup_ip_asn(ip) for ip in misses[20:]: result.setdefault(ip, {"asn": "", "name": "", "country": ""}) return result def _lookup_as_prefixes(asn: str) -> list: """Returns IPv4 CIDRs for an ASN via RIPE Stat. 
Cached 30 days.""" _AS_CACHE_DIR.mkdir(exist_ok=True) cache_file = _AS_CACHE_DIR / f"AS{asn}.json" now = time.time() if cache_file.exists() and now - cache_file.stat().st_mtime < _AS_CACHE_TTL: return json.loads(cache_file.read_text()).get("prefixes", []) url = f"https://stat.ripe.net/data/announced-prefixes/data.json?resource=AS{asn}" try: req = urllib.request.Request(url, headers={"User-Agent": "alpinux-static/1.0"}) with urllib.request.urlopen(req, timeout=15) as resp: data = json.loads(resp.read()) prefixes = [ p["prefix"] for p in data.get("data", {}).get("prefixes", []) if ":" not in p.get("prefix", "") ] cache_file.write_text(json.dumps({"asn": asn, "prefixes": prefixes})) return prefixes except Exception: return [] # ── Navigateur de fichiers ──────────────────────────────────────────── @app.route("/browse/") @app.route("/browse/") def browse(subpath=""): redir = _require_admin() if redir: return redir target = _safe_path(subpath) if not target.exists(): abort(404) if subpath: top = Path(subpath).parts[0] if top in _HIDDEN or top.startswith("."): abort(403) if target.is_file(): return _file_preview(target, subpath) hits = _load_hits() entries = [] for item in sorted(target.iterdir()): if item.name in _HIDDEN or item.name.startswith("."): continue e = _entry(item) e["hits"] = hits.get(e["path"], 0) if not e["is_dir"] else None entries.append(e) entries.sort(key=lambda e: (0 if e["is_dir"] else 1, e["name"].lower())) parts = Path(subpath).parts if subpath else () breadcrumb = [ {"name": p, "path": str(Path(*parts[:i + 1]))} for i, p in enumerate(parts) ] return render_template("browse.html", entries=entries, subpath=subpath, breadcrumb=breadcrumb, has_hits=STATS_JSON.is_file(), ) def _file_preview(path: Path, subpath: str): ext = path.suffix.lower() stat = path.stat() parent = str(Path(subpath).parent) if Path(subpath).parent != Path(".") else "" siblings = sorted( [f for f in path.parent.iterdir() if f.is_file() and f.name not in _HIDDEN and not 
f.name.startswith(".")], key=lambda f: f.name.lower(), ) idx = next((i for i, f in enumerate(siblings) if f == path), None) prev_path = str(siblings[idx - 1].relative_to(ASSETS_ROOT)) if idx else None next_path = str(siblings[idx + 1].relative_to(ASSETS_ROOT)) if idx is not None and idx < len(siblings) - 1 else None ctx = dict( subpath=subpath, filename=path.name, filesize=_humansize(stat.st_size), mtime=datetime.fromtimestamp(stat.st_mtime), raw_url=url_for("raw_file", subpath=subpath), parent_path=parent, prev_path=prev_path, next_path=next_path, sibling_pos=f"{idx + 1}/{len(siblings)}" if idx is not None else "", ext=ext, ) if ext in IMAGE_EXT: ctx["meta"] = _image_meta(path) return render_template("preview_image.html", **ctx) if ext in TEXT_EXT: content = path.read_text(errors="replace") return render_template("preview_text.html", content=content, lang=ext.lstrip("."), **ctx) return render_template("preview_other.html", **ctx) # ── Recherche ───────────────────────────────────────────────────────── @app.route("/search") def search(): redir = _require_admin() if redir: return redir q = request.args.get("q", "").strip() after_s = request.args.get("after", "").strip() before_s = request.args.get("before", "").strip() in_content = request.args.get("content") == "1" dt_after = _parse_date(after_s) dt_before = _parse_date(before_s) results = [] searched = bool(q or dt_after or dt_before) if searched: q_low = q.lower() for path in sorted(ASSETS_ROOT.rglob("*")): if not path.is_file(): continue parts = path.relative_to(ASSETS_ROOT).parts if any(p in _HIDDEN or p.startswith(".") for p in parts): continue stat = path.stat() mtime = datetime.fromtimestamp(stat.st_mtime) if dt_after and mtime.date() < dt_after.date(): continue if dt_before and mtime.date() > dt_before.date(): continue name_match = (not q) or q_low in path.name.lower() content_match = None if in_content and q and not name_match and path.suffix.lower() in TEXT_EXT: try: text = 
path.read_text(errors="replace") idx = text.lower().find(q_low) if idx != -1: start = max(0, idx - 60) end = min(len(text), idx + len(q) + 60) snippet = text[start:end].replace("\n", " ") content_match = ("…" if start else "") + snippet + ("…" if end < len(text) else "") name_match = True except Exception: pass if not name_match: continue e = _entry(path) e["content_match"] = content_match results.append(e) return render_template("search.html", q=q, after=after_s, before=before_s, in_content=in_content, results=results, searched=searched, ) # ── Statistiques GoAccess ───────────────────────────────────────────── @app.route("/stats/") def stats(): redir = _require_admin() if redir: return redir return render_template("stats.html", has_report=STATS_FILE.is_file(), ) @app.route("/stats/report") def stats_report(): redir = _require_admin() if redir: return redir if not STATS_FILE.is_file(): abort(404) return send_from_directory(STATS_FILE.parent, STATS_FILE.name) def _do_generate(): try: if STATS_GENERATE_CMD: subprocess.run(STATS_GENERATE_CMD, shell=True, timeout=600) else: cmd = ["goaccess", STATS_LOG_FILE, "--log-format=COMBINED", f"--output={STATS_FILE}"] if STATS_JSON: cmd.append(f"--output={STATS_JSON}") subprocess.run(cmd, timeout=600, check=False) finally: with _gen_lock: _gen_state["generating"] = False @app.route("/stats/generate", methods=["POST"]) def stats_generate(): redir = _require_admin() if redir: return redir if STATS_FILE.is_file(): return jsonify({"status": "exists"}) with _gen_lock: if _gen_state["generating"]: return jsonify({"status": "generating"}) if not STATS_LOG_FILE and not STATS_GENERATE_CMD: return jsonify({"status": "error", "message": "STATS_LOG_FILE non configuré"}), 500 _gen_state["generating"] = True threading.Thread(target=_do_generate, daemon=True).start() return jsonify({"status": "started"}) @app.route("/stats/status") def stats_status(): redir = _require_admin() if redir: return redir return jsonify({"ready": 
STATS_FILE.is_file(), "generating": _gen_state["generating"]}) # ── Fichiers bruts (aperçu / téléchargement) ────────────────────────── @app.route("/raw/") def raw_file(subpath): redir = _require_admin() if redir: return redir target = _safe_path(subpath) if not target.is_file(): abort(404) top = Path(subpath).parts[0] if top in _HIDDEN or top.startswith("."): abort(403) return send_from_directory(ASSETS_ROOT, subpath) # ── Suppression de fichiers ─────────────────────────────────────────── @app.route("/delete", methods=["POST"]) def delete_file(): redir = _require_admin() if redir: return redir subpath = request.form.get("path", "").strip() if not subpath: abort(400) target = _safe_path(subpath) if not target.exists(): abort(404) if target.is_dir(): abort(400) top = Path(subpath).parts[0] if top in _HIDDEN or top.startswith("."): abort(403) parent = str(Path(subpath).parent) _trash_move(target, subpath) return redirect(url_for("browse", subpath=parent) if parent != "." else url_for("browse")) # ── Erreurs 404 ─────────────────────────────────────────────────────── def _filter_banned_from_entry(info: dict) -> dict | None: """Remove banned IPs from an entry dict. 
Returns None if no IPs remain.""" ips_ok = {ip: hits for ip, hits in info["ips"].items() if not _ip_is_banned(ip)} if not ips_ok: return None count = sum(len(h) for h in ips_ok.values()) last_seen = max(max(h["dt"] for h in hits) for hits in ips_ok.values()) return {"count": count, "last_seen": last_seen, "ips": ips_ok} @app.route("/errors/") def errors_404(): redir = _require_admin() if redir: return redir # ── Onglet Erreurs 404 ── data = _parse_404s() filtered = {} for path, info in data.items(): entry = _filter_banned_from_entry(info) if entry: filtered[path] = entry entries = sorted(filtered.items(), key=lambda x: x[1]["count"], reverse=True) ignored = _load_ignored_ips() # Count only — ASN lookup is deferred to /errors/banned-groups (AJAX) banned_ips, banned_nets = _get_banned_ips() banned_total = len(banned_ips) + len(banned_nets) return render_template("errors_404.html", entries=entries, ignored_ips=sorted(ignored), total=sum(v["count"] for v in filtered.values()), log_configured=bool(STATS_LOG_FILE), banned_total=banned_total, ) @app.route("/errors/detail") def errors_detail(): redir = _require_admin() if redir: return redir path = request.args.get("path", "") data = _parse_404s() raw_entry = data.get(path) if not raw_entry: return jsonify({"error": "introuvable"}), 404 entry = _filter_banned_from_entry(raw_entry) or {"count": 0, "last_seen": raw_entry["last_seen"], "ips": {}} ignored = _load_ignored_ips() ip_list = [] for ip, hits in sorted(entry["ips"].items(), key=lambda x: len(x[1]), reverse=True): last = max(h["dt"] for h in hits) ip_list.append({ "ip": ip, "count": len(hits), "last_seen": last.strftime("%d/%m/%Y %H:%M"), "ignored": ip in ignored, "hits": [ {"dt": h["dt"].strftime("%d/%m/%Y %H:%M"), "referer": h["referer"]} for h in sorted(hits, key=lambda x: x["dt"], reverse=True)[:30] ], }) return jsonify({ "path": path, "count": entry["count"], "still_404": _is_still_404(path), "last_seen": entry["last_seen"].strftime("%d/%m/%Y %H:%M"), "ips": 
ip_list, }) @app.route("/errors/asinfo") def errors_asinfo(): redir = _require_admin() if redir: return redir ip = request.args.get("ip", "").strip() if not re.match(r"^[\d\.]+$", ip): return jsonify({"error": "IP invalide"}), 400 info = _lookup_ip_asn(ip) if info.get("asn"): info["prefix_count"] = len(_lookup_as_prefixes(info["asn"])) else: info["prefix_count"] = 0 return jsonify(info) @app.route("/errors/ignore", methods=["POST"]) def errors_ignore(): redir = _require_admin() if redir: return redir ip = request.form.get("ip", "").strip() action = request.form.get("action", "add") if not re.match(r"^[\d\.a-fA-F:]+$", ip): return jsonify({"error": "IP invalide"}), 400 ips = _load_ignored_ips() ips.add(ip) if action == "add" else ips.discard(ip) _save_ignored_ips(ips) _invalidate_404_cache() return jsonify({"ok": True, "action": action, "ip": ip}) @app.route("/errors/ban", methods=["POST"]) def errors_ban(): global _BANNED_CACHE_TS redir = _require_admin() if redir: return redir ip = request.form.get("ip", "").strip() jail = request.form.get("jail", "global-blacklist") ban_as = request.form.get("ban_as") == "1" if not re.match(r"^[\d\.a-fA-F:]+$", ip): return jsonify({"error": "IP invalide"}), 400 try: if ban_as: info = _lookup_ip_asn(ip) if not info.get("asn"): return jsonify({"error": "ASN introuvable pour cette IP"}), 400 targets = _lookup_as_prefixes(info["asn"]) if not targets: return jsonify({"error": "Aucun préfixe IPv4 trouvé pour cet AS"}), 400 else: targets = [ip] r = subprocess.run( ["/usr/bin/sudo", "/usr/bin/fail2ban-client", "set", jail, "banip"] + targets, capture_output=True, text=True, timeout=60 ) if r.returncode == 0: _BANNED_CACHE_TS = 0 # invalide le cache pour ce worker return jsonify({"ok": True, "ip": ip, "jail": jail, "count": len(targets)}) return jsonify({"error": r.stderr.strip() or "Erreur fail2ban"}), 500 except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/errors/banned/") def errors_banned(): return 
redirect(url_for('errors_404') + '#banned') @app.route("/errors/banned-groups") def errors_banned_groups(): redir = _require_admin() if redir: return jsonify({"error": "not authorized"}), 403 banned_ips, banned_nets = _get_banned_ips() all_banned = sorted(banned_ips) + sorted(str(n) for n in banned_nets) # ── Étape 1 : reverse-index depuis les fichiers RIPE Stat locaux ── # Les CIDRs bannis via "Ban AS" sont tous dans as_cache/AS*.json → zéro appel API prefix_to_asn: dict = {} if _AS_CACHE_DIR.exists(): for cf in _AS_CACHE_DIR.glob("AS*.json"): try: d = json.loads(cf.read_text()) asn = str(d.get("asn", cf.stem[2:])) for p in d.get("prefixes", []): prefix_to_asn[p] = asn except Exception: pass # ── Étape 2 : classer chaque entrée : connue (prefix cache) ou à chercher ── entry_asn: dict = {} # entry → asn string known_asns: set = set() needs_api: list = [] # entries non trouvées dans prefix cache for entry in all_banned: if entry in prefix_to_asn: asn = prefix_to_asn[entry] entry_asn[entry] = asn known_asns.add(asn) else: needs_api.append(entry) # ── Étape 3 : nom/pays des AS connus — 1 seule requête PostgreSQL ── asn_details: dict = {} # asn → {name, country} if known_asns: with _pg() as conn: if conn: try: with conn.cursor() as cur: cur.execute( "SELECT DISTINCT ON (asn) asn, name, country FROM ip_asn_cache " "WHERE asn = ANY(%s) ORDER BY asn, fetched_at DESC", (list(known_asns),) ) for row in cur.fetchall(): asn_details[row[0]] = {"name": row[1], "country": row[2]} except Exception: try: conn.rollback() except: pass # ── Étape 4 : lookup API pour les IPs/CIDRs bannis individuellement ── if needs_api: rep_ips = [e.split("/")[0] for e in needs_api] ip_map = _batch_lookup_ip_asn(rep_ips) for entry, rep_ip in zip(needs_api, rep_ips): info = ip_map.get(rep_ip, {}) entry_asn[entry] = info.get("asn") or "?" 
if info.get("asn"): known_asns.add(info["asn"]) asn_details.setdefault(info["asn"], {"name": info.get("name", ""), "country": info.get("country", "")}) # ── Étape 5 : construire les groupes ── by_asn: dict = {} for entry in all_banned: asn = entry_asn.get(entry, "?") det = asn_details.get(asn, {}) if asn not in by_asn: by_asn[asn] = {"asn": asn if asn != "?" else "", "name": det.get("name", ""), "country": det.get("country", ""), "entries": []} by_asn[asn]["entries"].append(entry) groups = sorted(by_asn.values(), key=lambda g: len(g["entries"]), reverse=True) return jsonify({"groups": groups, "total": len(all_banned)}) @app.route("/errors/unban", methods=["POST"]) def errors_unban(): global _BANNED_CACHE_TS redir = _require_admin() if redir: return redir entries = request.form.getlist("entries") or [request.form.get("entry", "").strip()] entries = [e.strip() for e in entries if e.strip()] if not entries: return jsonify({"error": "Aucune cible spécifiée"}), 400 for e in entries: if not re.match(r"^[\d\.a-fA-F:/]+$", e): return jsonify({"error": f"Entrée invalide : {e}"}), 400 jail = "global-blacklist" unbanned, errors = [], [] for entry in entries: try: r = subprocess.run( ["/usr/bin/sudo", "/usr/bin/fail2ban-client", "set", jail, "unbanip", entry], capture_output=True, text=True, timeout=10 ) (unbanned if r.returncode == 0 else errors).append(entry) except Exception as ex: errors.append(f"{entry}: {ex}") _BANNED_CACHE_TS = 0 if errors and not unbanned: return jsonify({"error": f"Erreur sur : {', '.join(errors)}"}), 500 return jsonify({"ok": True, "unbanned": unbanned, "errors": errors, "count": len(unbanned)}) # ── Corbeille ──────────────────────────────────────────────────────── @app.route("/trash") def trash_list(): redir = _require_admin() if redir: return redir _trash_purge(30) entries = _trash_list() return render_template("trash.html", entries=entries) @app.route("/trash/raw/") def trash_raw(subpath): redir = _require_admin() if redir: return redir target = 
(TRASH_ROOT / subpath).resolve() if not str(target).startswith(str(TRASH_ROOT)): abort(400) if not target.is_file(): abort(404) return send_from_directory(TRASH_ROOT, subpath) @app.route("/trash/delete", methods=["POST"]) def trash_delete(): redir = _require_admin() if redir: return redir trash_rel = request.form.get("path", "").strip() if not trash_rel: abort(400) target = (TRASH_ROOT / trash_rel).resolve() if not str(target).startswith(str(TRASH_ROOT)): abort(400) if target.is_file(): target.unlink() info = Path(str(target) + ".trashinfo") if info.exists(): info.unlink() return redirect(url_for("trash_list")) @app.route("/trash/restore", methods=["POST"]) def trash_restore(): redir = _require_admin() if redir: return redir trash_rel = request.form.get("path", "").strip() conflict = request.form.get("conflict", "").strip() if not trash_rel: abort(400) ok, result = _trash_restore(trash_rel, conflict) if not ok: if result == "conflict": return jsonify({"conflict": True, "path": trash_rel}), 409 abort(400) parent = str(Path(result).parent) return redirect(url_for("browse", subpath=parent) if parent != "." 
else url_for("browse")) @app.route("/trash/preview/") def trash_preview(subpath): redir = _require_admin() if redir: return redir target = (TRASH_ROOT / subpath).resolve() if not str(target).startswith(str(TRASH_ROOT)): abort(400) if not target.is_file(): abort(404) stat = target.stat() ext = target.suffix.lower() ctx = dict( subpath = subpath, filename = target.name, filesize = _humansize(stat.st_size), mtime = datetime.fromtimestamp(stat.st_mtime), raw_url = url_for("trash_raw", subpath=subpath), parent_path= None, prev_path = None, next_path = None, sibling_pos= "", ext = ext, from_trash = True, ) if ext in IMAGE_EXT: ctx["meta"] = _image_meta(target) return render_template("preview_image.html", **ctx) if ext in TEXT_EXT: return render_template("preview_text.html", content=target.read_text(errors="replace"), lang=ext.lstrip("."), **ctx) return render_template("preview_other.html", **ctx) @app.route("/trash/empty", methods=["POST"]) def trash_empty(): redir = _require_admin() if redir: return redir if TRASH_ROOT.exists(): shutil.rmtree(TRASH_ROOT) TRASH_ROOT.mkdir(exist_ok=True) return redirect(url_for("trash_list")) # ── Renommage de fichiers ───────────────────────────────────────────── @app.route("/rename", methods=["POST"]) def rename_file(): redir = _require_admin() if redir: return redir subpath = request.form.get("path", "").strip() new_name = request.form.get("new_name", "").strip() if not subpath or not new_name: return jsonify({"error": "Paramètres manquants"}), 400 if "/" in new_name or "\\" in new_name or new_name in (".", ".."): return jsonify({"error": "Nom invalide"}), 400 new_name = secure_filename(new_name) if not new_name: return jsonify({"error": "Nom invalide après nettoyage"}), 400 target = _safe_path(subpath) if not target.is_file(): return jsonify({"error": "Fichier introuvable"}), 404 top = Path(subpath).parts[0] if top in _HIDDEN or top.startswith("."): return jsonify({"error": "Accès refusé"}), 403 dest = target.parent / new_name if not 
dest.is_relative_to(ASSETS_ROOT): return jsonify({"error": "Destination invalide"}), 400 if dest.exists(): return jsonify({"error": f"« {new_name} » existe déjà"}), 409 target.rename(dest) new_path = str(dest.relative_to(ASSETS_ROOT)) return jsonify({ "name": new_name, "path": new_path, "browse_url": url_for("browse", subpath=new_path), }) # ── Vérification des conflits avant upload ─────────────────────────── @app.route("/check-upload", methods=["POST"]) def check_upload(): redir = _require_admin() if redir: return redir subpath = request.form.get("path", "").strip() names = request.form.getlist("names") if subpath: parts = Path(subpath).parts if parts and (parts[0] in _HIDDEN or parts[0].startswith(".")): abort(403) dest = _safe_path(subpath) if subpath else ASSETS_ROOT if not dest.is_dir(): abort(400) conflicts = [n for n in names if n and (dest / secure_filename(n)).exists()] return jsonify({"conflicts": conflicts}) # ── Upload de fichiers ──────────────────────────────────────────────── @app.route("/upload", methods=["POST"]) def upload_file(): redir = _require_admin() if redir: return redir subpath = request.form.get("path", "").strip() files = request.files.getlist("files") if not files or all(f.filename == "" for f in files): abort(400) if subpath: parts = Path(subpath).parts if parts and (parts[0] in _HIDDEN or parts[0].startswith(".")): abort(403) dest = _safe_path(subpath) if subpath else ASSETS_ROOT if not dest.is_dir(): abort(400) conflict = request.form.get("conflict", "overwrite") if conflict not in ("backup", "overwrite", "rename", "skip"): conflict = "overwrite" for f in files: name = secure_filename(f.filename or "") if not name: continue out_path = dest / name if out_path.exists(): if conflict == "skip": continue elif conflict == "backup": try: out_path.rename(_backup_path(out_path)) except Exception: continue elif conflict == "rename": out_path = _auto_rename(out_path) f.save(out_path) return redirect(url_for("browse", subpath=subpath) if 
subpath else url_for("browse")) # ── Vérification des conflits avant redimensionnement ──────────────── @app.route("/check-resize", methods=["POST"]) def check_resize(): redir = _require_admin() if redir: return redir subpath = request.form.get("path", "").strip() raw_sizes = request.form.getlist("sizes") raw_formats = request.form.getlist("formats") if not subpath: return jsonify({"conflicts": []}) target = _safe_path(subpath) if not target.is_file(): return jsonify({"conflicts": []}) try: sizes = [int(s) for s in raw_sizes if int(s) in RESIZE_SIZES] except (ValueError, TypeError): sizes = [] formats = [f for f in raw_formats if f in RESIZE_FORMATS] src_ext = target.suffix.lstrip(".") stem = re.sub(r"_\d+x\d+$", "", target.stem) parent = target.parent all_dims = [(s, s) for s in sizes] for cs in request.form.getlist("custom_sizes"): try: w, h = (int(x.strip()) for x in re.split(r"[x×]", cs.lower())) if w >= 1 and h >= 1: all_dims.append((w, h)) except (ValueError, TypeError): pass # Pas de dimensions sélectionnées → conserver celles d'origine if not all_dims: try: src = Image.open(target) all_dims = [(src.width, src.height)] src.close() except Exception: return jsonify({"conflicts": []}) # Pas de format sélectionné → conserver le format d'origine if not formats: formats = [src_ext] conflicts = [] for fmt in formats: for w, h in all_dims: out_name = f"{stem}_{w}x{h}.{fmt}" if (parent / out_name).exists(): conflicts.append(out_name) return jsonify({"conflicts": conflicts}) # ── Redimensionnement d'images ──────────────────────────────────────── @app.route("/resize", methods=["POST"]) def resize_image(): redir = _require_admin() if redir: return redir subpath = request.form.get("path", "").strip() raw_sizes = request.form.getlist("sizes") raw_formats = request.form.getlist("formats") raw_customs = request.form.getlist("custom_sizes") if not subpath or not raw_formats: return jsonify({"error": "Paramètres manquants"}), 400 target = _safe_path(subpath) if not 
target.is_file(): abort(404) ext = target.suffix.lower() if ext not in IMAGE_EXT: abort(400) top = Path(subpath).parts[0] if top in _HIDDEN or top.startswith("."): abort(403) try: square_dims = [(s, s) for s in sorted({int(s) for s in raw_sizes if int(s) in RESIZE_SIZES})] except (ValueError, TypeError): square_dims = [] formats = [f for f in raw_formats if f in RESIZE_FORMATS] # Dimensions libres — validées contre la résolution source src_img = Image.open(target) max_w, max_h = src_img.width, src_img.height src_img.close() custom_dims = [] for cs in raw_customs: try: w, h = (int(x.strip()) for x in re.split(r"[x×]", cs.lower())) if w < 1 or h < 1: continue if w > max_w or h > max_h: continue custom_dims.append((w, h)) except (ValueError, TypeError): pass all_dims = square_dims + custom_dims # Pas de dimensions sélectionnées → conserver celles d'origine if not all_dims: all_dims = [(max_w, max_h)] # Pas de format sélectionné → conserver le format d'origine if not formats: formats = [ext.lstrip(".")] conflict = request.form.get("conflict", "skip") if conflict not in ("backup", "overwrite", "rename", "skip"): conflict = "skip" is_svg = ext == ".svg" stem = re.sub(r"_\d+x\d+$", "", target.stem) parent = target.parent created, errors = [], [] # Signaler les dimensions hors-bornes for cs in raw_customs: try: w, h = (int(x.strip()) for x in re.split(r"[x×]", cs.lower())) if w > max_w or h > max_h: for f in formats: errors.append({"name": f"{stem}_{w}x{h}.{f}", "reason": f"{w}×{h} dépasse la résolution source ({max_w}×{max_h})"}) except (ValueError, TypeError): pass for fmt in formats: for w, h in all_dims: out_name = f"{stem}_{w}x{h}.{fmt}" out_path = parent / out_name out_rel = str(out_path.relative_to(ASSETS_ROOT)) if out_path.exists(): if conflict == "skip": errors.append({"name": out_name, "reason": "Fichier existant, ignoré"}) continue elif conflict == "backup": try: bak = _backup_path(out_path) out_path.rename(bak) except Exception as exc: errors.append({"name": 
out_name, "reason": f"Backup impossible : {exc}"}) continue elif conflict == "rename": out_path = _auto_rename(out_path) out_name = out_path.name out_rel = str(out_path.relative_to(ASSETS_ROOT)) try: if fmt == "svg" and is_svg: shutil.copy2(target, out_path) created.append({"name": out_name, "path": out_rel}) elif fmt == "svg" and not is_svg: errors.append({"name": out_name, "reason": "Conversion raster→SVG non supportée"}) elif is_svg: if not HAS_CAIROSVG: errors.append({"name": out_name, "reason": "cairosvg non disponible sur ce serveur"}) continue if fmt in ("png", "ico"): buf = io.BytesIO() _cairosvg.svg2png(url=str(target), write_to=buf, output_width=w, output_height=h) buf.seek(0) img = Image.open(buf).convert("RGBA") if fmt == "ico": img.save(out_path, format="ICO", sizes=[(w, h)]) else: img.save(out_path, format="PNG") else: buf = io.BytesIO() _cairosvg.svg2png(url=str(target), write_to=buf, output_width=w, output_height=h) buf.seek(0) img = Image.open(buf).convert("RGB") img.save(out_path, format="JPEG", quality=90) created.append({"name": out_name, "path": out_rel}) else: img = Image.open(target) # LANCZOS ne supporte pas le mode palette (ICO, GIF…) if img.mode not in ("RGB", "RGBA", "L", "LA", "I", "F"): img = img.convert("RGBA") img = img.resize((w, h), Image.LANCZOS) if fmt == "png": if img.mode not in ("RGBA", "RGB", "L"): img = img.convert("RGBA") img.save(out_path, format="PNG") elif fmt == "jpg": img = img.convert("RGB") img.save(out_path, format="JPEG", quality=90) elif fmt == "ico": img = img.convert("RGBA") img.save(out_path, format="ICO", sizes=[(w, h)]) created.append({"name": out_name, "path": out_rel}) except Exception as exc: errors.append({"name": out_name, "reason": str(exc)}) return jsonify({"created": created, "errors": errors}) # ── Fichiers CDN publics (dev local uniquement) ─────────────────────── _PUBLIC_TOP = frozenset({"logo", "wiki", "error"}) @app.route("/favicon.ico") def favicon(): return send_from_directory(ASSETS_ROOT, 
"favicon.ico") @app.route("/robots.txt") def robots(): return send_from_directory(ASSETS_ROOT, "robots.txt") @app.route("/") def cdn_file(subpath): top = Path(subpath).parts[0] if top not in _PUBLIC_TOP: abort(404) target = _safe_path(subpath) if not target.is_file(): abort(404) return send_from_directory(ASSETS_ROOT, subpath) if __name__ == "__main__": app.run(debug=True, port=5003)