alpinux-static/app/app.py
Alpinux d272b3e8b7 Bannis : résolution AS via reverse-index RIPE Stat local + fallback ipinfo.io
- CIDRs bannis via "Ban AS" : résolus depuis as_cache/*.json (0 appel API)
  + nom/pays récupérés en 1 seule requête PostgreSQL (DISTINCT ON asn)
- IPs/CIDRs bannis individuellement : _batch_lookup_ip_asn avec fallback
  ipinfo.io (résout les cas où ip-api.com retourne vide, ex: 103.51.13.0)
- Entrées avec asn='' exclues du cache (AND asn != '') → re-tentée à chaque fois

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-06 15:10:26 +02:00

1789 lines
59 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import gzip
import io
import ipaddress
import json
import os
import re
import shutil
import subprocess
import threading
import time
import urllib.request
from contextlib import contextmanager
from pathlib import Path
from datetime import datetime, timedelta
from urllib.parse import urlencode
try:
import psycopg2
import psycopg2.pool as _pg_pool_lib
_HAS_PG = True
except ImportError:
_HAS_PG = False
from PIL import Image
from flask import (Flask, redirect, url_for, session, request,
render_template, abort, send_from_directory, jsonify)
from authlib.integrations.flask_client import OAuth
from werkzeug.middleware.proxy_fix import ProxyFix
from werkzeug.utils import secure_filename
app = Flask(__name__)
app.secret_key = os.environ["SECRET_KEY"]
app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1)
oauth = OAuth(app)
oauth.register(
name="alpid",
server_metadata_url=os.environ["ALPID_DISCOVERY_URL"],
client_id=os.environ["ALPID_CLIENT_ID"],
client_secret=os.environ["ALPID_CLIENT_SECRET"],
client_kwargs={"scope": "openid profile email"},
)
ADMIN_GROUPS = set(os.environ.get("ADMIN_GROUPS", "admins").split(","))
ADMIN_EMAILS = set(e.strip() for e in os.environ.get("ADMIN_EMAILS", "").split(",") if e.strip())
ASSETS_ROOT = Path(os.environ.get("ASSETS_ROOT", ".")).resolve()
TRASH_ROOT = ASSETS_ROOT / ".trash"
_alpid_base = os.environ["ALPID_DISCOVERY_URL"].split("/.well-known/")[0]
ALPID_LOGOUT_URL = _alpid_base + "/protocol/openid-connect/logout"
STATS_FILE = Path(os.environ.get("STATS_FILE", "/opt/static-cdn/goaccess.html"))
STATS_JSON = Path(os.environ.get("STATS_JSON", "/opt/static-cdn/goaccess.json"))
STATS_LOG_FILE = os.environ.get("STATS_LOG_FILE", "")
STATS_GENERATE_CMD = os.environ.get("STATS_GENERATE_CMD", "")
_gen_lock = threading.Lock()
_gen_state = {"generating": False}
try:
_APP_VERSION = (Path(__file__).parent / "VERSION").read_text().strip()
except Exception:
_APP_VERSION = ""
_CHANGELOG_FILE = Path(__file__).parent / "CHANGELOG.md"
_IGNORED_IPS_FILE = Path(__file__).parent / "ignored_ips.json"
_LOG_RE = re.compile(
r'(\S+) \S+ \S+ \[(\d{2}/\w+/\d{4}:\d{2}:\d{2}:\d{2} [+-]\d{4})\] '
r'"[A-Z-]+ ([^\s"]+)[^"]*" (\d{3}) \d+ "([^"]*)"'
)
_404_CACHE: dict = {}
_404_CACHE_TS: float = 0
_404_CACHE_TTL = 300 # 5 min
_AS_CACHE_DIR = Path(__file__).parent / "as_cache"
_AS_CACHE_TTL = 30 * 24 * 3600 # 30 jours
_BANNED_CACHE: tuple = (set(), [])
_BANNED_CACHE_TS: float = 0
_BANNED_CACHE_TTL = 60 # 1 min
_pg_dsn = os.environ.get("DATABASE_URL", "")
_pg_pool_obj = None
_pg_schema_ok = False
_HIDDEN = frozenset({
".git", "scripts", "app",
".env", ".env.example", ".gitignore",
".htaccess", ".htpasswd_stats",
"standard_index.html",
})
try:
import cairosvg as _cairosvg
HAS_CAIROSVG = True
except ImportError:
HAS_CAIROSVG = False
IMAGE_EXT = frozenset({".png", ".jpg", ".jpeg", ".gif", ".ico", ".svg", ".webp"})
RESIZE_SIZES = frozenset({32, 64, 100, 128, 200, 300, 500, 600, 1024})
RESIZE_FORMATS = frozenset({"png", "jpg", "ico", "svg"})
TEXT_EXT = frozenset({".txt", ".html", ".htm", ".md", ".css", ".js", ".json",
".xml", ".conf", ".sh", ".robots", ".php"})
PDF_EXT = frozenset({".pdf"})
# ── Helpers ───────────────────────────────────────────────────────────
def _user():
return session.get("user")
def _require_admin():
u = _user()
if not u:
session["next_url"] = request.url
return redirect(url_for("login"))
if not u.get("is_admin"):
abort(403)
return None
def _safe_path(subpath: str) -> Path:
target = (ASSETS_ROOT / subpath).resolve()
if not target.is_relative_to(ASSETS_ROOT):
abort(400)
return target
def _humansize(n: int) -> str:
for unit in ("o", "Ko", "Mo", "Go"):
if n < 1024:
return f"{n:.0f} {unit}"
n /= 1024
return f"{n:.1f} To"
_EXIF_WANTED = frozenset({
"Make", "Model", "Software", "DateTime", "DateTimeOriginal", "DateTimeDigitized",
"ExposureTime", "FNumber", "ISOSpeedRatings", "FocalLength", "Flash",
"WhiteBalance", "ExposureProgram", "MeteringMode", "Orientation",
"XResolution", "YResolution", "ResolutionUnit", "ColorSpace",
"PixelXDimension", "PixelYDimension", "Artist", "Copyright", "GPSInfo",
})
def _image_meta(path: Path) -> dict:
try:
from PIL import ExifTags
img = Image.open(path)
meta = {
"width": img.width,
"height": img.height,
"format": img.format or path.suffix.lstrip(".").upper(),
"mode": img.mode,
"dpi": img.info.get("dpi"),
}
exif = img.getexif()
if exif:
rows = {}
for tag_id, val in exif.items():
tag = ExifTags.TAGS.get(tag_id, str(tag_id))
if tag in _EXIF_WANTED and tag != "GPSInfo":
rows[tag] = str(val)
sub = exif.get_ifd(0x8769)
for tag_id, val in sub.items():
tag = ExifTags.TAGS.get(tag_id, str(tag_id))
if tag in _EXIF_WANTED and tag != "GPSInfo":
rows[tag] = str(val)
gps_ifd = exif.get_ifd(0x8825)
if gps_ifd:
rows["GPS"] = _parse_gps(gps_ifd)
if rows:
meta["exif"] = rows
return meta
except Exception:
return {}
def _parse_gps(gps_ifd: dict):
try:
from PIL import ExifTags
def _dms(coords):
d, m, s = coords
return float(d) + float(m) / 60 + float(s) / 3600
lat = _dms(gps_ifd.get(2, (0, 0, 0)))
lon = _dms(gps_ifd.get(4, (0, 0, 0)))
latR = gps_ifd.get(1, "N")
lonR = gps_ifd.get(3, "E")
if latR == "S": lat = -lat
if lonR == "W": lon = -lon
return f"{lat:.6f}, {lon:.6f}"
except Exception:
return None
def _backup_path(p: Path) -> Path:
ts = datetime.now().strftime("%Y%m%d%H%M%S")
return p.parent / f"{p.stem}_bak_{ts}{p.suffix}"
def _auto_rename(p: Path) -> Path:
i = 1
while True:
candidate = p.parent / f"{p.stem}_{i}{p.suffix}"
if not candidate.exists():
return candidate
i += 1
def _trash_move(target: Path, subpath: str) -> None:
rel = Path(subpath)
trash_dest = TRASH_ROOT / rel
trash_dest.parent.mkdir(parents=True, exist_ok=True)
if trash_dest.exists():
ts = datetime.now().strftime("%Y%m%d%H%M%S")
trash_dest = trash_dest.parent / f"{trash_dest.stem}_{ts}{trash_dest.suffix}"
shutil.move(str(target), trash_dest)
Path(str(trash_dest) + ".trashinfo").write_text(json.dumps({
"original_path": str(rel),
"deleted_at": datetime.now().isoformat(timespec="seconds"),
}))
def _trash_restore(trash_rel: str, conflict: str = "") -> tuple:
target = (TRASH_ROOT / trash_rel).resolve()
if not str(target).startswith(str(TRASH_ROOT)):
return False, "forbidden"
if not target.is_file():
return False, "not_found"
info_path = Path(str(target) + ".trashinfo")
try:
original = json.loads(info_path.read_text())["original_path"]
except Exception:
original = trash_rel
dest = (ASSETS_ROOT / original).resolve()
if not str(dest).startswith(str(ASSETS_ROOT)):
return False, "forbidden"
if dest.exists():
if not conflict:
return False, "conflict"
if conflict == "rename":
dest = _auto_rename(dest)
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.move(str(target), str(dest))
if info_path.exists():
info_path.unlink()
return True, str(dest.relative_to(ASSETS_ROOT))
def _trash_purge(days: int = 30) -> int:
if not TRASH_ROOT.exists():
return 0
cutoff = datetime.now() - timedelta(days=days)
count = 0
for info_file in list(TRASH_ROOT.rglob("*.trashinfo")):
try:
deleted_at = datetime.fromisoformat(
json.loads(info_file.read_text())["deleted_at"]
)
if deleted_at < cutoff:
f = Path(str(info_file)[:-len(".trashinfo")])
if f.exists():
f.unlink()
info_file.unlink()
count += 1
except Exception:
pass
for d in sorted(TRASH_ROOT.rglob("*"), reverse=True):
if d.is_dir() and d != TRASH_ROOT:
try:
d.rmdir()
except OSError:
pass
return count
def _trash_list() -> list:
if not TRASH_ROOT.exists():
return []
entries = []
for info_file in TRASH_ROOT.rglob("*.trashinfo"):
try:
data = json.loads(info_file.read_text())
fpath = Path(str(info_file)[:-len(".trashinfo")])
if not fpath.exists():
info_file.unlink()
continue
rel = fpath.relative_to(TRASH_ROOT)
ext = fpath.suffix.lower()
stat = fpath.stat()
entries.append({
"name": fpath.name,
"path": str(rel),
"original_path": data.get("original_path", str(rel)),
"deleted_at": datetime.fromisoformat(data["deleted_at"]),
"size": stat.st_size,
"is_dir": False,
"is_image": ext in IMAGE_EXT,
"is_text": ext in TEXT_EXT,
"is_pdf": ext in PDF_EXT,
"ext": ext,
})
except Exception:
pass
entries.sort(key=lambda e: e["deleted_at"], reverse=True)
return entries
def _trash_count() -> int:
if not TRASH_ROOT.exists():
return 0
return sum(1 for f in TRASH_ROOT.rglob("*")
if f.is_file() and not f.name.endswith(".trashinfo"))
def _trash_stats() -> dict:
if not TRASH_ROOT.exists():
return {"files": 0, "size": 0, "oldest": None}
files, size, oldest = 0, 0, None
for f in TRASH_ROOT.rglob("*"):
if not f.is_file() or f.name.endswith(".trashinfo"):
continue
files += 1
size += f.stat().st_size
info = Path(str(f) + ".trashinfo")
try:
dt = datetime.fromisoformat(json.loads(info.read_text())["deleted_at"])
if oldest is None or dt < oldest:
oldest = dt
except Exception:
pass
return {"files": files, "size": size, "oldest": oldest}
def _folder_stats(path: Path) -> dict:
files, size = 0, 0
for f in path.rglob("*"):
if not f.is_file():
continue
if any(p in _HIDDEN or p.startswith(".") for p in f.relative_to(path).parts):
continue
files += 1
size += f.stat().st_size
return {"files": files, "size": size}
def _parse_date(s: str):
if not s:
return None
try:
return datetime.strptime(s, "%Y-%m-%d")
except ValueError:
return None
def _load_hits() -> dict:
"""Parse GoAccess JSON report → {relative_path: hit_count}."""
if not STATS_JSON.is_file():
return {}
try:
data = json.loads(STATS_JSON.read_text())
hits = {}
for item in data.get("requests", {}).get("data", []):
url = item.get("data", "").lstrip("/")
count = item.get("hits", {}).get("count", 0)
if url and count:
hits[url] = count
return hits
except Exception:
return {}
def _entry(item: Path) -> dict:
stat = item.stat()
ext = item.suffix.lower()
return {
"name": item.name,
"path": str(item.relative_to(ASSETS_ROOT)),
"is_dir": item.is_dir(),
"size": stat.st_size if item.is_file() else None,
"mtime": datetime.fromtimestamp(stat.st_mtime),
"ext": ext,
"is_image": ext in IMAGE_EXT,
"is_text": ext in TEXT_EXT,
"is_pdf": ext in PDF_EXT,
}
# ── Context processor ────────────────────────────────────────────────
@app.context_processor
def _inject_globals():
u = _user()
return {
"user": u,
"trash_count": _trash_count() if u else 0,
"humansize": _humansize,
"app_version": _APP_VERSION,
}
# ── Auth ──────────────────────────────────────────────────────────────
@app.route("/auth/login")
def login():
return oauth.alpid.authorize_redirect(url_for("callback", _external=True))
@app.route("/auth/callback")
def callback():
token = oauth.alpid.authorize_access_token()
info = token.get("userinfo") or oauth.alpid.userinfo(token=token)
email = info.get("email", "")
groups = set(info.get("groups", []))
if groups:
is_admin = bool(groups & ADMIN_GROUPS)
elif ADMIN_EMAILS:
is_admin = email in ADMIN_EMAILS
else:
is_admin = True
session["user"] = {
"sub": info["sub"],
"name": info.get("name") or info.get("preferred_username", ""),
"email": email,
"is_admin": is_admin,
}
session["id_token"] = token.get("id_token", "")
return redirect(session.pop("next_url", url_for("dashboard")))
@app.route("/auth/logout")
def logout():
id_token = session.get("id_token")
session.clear()
params = {"post_logout_redirect_uri": url_for("dashboard", _external=True)}
if id_token:
params["id_token_hint"] = id_token
return redirect(ALPID_LOGOUT_URL + "?" + urlencode(params))
# ── Dashboard ─────────────────────────────────────────────────────────
@app.route("/")
def dashboard():
redir = _require_admin()
if redir:
return redir
folders = {}
for item in sorted(ASSETS_ROOT.iterdir()):
if item.name in _HIDDEN or item.name.startswith("."):
continue
if item.is_dir():
folders[item.name] = _folder_stats(item)
return render_template("dashboard.html",
folders=folders,
total_files=sum(v["files"] for v in folders.values()),
total_size=sum(v["size"] for v in folders.values()),
trash=_trash_stats(),
)
@app.route("/changelog")
def changelog():
redir = _require_admin()
if redir:
return redir
sections = _parse_changelog()
return render_template("changelog.html", sections=sections)
def _parse_changelog():
"""Parse CHANGELOG.md into a list of version dicts."""
try:
text = _CHANGELOG_FILE.read_text()
except Exception:
return []
sections = []
current = None
current_group = None
for line in text.splitlines():
if line.startswith("## "):
if current:
if current_group:
current["groups"].append(current_group)
sections.append(current)
m = re.match(r"## \[(.+?)\] — (.+)", line)
current = {"version": m.group(1) if m else line[3:],
"date": m.group(2) if m else "",
"groups": []}
current_group = None
elif line.startswith("### ") and current is not None:
if current_group:
current["groups"].append(current_group)
current_group = {"title": line[4:], "entries": []}
elif line.startswith("- ") and current_group is not None:
current_group["entries"].append(line[2:])
if current:
if current_group:
current["groups"].append(current_group)
sections.append(current)
return sections
# ── Erreurs 404 — helpers ─────────────────────────────────────────────
def _load_ignored_ips() -> set:
try:
return set(json.loads(_IGNORED_IPS_FILE.read_text()).get("ips", []))
except Exception:
return set()
def _save_ignored_ips(ips: set) -> None:
_IGNORED_IPS_FILE.write_text(json.dumps({"ips": sorted(ips)}, indent=2))
def _log_files_404(days: int = 7) -> list:
if not STATS_LOG_FILE:
return []
log_dir = Path(STATS_LOG_FILE).parent
files = []
for f in sorted(log_dir.glob("*-access.log*"), reverse=True)[:days]:
files.append((f, f.name.endswith(".gz")))
return files
def _parse_404s(days: int = 7) -> dict:
global _404_CACHE, _404_CACHE_TS
now = time.time()
ign_mtime = _IGNORED_IPS_FILE.stat().st_mtime if _IGNORED_IPS_FILE.exists() else 0
if now - _404_CACHE_TS < _404_CACHE_TTL and _404_CACHE and ign_mtime <= _404_CACHE_TS:
return _404_CACHE
ignored = _load_ignored_ips()
results: dict = {}
for fpath, is_gz in _log_files_404(days):
try:
opener = gzip.open(fpath, "rt", errors="replace") if is_gz \
else open(fpath, errors="replace")
with opener as f:
for line in f:
m = _LOG_RE.match(line)
if not m:
continue
ip, dt_str, path, status, referer = m.groups()
if status != "404":
continue
if ip in ignored:
continue
path = path.split("?")[0]
try:
dt = datetime.strptime(dt_str, "%d/%b/%Y:%H:%M:%S %z")
except ValueError:
continue
if path not in results:
results[path] = {"count": 0, "last_seen": dt, "ips": {}}
r = results[path]
r["count"] += 1
if dt > r["last_seen"]:
r["last_seen"] = dt
if ip not in r["ips"]:
r["ips"][ip] = []
r["ips"][ip].append({"dt": dt, "referer": referer if referer != "-" else ""})
except Exception:
continue
_404_CACHE = results
_404_CACHE_TS = now
return results
def _is_still_404(path: str) -> bool:
clean = path.lstrip("/")
return not (ASSETS_ROOT / clean).exists()
def _invalidate_404_cache():
global _404_CACHE_TS
_404_CACHE_TS = 0
# ── Bannissement fail2ban — helpers ───────────────────────────────────
def _get_banned_ips() -> tuple:
"""Returns (set_of_ips, list_of_networks) from fail2ban global-blacklist."""
global _BANNED_CACHE, _BANNED_CACHE_TS
now = time.time()
if now - _BANNED_CACHE_TS < _BANNED_CACHE_TTL:
return _BANNED_CACHE
try:
r = subprocess.run(
["/usr/bin/sudo", "/usr/bin/fail2ban-client", "status", "global-blacklist"],
capture_output=True, text=True, timeout=5
)
raw: set = set()
for line in r.stdout.splitlines():
if "Banned IP list:" in line:
raw = set(line.split("Banned IP list:")[1].split())
break
ips, nets = set(), []
for entry in raw:
if "/" in entry:
try:
nets.append(ipaddress.ip_network(entry, strict=False))
except ValueError:
pass
else:
ips.add(entry)
_BANNED_CACHE = (ips, nets)
except Exception:
_BANNED_CACHE = (set(), [])
_BANNED_CACHE_TS = now
return _BANNED_CACHE
def _ip_is_banned(ip: str) -> bool:
banned_ips, banned_nets = _get_banned_ips()
if ip in banned_ips:
return True
try:
addr = ipaddress.ip_address(ip)
return any(addr in net for net in banned_nets)
except ValueError:
return False
def _pg_init_pool() -> bool:
global _pg_pool_obj
if _pg_pool_obj is not None:
return True
if not _HAS_PG or not _pg_dsn:
return False
try:
_pg_pool_obj = _pg_pool_lib.ThreadedConnectionPool(1, 3, _pg_dsn)
return True
except Exception:
return False
def _pg_ensure_schema(conn) -> None:
global _pg_schema_ok
if _pg_schema_ok:
return
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS ip_asn_cache (
ip TEXT PRIMARY KEY,
asn TEXT NOT NULL DEFAULT '',
name TEXT NOT NULL DEFAULT '',
country TEXT NOT NULL DEFAULT '',
fetched_at TIMESTAMPTZ NOT NULL DEFAULT now()
)
""")
conn.commit()
_pg_schema_ok = True
@contextmanager
def _pg():
"""Yields a psycopg2 connection from the pool, or None if unavailable."""
if not _pg_init_pool():
yield None
return
conn = None
try:
conn = _pg_pool_obj.getconn()
_pg_ensure_schema(conn)
except Exception:
if conn:
try: _pg_pool_obj.putconn(conn)
except: pass
yield None
return
try:
yield conn
finally:
try: _pg_pool_obj.putconn(conn)
except: pass
def _parse_as_field(as_field: str, country: str = "") -> dict:
"""Parse 'AS12345 Name' string into {asn, name, country}."""
asn, name = "", as_field
if as_field.startswith("AS"):
parts = as_field.split(" ", 1)
asn = parts[0][2:]
name = parts[1] if len(parts) > 1 else ""
return {"asn": asn, "name": name, "country": country}
def _lookup_ip_asn(ip: str) -> dict:
"""Returns {asn, name, country} — PostgreSQL cache (30 j, asn non vide), ip-api.com, puis ipinfo.io."""
# Cache : on ignore les entrées avec asn vide pour forcer un re-essai
with _pg() as conn:
if conn:
try:
with conn.cursor() as cur:
cur.execute(
"SELECT asn, name, country FROM ip_asn_cache "
"WHERE ip = %s AND asn != '' AND fetched_at > now() - interval '30 days'",
(ip,)
)
row = cur.fetchone()
if row:
return {"asn": row[0], "name": row[1], "country": row[2]}
except Exception:
try: conn.rollback()
except: pass
result: dict = {"asn": "", "name": "", "country": ""}
# Première tentative : ip-api.com
try:
req = urllib.request.Request(
f"http://ip-api.com/json/{ip}?fields=as,countryCode",
headers={"User-Agent": "alpinux-static/1.0"})
with urllib.request.urlopen(req, timeout=5) as resp:
d = json.loads(resp.read())
as_field = d.get("as", "")
if as_field:
result = _parse_as_field(as_field, d.get("countryCode", ""))
except Exception:
pass
# Fallback : ipinfo.io si toujours pas d'AS
if not result.get("asn"):
try:
req = urllib.request.Request(
f"https://ipinfo.io/{ip}/json",
headers={"User-Agent": "alpinux-static/1.0", "Accept": "application/json"})
with urllib.request.urlopen(req, timeout=5) as resp:
d = json.loads(resp.read())
org = d.get("org", "")
if org:
result = _parse_as_field(org, d.get("country", ""))
except Exception:
pass
# Mise en cache seulement si on a trouvé un AS
if result.get("asn"):
with _pg() as conn:
if conn:
try:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO ip_asn_cache (ip, asn, name, country)
VALUES (%s, %s, %s, %s)
ON CONFLICT (ip) DO UPDATE SET
asn = EXCLUDED.asn, name = EXCLUDED.name,
country = EXCLUDED.country, fetched_at = now()
""", (ip, result["asn"], result["name"], result["country"]))
conn.commit()
except Exception:
try: conn.rollback()
except: pass
return result
def _batch_lookup_ip_asn(ips: list) -> dict:
"""Single SQL query for all IPs in cache. Falls back to individual lookup for misses (capped at 20)."""
if not ips:
return {}
unique = list(dict.fromkeys(ips))
result: dict = {}
with _pg() as conn:
if conn:
try:
with conn.cursor() as cur:
cur.execute(
"SELECT ip, asn, name, country FROM ip_asn_cache "
"WHERE ip = ANY(%s) AND asn != '' AND fetched_at > now() - interval '30 days'",
(unique,)
)
for row in cur.fetchall():
result[row[0]] = {"asn": row[1], "name": row[2], "country": row[3]}
except Exception:
try: conn.rollback()
except: pass
misses = [ip for ip in unique if ip not in result]
for ip in misses[:20]:
result[ip] = _lookup_ip_asn(ip)
for ip in misses[20:]:
result.setdefault(ip, {"asn": "", "name": "", "country": ""})
return result
def _lookup_as_prefixes(asn: str) -> list:
"""Returns IPv4 CIDRs for an ASN via RIPE Stat. Cached 30 days."""
_AS_CACHE_DIR.mkdir(exist_ok=True)
cache_file = _AS_CACHE_DIR / f"AS{asn}.json"
now = time.time()
if cache_file.exists() and now - cache_file.stat().st_mtime < _AS_CACHE_TTL:
return json.loads(cache_file.read_text()).get("prefixes", [])
url = f"https://stat.ripe.net/data/announced-prefixes/data.json?resource=AS{asn}"
try:
req = urllib.request.Request(url, headers={"User-Agent": "alpinux-static/1.0"})
with urllib.request.urlopen(req, timeout=15) as resp:
data = json.loads(resp.read())
prefixes = [
p["prefix"] for p in data.get("data", {}).get("prefixes", [])
if ":" not in p.get("prefix", "")
]
cache_file.write_text(json.dumps({"asn": asn, "prefixes": prefixes}))
return prefixes
except Exception:
return []
# ── Navigateur de fichiers ────────────────────────────────────────────
@app.route("/browse/")
@app.route("/browse/<path:subpath>")
def browse(subpath=""):
redir = _require_admin()
if redir:
return redir
target = _safe_path(subpath)
if not target.exists():
abort(404)
if subpath:
top = Path(subpath).parts[0]
if top in _HIDDEN or top.startswith("."):
abort(403)
if target.is_file():
return _file_preview(target, subpath)
hits = _load_hits()
entries = []
for item in sorted(target.iterdir()):
if item.name in _HIDDEN or item.name.startswith("."):
continue
e = _entry(item)
e["hits"] = hits.get(e["path"], 0) if not e["is_dir"] else None
entries.append(e)
entries.sort(key=lambda e: (0 if e["is_dir"] else 1, e["name"].lower()))
parts = Path(subpath).parts if subpath else ()
breadcrumb = [
{"name": p, "path": str(Path(*parts[:i + 1]))}
for i, p in enumerate(parts)
]
return render_template("browse.html",
entries=entries,
subpath=subpath,
breadcrumb=breadcrumb,
has_hits=STATS_JSON.is_file(),
)
def _file_preview(path: Path, subpath: str):
ext = path.suffix.lower()
stat = path.stat()
parent = str(Path(subpath).parent) if Path(subpath).parent != Path(".") else ""
siblings = sorted(
[f for f in path.parent.iterdir()
if f.is_file() and f.name not in _HIDDEN and not f.name.startswith(".")],
key=lambda f: f.name.lower(),
)
idx = next((i for i, f in enumerate(siblings) if f == path), None)
prev_path = str(siblings[idx - 1].relative_to(ASSETS_ROOT)) if idx else None
next_path = str(siblings[idx + 1].relative_to(ASSETS_ROOT)) if idx is not None and idx < len(siblings) - 1 else None
ctx = dict(
subpath=subpath,
filename=path.name,
filesize=_humansize(stat.st_size),
mtime=datetime.fromtimestamp(stat.st_mtime),
raw_url=url_for("raw_file", subpath=subpath),
parent_path=parent,
prev_path=prev_path,
next_path=next_path,
sibling_pos=f"{idx + 1}/{len(siblings)}" if idx is not None else "",
ext=ext,
)
if ext in IMAGE_EXT:
ctx["meta"] = _image_meta(path)
return render_template("preview_image.html", **ctx)
if ext in TEXT_EXT:
content = path.read_text(errors="replace")
return render_template("preview_text.html", content=content, lang=ext.lstrip("."), **ctx)
return render_template("preview_other.html", **ctx)
# ── Recherche ─────────────────────────────────────────────────────────
@app.route("/search")
def search():
redir = _require_admin()
if redir:
return redir
q = request.args.get("q", "").strip()
after_s = request.args.get("after", "").strip()
before_s = request.args.get("before", "").strip()
in_content = request.args.get("content") == "1"
dt_after = _parse_date(after_s)
dt_before = _parse_date(before_s)
results = []
searched = bool(q or dt_after or dt_before)
if searched:
q_low = q.lower()
for path in sorted(ASSETS_ROOT.rglob("*")):
if not path.is_file():
continue
parts = path.relative_to(ASSETS_ROOT).parts
if any(p in _HIDDEN or p.startswith(".") for p in parts):
continue
stat = path.stat()
mtime = datetime.fromtimestamp(stat.st_mtime)
if dt_after and mtime.date() < dt_after.date():
continue
if dt_before and mtime.date() > dt_before.date():
continue
name_match = (not q) or q_low in path.name.lower()
content_match = None
if in_content and q and not name_match and path.suffix.lower() in TEXT_EXT:
try:
text = path.read_text(errors="replace")
idx = text.lower().find(q_low)
if idx != -1:
start = max(0, idx - 60)
end = min(len(text), idx + len(q) + 60)
snippet = text[start:end].replace("\n", " ")
content_match = ("" if start else "") + snippet + ("" if end < len(text) else "")
name_match = True
except Exception:
pass
if not name_match:
continue
e = _entry(path)
e["content_match"] = content_match
results.append(e)
return render_template("search.html",
q=q,
after=after_s,
before=before_s,
in_content=in_content,
results=results,
searched=searched,
)
# ── Statistiques GoAccess ─────────────────────────────────────────────
@app.route("/stats/")
def stats():
redir = _require_admin()
if redir:
return redir
return render_template("stats.html",
has_report=STATS_FILE.is_file(),
)
@app.route("/stats/report")
def stats_report():
redir = _require_admin()
if redir:
return redir
if not STATS_FILE.is_file():
abort(404)
return send_from_directory(STATS_FILE.parent, STATS_FILE.name)
def _do_generate():
try:
if STATS_GENERATE_CMD:
subprocess.run(STATS_GENERATE_CMD, shell=True, timeout=600)
else:
cmd = ["goaccess", STATS_LOG_FILE, "--log-format=COMBINED",
f"--output={STATS_FILE}"]
if STATS_JSON:
cmd.append(f"--output={STATS_JSON}")
subprocess.run(cmd, timeout=600, check=False)
finally:
with _gen_lock:
_gen_state["generating"] = False
@app.route("/stats/generate", methods=["POST"])
def stats_generate():
redir = _require_admin()
if redir:
return redir
if STATS_FILE.is_file():
return jsonify({"status": "exists"})
with _gen_lock:
if _gen_state["generating"]:
return jsonify({"status": "generating"})
if not STATS_LOG_FILE and not STATS_GENERATE_CMD:
return jsonify({"status": "error",
"message": "STATS_LOG_FILE non configuré"}), 500
_gen_state["generating"] = True
threading.Thread(target=_do_generate, daemon=True).start()
return jsonify({"status": "started"})
@app.route("/stats/status")
def stats_status():
redir = _require_admin()
if redir:
return redir
return jsonify({"ready": STATS_FILE.is_file(),
"generating": _gen_state["generating"]})
# ── Fichiers bruts (aperçu / téléchargement) ──────────────────────────
@app.route("/raw/<path:subpath>")
def raw_file(subpath):
redir = _require_admin()
if redir:
return redir
target = _safe_path(subpath)
if not target.is_file():
abort(404)
top = Path(subpath).parts[0]
if top in _HIDDEN or top.startswith("."):
abort(403)
return send_from_directory(ASSETS_ROOT, subpath)
# ── Suppression de fichiers ───────────────────────────────────────────
@app.route("/delete", methods=["POST"])
def delete_file():
redir = _require_admin()
if redir:
return redir
subpath = request.form.get("path", "").strip()
if not subpath:
abort(400)
target = _safe_path(subpath)
if not target.exists():
abort(404)
if target.is_dir():
abort(400)
top = Path(subpath).parts[0]
if top in _HIDDEN or top.startswith("."):
abort(403)
parent = str(Path(subpath).parent)
_trash_move(target, subpath)
return redirect(url_for("browse", subpath=parent) if parent != "." else url_for("browse"))
# ── Erreurs 404 ───────────────────────────────────────────────────────
def _filter_banned_from_entry(info: dict) -> dict | None:
"""Remove banned IPs from an entry dict. Returns None if no IPs remain."""
ips_ok = {ip: hits for ip, hits in info["ips"].items() if not _ip_is_banned(ip)}
if not ips_ok:
return None
count = sum(len(h) for h in ips_ok.values())
last_seen = max(max(h["dt"] for h in hits) for hits in ips_ok.values())
return {"count": count, "last_seen": last_seen, "ips": ips_ok}
@app.route("/errors/")
def errors_404():
redir = _require_admin()
if redir:
return redir
# ── Onglet Erreurs 404 ──
data = _parse_404s()
filtered = {}
for path, info in data.items():
entry = _filter_banned_from_entry(info)
if entry:
filtered[path] = entry
entries = sorted(filtered.items(), key=lambda x: x[1]["count"], reverse=True)
ignored = _load_ignored_ips()
# Count only — ASN lookup is deferred to /errors/banned-groups (AJAX)
banned_ips, banned_nets = _get_banned_ips()
banned_total = len(banned_ips) + len(banned_nets)
return render_template("errors_404.html",
entries=entries,
ignored_ips=sorted(ignored),
total=sum(v["count"] for v in filtered.values()),
log_configured=bool(STATS_LOG_FILE),
banned_total=banned_total,
)
@app.route("/errors/detail")
def errors_detail():
redir = _require_admin()
if redir:
return redir
path = request.args.get("path", "")
data = _parse_404s()
raw_entry = data.get(path)
if not raw_entry:
return jsonify({"error": "introuvable"}), 404
entry = _filter_banned_from_entry(raw_entry) or {"count": 0, "last_seen": raw_entry["last_seen"], "ips": {}}
ignored = _load_ignored_ips()
ip_list = []
for ip, hits in sorted(entry["ips"].items(), key=lambda x: len(x[1]), reverse=True):
last = max(h["dt"] for h in hits)
ip_list.append({
"ip": ip,
"count": len(hits),
"last_seen": last.strftime("%d/%m/%Y %H:%M"),
"ignored": ip in ignored,
"hits": [
{"dt": h["dt"].strftime("%d/%m/%Y %H:%M"), "referer": h["referer"]}
for h in sorted(hits, key=lambda x: x["dt"], reverse=True)[:30]
],
})
return jsonify({
"path": path,
"count": entry["count"],
"still_404": _is_still_404(path),
"last_seen": entry["last_seen"].strftime("%d/%m/%Y %H:%M"),
"ips": ip_list,
})
@app.route("/errors/asinfo")
def errors_asinfo():
redir = _require_admin()
if redir:
return redir
ip = request.args.get("ip", "").strip()
if not re.match(r"^[\d\.]+$", ip):
return jsonify({"error": "IP invalide"}), 400
info = _lookup_ip_asn(ip)
if info.get("asn"):
info["prefix_count"] = len(_lookup_as_prefixes(info["asn"]))
else:
info["prefix_count"] = 0
return jsonify(info)
@app.route("/errors/ignore", methods=["POST"])
def errors_ignore():
redir = _require_admin()
if redir:
return redir
ip = request.form.get("ip", "").strip()
action = request.form.get("action", "add")
if not re.match(r"^[\d\.a-fA-F:]+$", ip):
return jsonify({"error": "IP invalide"}), 400
ips = _load_ignored_ips()
ips.add(ip) if action == "add" else ips.discard(ip)
_save_ignored_ips(ips)
_invalidate_404_cache()
return jsonify({"ok": True, "action": action, "ip": ip})
@app.route("/errors/ban", methods=["POST"])
def errors_ban():
global _BANNED_CACHE_TS
redir = _require_admin()
if redir:
return redir
ip = request.form.get("ip", "").strip()
jail = request.form.get("jail", "global-blacklist")
ban_as = request.form.get("ban_as") == "1"
if not re.match(r"^[\d\.a-fA-F:]+$", ip):
return jsonify({"error": "IP invalide"}), 400
try:
if ban_as:
info = _lookup_ip_asn(ip)
if not info.get("asn"):
return jsonify({"error": "ASN introuvable pour cette IP"}), 400
targets = _lookup_as_prefixes(info["asn"])
if not targets:
return jsonify({"error": "Aucun préfixe IPv4 trouvé pour cet AS"}), 400
else:
targets = [ip]
r = subprocess.run(
["/usr/bin/sudo", "/usr/bin/fail2ban-client", "set", jail, "banip"] + targets,
capture_output=True, text=True, timeout=60
)
if r.returncode == 0:
_BANNED_CACHE_TS = 0 # invalide le cache pour ce worker
return jsonify({"ok": True, "ip": ip, "jail": jail, "count": len(targets)})
return jsonify({"error": r.stderr.strip() or "Erreur fail2ban"}), 500
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/errors/banned/")
def errors_banned():
return redirect(url_for('errors_404') + '#banned')
@app.route("/errors/banned-groups")
def errors_banned_groups():
redir = _require_admin()
if redir:
return jsonify({"error": "not authorized"}), 403
banned_ips, banned_nets = _get_banned_ips()
all_banned = sorted(banned_ips) + sorted(str(n) for n in banned_nets)
# ── Étape 1 : reverse-index depuis les fichiers RIPE Stat locaux ──
# Les CIDRs bannis via "Ban AS" sont tous dans as_cache/AS*.json → zéro appel API
prefix_to_asn: dict = {}
if _AS_CACHE_DIR.exists():
for cf in _AS_CACHE_DIR.glob("AS*.json"):
try:
d = json.loads(cf.read_text())
asn = str(d.get("asn", cf.stem[2:]))
for p in d.get("prefixes", []):
prefix_to_asn[p] = asn
except Exception:
pass
# ── Étape 2 : classer chaque entrée : connue (prefix cache) ou à chercher ──
entry_asn: dict = {} # entry → asn string
known_asns: set = set()
needs_api: list = [] # entries non trouvées dans prefix cache
for entry in all_banned:
if entry in prefix_to_asn:
asn = prefix_to_asn[entry]
entry_asn[entry] = asn
known_asns.add(asn)
else:
needs_api.append(entry)
# ── Étape 3 : nom/pays des AS connus — 1 seule requête PostgreSQL ──
asn_details: dict = {} # asn → {name, country}
if known_asns:
with _pg() as conn:
if conn:
try:
with conn.cursor() as cur:
cur.execute(
"SELECT DISTINCT ON (asn) asn, name, country FROM ip_asn_cache "
"WHERE asn = ANY(%s) ORDER BY asn, fetched_at DESC",
(list(known_asns),)
)
for row in cur.fetchall():
asn_details[row[0]] = {"name": row[1], "country": row[2]}
except Exception:
try: conn.rollback()
except: pass
# ── Étape 4 : lookup API pour les IPs/CIDRs bannis individuellement ──
if needs_api:
rep_ips = [e.split("/")[0] for e in needs_api]
ip_map = _batch_lookup_ip_asn(rep_ips)
for entry, rep_ip in zip(needs_api, rep_ips):
info = ip_map.get(rep_ip, {})
entry_asn[entry] = info.get("asn") or "?"
if info.get("asn"):
known_asns.add(info["asn"])
asn_details.setdefault(info["asn"], {"name": info.get("name", ""), "country": info.get("country", "")})
# ── Étape 5 : construire les groupes ──
by_asn: dict = {}
for entry in all_banned:
asn = entry_asn.get(entry, "?")
det = asn_details.get(asn, {})
if asn not in by_asn:
by_asn[asn] = {"asn": asn if asn != "?" else "", "name": det.get("name", ""),
"country": det.get("country", ""), "entries": []}
by_asn[asn]["entries"].append(entry)
groups = sorted(by_asn.values(), key=lambda g: len(g["entries"]), reverse=True)
return jsonify({"groups": groups, "total": len(all_banned)})
@app.route("/errors/unban", methods=["POST"])
def errors_unban():
global _BANNED_CACHE_TS
redir = _require_admin()
if redir:
return redir
entries = request.form.getlist("entries") or [request.form.get("entry", "").strip()]
entries = [e.strip() for e in entries if e.strip()]
if not entries:
return jsonify({"error": "Aucune cible spécifiée"}), 400
for e in entries:
if not re.match(r"^[\d\.a-fA-F:/]+$", e):
return jsonify({"error": f"Entrée invalide : {e}"}), 400
jail = "global-blacklist"
unbanned, errors = [], []
for entry in entries:
try:
r = subprocess.run(
["/usr/bin/sudo", "/usr/bin/fail2ban-client", "set", jail, "unbanip", entry],
capture_output=True, text=True, timeout=10
)
(unbanned if r.returncode == 0 else errors).append(entry)
except Exception as ex:
errors.append(f"{entry}: {ex}")
_BANNED_CACHE_TS = 0
if errors and not unbanned:
return jsonify({"error": f"Erreur sur : {', '.join(errors)}"}), 500
return jsonify({"ok": True, "unbanned": unbanned, "errors": errors, "count": len(unbanned)})
# ── Corbeille ────────────────────────────────────────────────────────
@app.route("/trash")
def trash_list():
redir = _require_admin()
if redir:
return redir
_trash_purge(30)
entries = _trash_list()
return render_template("trash.html", entries=entries)
@app.route("/trash/raw/<path:subpath>")
def trash_raw(subpath):
redir = _require_admin()
if redir:
return redir
target = (TRASH_ROOT / subpath).resolve()
if not str(target).startswith(str(TRASH_ROOT)):
abort(400)
if not target.is_file():
abort(404)
return send_from_directory(TRASH_ROOT, subpath)
@app.route("/trash/delete", methods=["POST"])
def trash_delete():
redir = _require_admin()
if redir:
return redir
trash_rel = request.form.get("path", "").strip()
if not trash_rel:
abort(400)
target = (TRASH_ROOT / trash_rel).resolve()
if not str(target).startswith(str(TRASH_ROOT)):
abort(400)
if target.is_file():
target.unlink()
info = Path(str(target) + ".trashinfo")
if info.exists():
info.unlink()
return redirect(url_for("trash_list"))
@app.route("/trash/restore", methods=["POST"])
def trash_restore():
redir = _require_admin()
if redir:
return redir
trash_rel = request.form.get("path", "").strip()
conflict = request.form.get("conflict", "").strip()
if not trash_rel:
abort(400)
ok, result = _trash_restore(trash_rel, conflict)
if not ok:
if result == "conflict":
return jsonify({"conflict": True, "path": trash_rel}), 409
abort(400)
parent = str(Path(result).parent)
return redirect(url_for("browse", subpath=parent) if parent != "." else url_for("browse"))
@app.route("/trash/preview/<path:subpath>")
def trash_preview(subpath):
redir = _require_admin()
if redir:
return redir
target = (TRASH_ROOT / subpath).resolve()
if not str(target).startswith(str(TRASH_ROOT)):
abort(400)
if not target.is_file():
abort(404)
stat = target.stat()
ext = target.suffix.lower()
ctx = dict(
subpath = subpath,
filename = target.name,
filesize = _humansize(stat.st_size),
mtime = datetime.fromtimestamp(stat.st_mtime),
raw_url = url_for("trash_raw", subpath=subpath),
parent_path= None,
prev_path = None,
next_path = None,
sibling_pos= "",
ext = ext,
from_trash = True,
)
if ext in IMAGE_EXT:
ctx["meta"] = _image_meta(target)
return render_template("preview_image.html", **ctx)
if ext in TEXT_EXT:
return render_template("preview_text.html",
content=target.read_text(errors="replace"),
lang=ext.lstrip("."), **ctx)
return render_template("preview_other.html", **ctx)
@app.route("/trash/empty", methods=["POST"])
def trash_empty():
redir = _require_admin()
if redir:
return redir
if TRASH_ROOT.exists():
shutil.rmtree(TRASH_ROOT)
TRASH_ROOT.mkdir(exist_ok=True)
return redirect(url_for("trash_list"))
# ── Renommage de fichiers ─────────────────────────────────────────────
@app.route("/rename", methods=["POST"])
def rename_file():
redir = _require_admin()
if redir:
return redir
subpath = request.form.get("path", "").strip()
new_name = request.form.get("new_name", "").strip()
if not subpath or not new_name:
return jsonify({"error": "Paramètres manquants"}), 400
if "/" in new_name or "\\" in new_name or new_name in (".", ".."):
return jsonify({"error": "Nom invalide"}), 400
new_name = secure_filename(new_name)
if not new_name:
return jsonify({"error": "Nom invalide après nettoyage"}), 400
target = _safe_path(subpath)
if not target.is_file():
return jsonify({"error": "Fichier introuvable"}), 404
top = Path(subpath).parts[0]
if top in _HIDDEN or top.startswith("."):
return jsonify({"error": "Accès refusé"}), 403
dest = target.parent / new_name
if not dest.is_relative_to(ASSETS_ROOT):
return jsonify({"error": "Destination invalide"}), 400
if dest.exists():
return jsonify({"error": f"« {new_name} » existe déjà"}), 409
target.rename(dest)
new_path = str(dest.relative_to(ASSETS_ROOT))
return jsonify({
"name": new_name,
"path": new_path,
"browse_url": url_for("browse", subpath=new_path),
})
# ── Vérification des conflits avant upload ───────────────────────────
@app.route("/check-upload", methods=["POST"])
def check_upload():
redir = _require_admin()
if redir:
return redir
subpath = request.form.get("path", "").strip()
names = request.form.getlist("names")
if subpath:
parts = Path(subpath).parts
if parts and (parts[0] in _HIDDEN or parts[0].startswith(".")):
abort(403)
dest = _safe_path(subpath) if subpath else ASSETS_ROOT
if not dest.is_dir():
abort(400)
conflicts = [n for n in names if n and (dest / secure_filename(n)).exists()]
return jsonify({"conflicts": conflicts})
# ── Upload de fichiers ────────────────────────────────────────────────
@app.route("/upload", methods=["POST"])
def upload_file():
redir = _require_admin()
if redir:
return redir
subpath = request.form.get("path", "").strip()
files = request.files.getlist("files")
if not files or all(f.filename == "" for f in files):
abort(400)
if subpath:
parts = Path(subpath).parts
if parts and (parts[0] in _HIDDEN or parts[0].startswith(".")):
abort(403)
dest = _safe_path(subpath) if subpath else ASSETS_ROOT
if not dest.is_dir():
abort(400)
conflict = request.form.get("conflict", "overwrite")
if conflict not in ("backup", "overwrite", "rename", "skip"):
conflict = "overwrite"
for f in files:
name = secure_filename(f.filename or "")
if not name:
continue
out_path = dest / name
if out_path.exists():
if conflict == "skip":
continue
elif conflict == "backup":
try:
out_path.rename(_backup_path(out_path))
except Exception:
continue
elif conflict == "rename":
out_path = _auto_rename(out_path)
f.save(out_path)
return redirect(url_for("browse", subpath=subpath) if subpath else url_for("browse"))
# ── Vérification des conflits avant redimensionnement ────────────────
@app.route("/check-resize", methods=["POST"])
def check_resize():
redir = _require_admin()
if redir:
return redir
subpath = request.form.get("path", "").strip()
raw_sizes = request.form.getlist("sizes")
raw_formats = request.form.getlist("formats")
if not subpath:
return jsonify({"conflicts": []})
target = _safe_path(subpath)
if not target.is_file():
return jsonify({"conflicts": []})
try:
sizes = [int(s) for s in raw_sizes if int(s) in RESIZE_SIZES]
except (ValueError, TypeError):
sizes = []
formats = [f for f in raw_formats if f in RESIZE_FORMATS]
src_ext = target.suffix.lstrip(".")
stem = re.sub(r"_\d+x\d+$", "", target.stem)
parent = target.parent
all_dims = [(s, s) for s in sizes]
for cs in request.form.getlist("custom_sizes"):
try:
w, h = (int(x.strip()) for x in re.split(r"[x×]", cs.lower()))
if w >= 1 and h >= 1:
all_dims.append((w, h))
except (ValueError, TypeError):
pass
# Pas de dimensions sélectionnées → conserver celles d'origine
if not all_dims:
try:
src = Image.open(target)
all_dims = [(src.width, src.height)]
src.close()
except Exception:
return jsonify({"conflicts": []})
# Pas de format sélectionné → conserver le format d'origine
if not formats:
formats = [src_ext]
conflicts = []
for fmt in formats:
for w, h in all_dims:
out_name = f"{stem}_{w}x{h}.{fmt}"
if (parent / out_name).exists():
conflicts.append(out_name)
return jsonify({"conflicts": conflicts})
# ── Redimensionnement d'images ────────────────────────────────────────
@app.route("/resize", methods=["POST"])
def resize_image():
redir = _require_admin()
if redir:
return redir
subpath = request.form.get("path", "").strip()
raw_sizes = request.form.getlist("sizes")
raw_formats = request.form.getlist("formats")
raw_customs = request.form.getlist("custom_sizes")
if not subpath or not raw_formats:
return jsonify({"error": "Paramètres manquants"}), 400
target = _safe_path(subpath)
if not target.is_file():
abort(404)
ext = target.suffix.lower()
if ext not in IMAGE_EXT:
abort(400)
top = Path(subpath).parts[0]
if top in _HIDDEN or top.startswith("."):
abort(403)
try:
square_dims = [(s, s) for s in sorted({int(s) for s in raw_sizes if int(s) in RESIZE_SIZES})]
except (ValueError, TypeError):
square_dims = []
formats = [f for f in raw_formats if f in RESIZE_FORMATS]
# Dimensions libres — validées contre la résolution source
src_img = Image.open(target)
max_w, max_h = src_img.width, src_img.height
src_img.close()
custom_dims = []
for cs in raw_customs:
try:
w, h = (int(x.strip()) for x in re.split(r"[x×]", cs.lower()))
if w < 1 or h < 1:
continue
if w > max_w or h > max_h:
continue
custom_dims.append((w, h))
except (ValueError, TypeError):
pass
all_dims = square_dims + custom_dims
# Pas de dimensions sélectionnées → conserver celles d'origine
if not all_dims:
all_dims = [(max_w, max_h)]
# Pas de format sélectionné → conserver le format d'origine
if not formats:
formats = [ext.lstrip(".")]
conflict = request.form.get("conflict", "skip")
if conflict not in ("backup", "overwrite", "rename", "skip"):
conflict = "skip"
is_svg = ext == ".svg"
stem = re.sub(r"_\d+x\d+$", "", target.stem)
parent = target.parent
created, errors = [], []
# Signaler les dimensions hors-bornes
for cs in raw_customs:
try:
w, h = (int(x.strip()) for x in re.split(r"[x×]", cs.lower()))
if w > max_w or h > max_h:
for f in formats:
errors.append({"name": f"{stem}_{w}x{h}.{f}",
"reason": f"{w}×{h} dépasse la résolution source ({max_w}×{max_h})"})
except (ValueError, TypeError):
pass
for fmt in formats:
for w, h in all_dims:
out_name = f"{stem}_{w}x{h}.{fmt}"
out_path = parent / out_name
out_rel = str(out_path.relative_to(ASSETS_ROOT))
if out_path.exists():
if conflict == "skip":
errors.append({"name": out_name, "reason": "Fichier existant, ignoré"})
continue
elif conflict == "backup":
try:
bak = _backup_path(out_path)
out_path.rename(bak)
except Exception as exc:
errors.append({"name": out_name, "reason": f"Backup impossible : {exc}"})
continue
elif conflict == "rename":
out_path = _auto_rename(out_path)
out_name = out_path.name
out_rel = str(out_path.relative_to(ASSETS_ROOT))
try:
if fmt == "svg" and is_svg:
shutil.copy2(target, out_path)
created.append({"name": out_name, "path": out_rel})
elif fmt == "svg" and not is_svg:
errors.append({"name": out_name, "reason": "Conversion raster→SVG non supportée"})
elif is_svg:
if not HAS_CAIROSVG:
errors.append({"name": out_name, "reason": "cairosvg non disponible sur ce serveur"})
continue
if fmt in ("png", "ico"):
buf = io.BytesIO()
_cairosvg.svg2png(url=str(target), write_to=buf,
output_width=w, output_height=h)
buf.seek(0)
img = Image.open(buf).convert("RGBA")
if fmt == "ico":
img.save(out_path, format="ICO", sizes=[(w, h)])
else:
img.save(out_path, format="PNG")
else:
buf = io.BytesIO()
_cairosvg.svg2png(url=str(target), write_to=buf,
output_width=w, output_height=h)
buf.seek(0)
img = Image.open(buf).convert("RGB")
img.save(out_path, format="JPEG", quality=90)
created.append({"name": out_name, "path": out_rel})
else:
img = Image.open(target)
# LANCZOS ne supporte pas le mode palette (ICO, GIF…)
if img.mode not in ("RGB", "RGBA", "L", "LA", "I", "F"):
img = img.convert("RGBA")
img = img.resize((w, h), Image.LANCZOS)
if fmt == "png":
if img.mode not in ("RGBA", "RGB", "L"):
img = img.convert("RGBA")
img.save(out_path, format="PNG")
elif fmt == "jpg":
img = img.convert("RGB")
img.save(out_path, format="JPEG", quality=90)
elif fmt == "ico":
img = img.convert("RGBA")
img.save(out_path, format="ICO", sizes=[(w, h)])
created.append({"name": out_name, "path": out_rel})
except Exception as exc:
errors.append({"name": out_name, "reason": str(exc)})
return jsonify({"created": created, "errors": errors})
# ── Fichiers CDN publics (dev local uniquement) ───────────────────────
_PUBLIC_TOP = frozenset({"logo", "wiki", "error"})
@app.route("/favicon.ico")
def favicon():
return send_from_directory(ASSETS_ROOT, "favicon.ico")
@app.route("/robots.txt")
def robots():
return send_from_directory(ASSETS_ROOT, "robots.txt")
@app.route("/<path:subpath>")
def cdn_file(subpath):
top = Path(subpath).parts[0]
if top not in _PUBLIC_TOP:
abort(404)
target = _safe_path(subpath)
if not target.is_file():
abort(404)
return send_from_directory(ASSETS_ROOT, subpath)
if __name__ == "__main__":
app.run(debug=True, port=5003)