- psycopg2-binary ajouté aux dépendances - Cache ip-api.com dans table ip_asn_cache (PostgreSQL, TTL 30 j) - ThreadedConnectionPool partagé par worker, schéma auto-créé au démarrage - Graceful degradation si DATABASE_URL absent - deploy-app.sh : pip tourne sous static-cdn (sudo -u) pour respecter les droits Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1613 lines
52 KiB
Python
1613 lines
52 KiB
Python
import gzip
|
||
import io
|
||
import ipaddress
|
||
import json
|
||
import os
|
||
import re
|
||
import shutil
|
||
import subprocess
|
||
import threading
|
||
import time
|
||
import urllib.request
|
||
from contextlib import contextmanager
|
||
from pathlib import Path
|
||
from datetime import datetime, timedelta
|
||
|
||
from urllib.parse import urlencode
|
||
|
||
try:
|
||
import psycopg2
|
||
import psycopg2.pool as _pg_pool_lib
|
||
_HAS_PG = True
|
||
except ImportError:
|
||
_HAS_PG = False
|
||
from PIL import Image
|
||
|
||
from flask import (Flask, redirect, url_for, session, request,
|
||
render_template, abort, send_from_directory, jsonify)
|
||
from authlib.integrations.flask_client import OAuth
|
||
from werkzeug.middleware.proxy_fix import ProxyFix
|
||
from werkzeug.utils import secure_filename
|
||
|
||
app = Flask(__name__)
|
||
app.secret_key = os.environ["SECRET_KEY"]
|
||
app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1)
|
||
|
||
oauth = OAuth(app)
|
||
oauth.register(
|
||
name="alpid",
|
||
server_metadata_url=os.environ["ALPID_DISCOVERY_URL"],
|
||
client_id=os.environ["ALPID_CLIENT_ID"],
|
||
client_secret=os.environ["ALPID_CLIENT_SECRET"],
|
||
client_kwargs={"scope": "openid profile email"},
|
||
)
|
||
|
||
ADMIN_GROUPS = set(os.environ.get("ADMIN_GROUPS", "admins").split(","))
|
||
ADMIN_EMAILS = set(e.strip() for e in os.environ.get("ADMIN_EMAILS", "").split(",") if e.strip())
|
||
ASSETS_ROOT = Path(os.environ.get("ASSETS_ROOT", ".")).resolve()
|
||
TRASH_ROOT = ASSETS_ROOT / ".trash"
|
||
|
||
_alpid_base = os.environ["ALPID_DISCOVERY_URL"].split("/.well-known/")[0]
|
||
ALPID_LOGOUT_URL = _alpid_base + "/protocol/openid-connect/logout"
|
||
STATS_FILE = Path(os.environ.get("STATS_FILE", "/opt/static-cdn/goaccess.html"))
|
||
STATS_JSON = Path(os.environ.get("STATS_JSON", "/opt/static-cdn/goaccess.json"))
|
||
STATS_LOG_FILE = os.environ.get("STATS_LOG_FILE", "")
|
||
STATS_GENERATE_CMD = os.environ.get("STATS_GENERATE_CMD", "")
|
||
|
||
_gen_lock = threading.Lock()
|
||
_gen_state = {"generating": False}
|
||
|
||
try:
|
||
_APP_VERSION = (Path(__file__).parent / "VERSION").read_text().strip()
|
||
except Exception:
|
||
_APP_VERSION = "—"
|
||
|
||
_CHANGELOG_FILE = Path(__file__).parent / "CHANGELOG.md"
|
||
_IGNORED_IPS_FILE = Path(__file__).parent / "ignored_ips.json"
|
||
|
||
_LOG_RE = re.compile(
|
||
r'(\S+) \S+ \S+ \[(\d{2}/\w+/\d{4}:\d{2}:\d{2}:\d{2} [+-]\d{4})\] '
|
||
r'"[A-Z-]+ ([^\s"]+)[^"]*" (\d{3}) \d+ "([^"]*)"'
|
||
)
|
||
_404_CACHE: dict = {}
|
||
_404_CACHE_TS: float = 0
|
||
_404_CACHE_TTL = 300 # 5 min
|
||
|
||
_AS_CACHE_DIR = Path(__file__).parent / "as_cache"
|
||
_AS_CACHE_TTL = 30 * 24 * 3600 # 30 jours
|
||
_BANNED_CACHE: tuple = (set(), [])
|
||
_BANNED_CACHE_TS: float = 0
|
||
_BANNED_CACHE_TTL = 60 # 1 min
|
||
|
||
_pg_dsn = os.environ.get("DATABASE_URL", "")
|
||
_pg_pool_obj = None
|
||
_pg_schema_ok = False
|
||
|
||
|
||
_HIDDEN = frozenset({
|
||
".git", "scripts", "app",
|
||
".env", ".env.example", ".gitignore",
|
||
".htaccess", ".htpasswd_stats",
|
||
"standard_index.html",
|
||
})
|
||
|
||
try:
|
||
import cairosvg as _cairosvg
|
||
HAS_CAIROSVG = True
|
||
except ImportError:
|
||
HAS_CAIROSVG = False
|
||
|
||
IMAGE_EXT = frozenset({".png", ".jpg", ".jpeg", ".gif", ".ico", ".svg", ".webp"})
|
||
|
||
RESIZE_SIZES = frozenset({32, 64, 100, 128, 200, 300, 500, 600, 1024})
|
||
RESIZE_FORMATS = frozenset({"png", "jpg", "ico", "svg"})
|
||
TEXT_EXT = frozenset({".txt", ".html", ".htm", ".md", ".css", ".js", ".json",
|
||
".xml", ".conf", ".sh", ".robots", ".php"})
|
||
PDF_EXT = frozenset({".pdf"})
|
||
|
||
|
||
# ── Helpers ───────────────────────────────────────────────────────────
|
||
|
||
def _user():
|
||
return session.get("user")
|
||
|
||
|
||
def _require_admin():
|
||
u = _user()
|
||
if not u:
|
||
session["next_url"] = request.url
|
||
return redirect(url_for("login"))
|
||
if not u.get("is_admin"):
|
||
abort(403)
|
||
return None
|
||
|
||
|
||
def _safe_path(subpath: str) -> Path:
|
||
target = (ASSETS_ROOT / subpath).resolve()
|
||
if not target.is_relative_to(ASSETS_ROOT):
|
||
abort(400)
|
||
return target
|
||
|
||
|
||
def _humansize(n: int) -> str:
|
||
for unit in ("o", "Ko", "Mo", "Go"):
|
||
if n < 1024:
|
||
return f"{n:.0f} {unit}"
|
||
n /= 1024
|
||
return f"{n:.1f} To"
|
||
|
||
|
||
_EXIF_WANTED = frozenset({
|
||
"Make", "Model", "Software", "DateTime", "DateTimeOriginal", "DateTimeDigitized",
|
||
"ExposureTime", "FNumber", "ISOSpeedRatings", "FocalLength", "Flash",
|
||
"WhiteBalance", "ExposureProgram", "MeteringMode", "Orientation",
|
||
"XResolution", "YResolution", "ResolutionUnit", "ColorSpace",
|
||
"PixelXDimension", "PixelYDimension", "Artist", "Copyright", "GPSInfo",
|
||
})
|
||
|
||
|
||
def _image_meta(path: Path) -> dict:
|
||
try:
|
||
from PIL import ExifTags
|
||
img = Image.open(path)
|
||
meta = {
|
||
"width": img.width,
|
||
"height": img.height,
|
||
"format": img.format or path.suffix.lstrip(".").upper(),
|
||
"mode": img.mode,
|
||
"dpi": img.info.get("dpi"),
|
||
}
|
||
exif = img.getexif()
|
||
if exif:
|
||
rows = {}
|
||
for tag_id, val in exif.items():
|
||
tag = ExifTags.TAGS.get(tag_id, str(tag_id))
|
||
if tag in _EXIF_WANTED and tag != "GPSInfo":
|
||
rows[tag] = str(val)
|
||
sub = exif.get_ifd(0x8769)
|
||
for tag_id, val in sub.items():
|
||
tag = ExifTags.TAGS.get(tag_id, str(tag_id))
|
||
if tag in _EXIF_WANTED and tag != "GPSInfo":
|
||
rows[tag] = str(val)
|
||
gps_ifd = exif.get_ifd(0x8825)
|
||
if gps_ifd:
|
||
rows["GPS"] = _parse_gps(gps_ifd)
|
||
if rows:
|
||
meta["exif"] = rows
|
||
return meta
|
||
except Exception:
|
||
return {}
|
||
|
||
|
||
def _parse_gps(gps_ifd: dict):
|
||
try:
|
||
from PIL import ExifTags
|
||
def _dms(coords):
|
||
d, m, s = coords
|
||
return float(d) + float(m) / 60 + float(s) / 3600
|
||
|
||
lat = _dms(gps_ifd.get(2, (0, 0, 0)))
|
||
lon = _dms(gps_ifd.get(4, (0, 0, 0)))
|
||
latR = gps_ifd.get(1, "N")
|
||
lonR = gps_ifd.get(3, "E")
|
||
if latR == "S": lat = -lat
|
||
if lonR == "W": lon = -lon
|
||
return f"{lat:.6f}, {lon:.6f}"
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def _backup_path(p: Path) -> Path:
|
||
ts = datetime.now().strftime("%Y%m%d%H%M%S")
|
||
return p.parent / f"{p.stem}_bak_{ts}{p.suffix}"
|
||
|
||
|
||
def _auto_rename(p: Path) -> Path:
|
||
i = 1
|
||
while True:
|
||
candidate = p.parent / f"{p.stem}_{i}{p.suffix}"
|
||
if not candidate.exists():
|
||
return candidate
|
||
i += 1
|
||
|
||
|
||
def _trash_move(target: Path, subpath: str) -> None:
|
||
rel = Path(subpath)
|
||
trash_dest = TRASH_ROOT / rel
|
||
trash_dest.parent.mkdir(parents=True, exist_ok=True)
|
||
if trash_dest.exists():
|
||
ts = datetime.now().strftime("%Y%m%d%H%M%S")
|
||
trash_dest = trash_dest.parent / f"{trash_dest.stem}_{ts}{trash_dest.suffix}"
|
||
shutil.move(str(target), trash_dest)
|
||
Path(str(trash_dest) + ".trashinfo").write_text(json.dumps({
|
||
"original_path": str(rel),
|
||
"deleted_at": datetime.now().isoformat(timespec="seconds"),
|
||
}))
|
||
|
||
|
||
def _trash_restore(trash_rel: str, conflict: str = "") -> tuple:
|
||
target = (TRASH_ROOT / trash_rel).resolve()
|
||
if not str(target).startswith(str(TRASH_ROOT)):
|
||
return False, "forbidden"
|
||
if not target.is_file():
|
||
return False, "not_found"
|
||
info_path = Path(str(target) + ".trashinfo")
|
||
try:
|
||
original = json.loads(info_path.read_text())["original_path"]
|
||
except Exception:
|
||
original = trash_rel
|
||
dest = (ASSETS_ROOT / original).resolve()
|
||
if not str(dest).startswith(str(ASSETS_ROOT)):
|
||
return False, "forbidden"
|
||
if dest.exists():
|
||
if not conflict:
|
||
return False, "conflict"
|
||
if conflict == "rename":
|
||
dest = _auto_rename(dest)
|
||
dest.parent.mkdir(parents=True, exist_ok=True)
|
||
shutil.move(str(target), str(dest))
|
||
if info_path.exists():
|
||
info_path.unlink()
|
||
return True, str(dest.relative_to(ASSETS_ROOT))
|
||
|
||
|
||
def _trash_purge(days: int = 30) -> int:
|
||
if not TRASH_ROOT.exists():
|
||
return 0
|
||
cutoff = datetime.now() - timedelta(days=days)
|
||
count = 0
|
||
for info_file in list(TRASH_ROOT.rglob("*.trashinfo")):
|
||
try:
|
||
deleted_at = datetime.fromisoformat(
|
||
json.loads(info_file.read_text())["deleted_at"]
|
||
)
|
||
if deleted_at < cutoff:
|
||
f = Path(str(info_file)[:-len(".trashinfo")])
|
||
if f.exists():
|
||
f.unlink()
|
||
info_file.unlink()
|
||
count += 1
|
||
except Exception:
|
||
pass
|
||
for d in sorted(TRASH_ROOT.rglob("*"), reverse=True):
|
||
if d.is_dir() and d != TRASH_ROOT:
|
||
try:
|
||
d.rmdir()
|
||
except OSError:
|
||
pass
|
||
return count
|
||
|
||
|
||
def _trash_list() -> list:
|
||
if not TRASH_ROOT.exists():
|
||
return []
|
||
entries = []
|
||
for info_file in TRASH_ROOT.rglob("*.trashinfo"):
|
||
try:
|
||
data = json.loads(info_file.read_text())
|
||
fpath = Path(str(info_file)[:-len(".trashinfo")])
|
||
if not fpath.exists():
|
||
info_file.unlink()
|
||
continue
|
||
rel = fpath.relative_to(TRASH_ROOT)
|
||
ext = fpath.suffix.lower()
|
||
stat = fpath.stat()
|
||
entries.append({
|
||
"name": fpath.name,
|
||
"path": str(rel),
|
||
"original_path": data.get("original_path", str(rel)),
|
||
"deleted_at": datetime.fromisoformat(data["deleted_at"]),
|
||
"size": stat.st_size,
|
||
"is_dir": False,
|
||
"is_image": ext in IMAGE_EXT,
|
||
"is_text": ext in TEXT_EXT,
|
||
"is_pdf": ext in PDF_EXT,
|
||
"ext": ext,
|
||
})
|
||
except Exception:
|
||
pass
|
||
entries.sort(key=lambda e: e["deleted_at"], reverse=True)
|
||
return entries
|
||
|
||
|
||
def _trash_count() -> int:
|
||
if not TRASH_ROOT.exists():
|
||
return 0
|
||
return sum(1 for f in TRASH_ROOT.rglob("*")
|
||
if f.is_file() and not f.name.endswith(".trashinfo"))
|
||
|
||
|
||
def _trash_stats() -> dict:
|
||
if not TRASH_ROOT.exists():
|
||
return {"files": 0, "size": 0, "oldest": None}
|
||
files, size, oldest = 0, 0, None
|
||
for f in TRASH_ROOT.rglob("*"):
|
||
if not f.is_file() or f.name.endswith(".trashinfo"):
|
||
continue
|
||
files += 1
|
||
size += f.stat().st_size
|
||
info = Path(str(f) + ".trashinfo")
|
||
try:
|
||
dt = datetime.fromisoformat(json.loads(info.read_text())["deleted_at"])
|
||
if oldest is None or dt < oldest:
|
||
oldest = dt
|
||
except Exception:
|
||
pass
|
||
return {"files": files, "size": size, "oldest": oldest}
|
||
|
||
|
||
def _folder_stats(path: Path) -> dict:
|
||
files, size = 0, 0
|
||
for f in path.rglob("*"):
|
||
if not f.is_file():
|
||
continue
|
||
if any(p in _HIDDEN or p.startswith(".") for p in f.relative_to(path).parts):
|
||
continue
|
||
files += 1
|
||
size += f.stat().st_size
|
||
return {"files": files, "size": size}
|
||
|
||
|
||
def _parse_date(s: str):
|
||
if not s:
|
||
return None
|
||
try:
|
||
return datetime.strptime(s, "%Y-%m-%d")
|
||
except ValueError:
|
||
return None
|
||
|
||
|
||
def _load_hits() -> dict:
|
||
"""Parse GoAccess JSON report → {relative_path: hit_count}."""
|
||
if not STATS_JSON.is_file():
|
||
return {}
|
||
try:
|
||
data = json.loads(STATS_JSON.read_text())
|
||
hits = {}
|
||
for item in data.get("requests", {}).get("data", []):
|
||
url = item.get("data", "").lstrip("/")
|
||
count = item.get("hits", {}).get("count", 0)
|
||
if url and count:
|
||
hits[url] = count
|
||
return hits
|
||
except Exception:
|
||
return {}
|
||
|
||
|
||
def _entry(item: Path) -> dict:
|
||
stat = item.stat()
|
||
ext = item.suffix.lower()
|
||
return {
|
||
"name": item.name,
|
||
"path": str(item.relative_to(ASSETS_ROOT)),
|
||
"is_dir": item.is_dir(),
|
||
"size": stat.st_size if item.is_file() else None,
|
||
"mtime": datetime.fromtimestamp(stat.st_mtime),
|
||
"ext": ext,
|
||
"is_image": ext in IMAGE_EXT,
|
||
"is_text": ext in TEXT_EXT,
|
||
"is_pdf": ext in PDF_EXT,
|
||
}
|
||
|
||
|
||
# ── Context processor ────────────────────────────────────────────────
|
||
|
||
@app.context_processor
|
||
def _inject_globals():
|
||
u = _user()
|
||
return {
|
||
"user": u,
|
||
"trash_count": _trash_count() if u else 0,
|
||
"humansize": _humansize,
|
||
"app_version": _APP_VERSION,
|
||
}
|
||
|
||
|
||
# ── Auth ──────────────────────────────────────────────────────────────
|
||
|
||
@app.route("/auth/login")
|
||
def login():
|
||
return oauth.alpid.authorize_redirect(url_for("callback", _external=True))
|
||
|
||
|
||
@app.route("/auth/callback")
|
||
def callback():
|
||
token = oauth.alpid.authorize_access_token()
|
||
info = token.get("userinfo") or oauth.alpid.userinfo(token=token)
|
||
email = info.get("email", "")
|
||
groups = set(info.get("groups", []))
|
||
if groups:
|
||
is_admin = bool(groups & ADMIN_GROUPS)
|
||
elif ADMIN_EMAILS:
|
||
is_admin = email in ADMIN_EMAILS
|
||
else:
|
||
is_admin = True
|
||
session["user"] = {
|
||
"sub": info["sub"],
|
||
"name": info.get("name") or info.get("preferred_username", ""),
|
||
"email": email,
|
||
"is_admin": is_admin,
|
||
}
|
||
session["id_token"] = token.get("id_token", "")
|
||
return redirect(session.pop("next_url", url_for("dashboard")))
|
||
|
||
|
||
@app.route("/auth/logout")
|
||
def logout():
|
||
id_token = session.get("id_token")
|
||
session.clear()
|
||
params = {"post_logout_redirect_uri": url_for("dashboard", _external=True)}
|
||
if id_token:
|
||
params["id_token_hint"] = id_token
|
||
return redirect(ALPID_LOGOUT_URL + "?" + urlencode(params))
|
||
|
||
|
||
# ── Dashboard ─────────────────────────────────────────────────────────
|
||
|
||
@app.route("/")
|
||
def dashboard():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
|
||
folders = {}
|
||
for item in sorted(ASSETS_ROOT.iterdir()):
|
||
if item.name in _HIDDEN or item.name.startswith("."):
|
||
continue
|
||
if item.is_dir():
|
||
folders[item.name] = _folder_stats(item)
|
||
|
||
return render_template("dashboard.html",
|
||
folders=folders,
|
||
total_files=sum(v["files"] for v in folders.values()),
|
||
total_size=sum(v["size"] for v in folders.values()),
|
||
trash=_trash_stats(),
|
||
)
|
||
|
||
|
||
@app.route("/changelog")
|
||
def changelog():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
sections = _parse_changelog()
|
||
return render_template("changelog.html", sections=sections)
|
||
|
||
|
||
def _parse_changelog():
|
||
"""Parse CHANGELOG.md into a list of version dicts."""
|
||
try:
|
||
text = _CHANGELOG_FILE.read_text()
|
||
except Exception:
|
||
return []
|
||
|
||
sections = []
|
||
current = None
|
||
current_group = None
|
||
|
||
for line in text.splitlines():
|
||
if line.startswith("## "):
|
||
if current:
|
||
if current_group:
|
||
current["groups"].append(current_group)
|
||
sections.append(current)
|
||
m = re.match(r"## \[(.+?)\] — (.+)", line)
|
||
current = {"version": m.group(1) if m else line[3:],
|
||
"date": m.group(2) if m else "",
|
||
"groups": []}
|
||
current_group = None
|
||
elif line.startswith("### ") and current is not None:
|
||
if current_group:
|
||
current["groups"].append(current_group)
|
||
current_group = {"title": line[4:], "entries": []}
|
||
elif line.startswith("- ") and current_group is not None:
|
||
current_group["entries"].append(line[2:])
|
||
|
||
if current:
|
||
if current_group:
|
||
current["groups"].append(current_group)
|
||
sections.append(current)
|
||
|
||
return sections
|
||
|
||
|
||
# ── Erreurs 404 — helpers ─────────────────────────────────────────────
|
||
|
||
def _load_ignored_ips() -> set:
|
||
try:
|
||
return set(json.loads(_IGNORED_IPS_FILE.read_text()).get("ips", []))
|
||
except Exception:
|
||
return set()
|
||
|
||
def _save_ignored_ips(ips: set) -> None:
|
||
_IGNORED_IPS_FILE.write_text(json.dumps({"ips": sorted(ips)}, indent=2))
|
||
|
||
def _log_files_404(days: int = 7) -> list:
|
||
if not STATS_LOG_FILE:
|
||
return []
|
||
log_dir = Path(STATS_LOG_FILE).parent
|
||
files = []
|
||
for f in sorted(log_dir.glob("*-access.log*"), reverse=True)[:days]:
|
||
files.append((f, f.name.endswith(".gz")))
|
||
return files
|
||
|
||
def _parse_404s(days: int = 7) -> dict:
|
||
global _404_CACHE, _404_CACHE_TS
|
||
now = time.time()
|
||
ign_mtime = _IGNORED_IPS_FILE.stat().st_mtime if _IGNORED_IPS_FILE.exists() else 0
|
||
if now - _404_CACHE_TS < _404_CACHE_TTL and _404_CACHE and ign_mtime <= _404_CACHE_TS:
|
||
return _404_CACHE
|
||
|
||
ignored = _load_ignored_ips()
|
||
results: dict = {}
|
||
|
||
for fpath, is_gz in _log_files_404(days):
|
||
try:
|
||
opener = gzip.open(fpath, "rt", errors="replace") if is_gz \
|
||
else open(fpath, errors="replace")
|
||
with opener as f:
|
||
for line in f:
|
||
m = _LOG_RE.match(line)
|
||
if not m:
|
||
continue
|
||
ip, dt_str, path, status, referer = m.groups()
|
||
if status != "404":
|
||
continue
|
||
if ip in ignored:
|
||
continue
|
||
path = path.split("?")[0]
|
||
try:
|
||
dt = datetime.strptime(dt_str, "%d/%b/%Y:%H:%M:%S %z")
|
||
except ValueError:
|
||
continue
|
||
if path not in results:
|
||
results[path] = {"count": 0, "last_seen": dt, "ips": {}}
|
||
r = results[path]
|
||
r["count"] += 1
|
||
if dt > r["last_seen"]:
|
||
r["last_seen"] = dt
|
||
if ip not in r["ips"]:
|
||
r["ips"][ip] = []
|
||
r["ips"][ip].append({"dt": dt, "referer": referer if referer != "-" else ""})
|
||
except Exception:
|
||
continue
|
||
|
||
_404_CACHE = results
|
||
_404_CACHE_TS = now
|
||
return results
|
||
|
||
def _is_still_404(path: str) -> bool:
|
||
clean = path.lstrip("/")
|
||
return not (ASSETS_ROOT / clean).exists()
|
||
|
||
def _invalidate_404_cache():
|
||
global _404_CACHE_TS
|
||
_404_CACHE_TS = 0
|
||
|
||
|
||
# ── Bannissement fail2ban — helpers ───────────────────────────────────
|
||
|
||
def _get_banned_ips() -> tuple:
|
||
"""Returns (set_of_ips, list_of_networks) from fail2ban global-blacklist."""
|
||
global _BANNED_CACHE, _BANNED_CACHE_TS
|
||
now = time.time()
|
||
if now - _BANNED_CACHE_TS < _BANNED_CACHE_TTL:
|
||
return _BANNED_CACHE
|
||
try:
|
||
r = subprocess.run(
|
||
["/usr/bin/sudo", "/usr/bin/fail2ban-client", "status", "global-blacklist"],
|
||
capture_output=True, text=True, timeout=5
|
||
)
|
||
raw: set = set()
|
||
for line in r.stdout.splitlines():
|
||
if "Banned IP list:" in line:
|
||
raw = set(line.split("Banned IP list:")[1].split())
|
||
break
|
||
ips, nets = set(), []
|
||
for entry in raw:
|
||
if "/" in entry:
|
||
try:
|
||
nets.append(ipaddress.ip_network(entry, strict=False))
|
||
except ValueError:
|
||
pass
|
||
else:
|
||
ips.add(entry)
|
||
_BANNED_CACHE = (ips, nets)
|
||
except Exception:
|
||
_BANNED_CACHE = (set(), [])
|
||
_BANNED_CACHE_TS = now
|
||
return _BANNED_CACHE
|
||
|
||
def _ip_is_banned(ip: str) -> bool:
|
||
banned_ips, banned_nets = _get_banned_ips()
|
||
if ip in banned_ips:
|
||
return True
|
||
try:
|
||
addr = ipaddress.ip_address(ip)
|
||
return any(addr in net for net in banned_nets)
|
||
except ValueError:
|
||
return False
|
||
|
||
def _pg_init_pool() -> bool:
|
||
global _pg_pool_obj
|
||
if _pg_pool_obj is not None:
|
||
return True
|
||
if not _HAS_PG or not _pg_dsn:
|
||
return False
|
||
try:
|
||
_pg_pool_obj = _pg_pool_lib.ThreadedConnectionPool(1, 3, _pg_dsn)
|
||
return True
|
||
except Exception:
|
||
return False
|
||
|
||
def _pg_ensure_schema(conn) -> None:
|
||
global _pg_schema_ok
|
||
if _pg_schema_ok:
|
||
return
|
||
with conn.cursor() as cur:
|
||
cur.execute("""
|
||
CREATE TABLE IF NOT EXISTS ip_asn_cache (
|
||
ip TEXT PRIMARY KEY,
|
||
asn TEXT NOT NULL DEFAULT '',
|
||
name TEXT NOT NULL DEFAULT '',
|
||
country TEXT NOT NULL DEFAULT '',
|
||
fetched_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||
)
|
||
""")
|
||
conn.commit()
|
||
_pg_schema_ok = True
|
||
|
||
@contextmanager
|
||
def _pg():
|
||
"""Yields a psycopg2 connection from the pool, or None if unavailable."""
|
||
if not _pg_init_pool():
|
||
yield None
|
||
return
|
||
conn = None
|
||
try:
|
||
conn = _pg_pool_obj.getconn()
|
||
_pg_ensure_schema(conn)
|
||
except Exception:
|
||
if conn:
|
||
try: _pg_pool_obj.putconn(conn)
|
||
except: pass
|
||
yield None
|
||
return
|
||
try:
|
||
yield conn
|
||
finally:
|
||
try: _pg_pool_obj.putconn(conn)
|
||
except: pass
|
||
|
||
def _lookup_ip_asn(ip: str) -> dict:
|
||
"""Returns {asn, name, country} — PostgreSQL cache (30 j) puis ip-api.com."""
|
||
with _pg() as conn:
|
||
if conn:
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"SELECT asn, name, country FROM ip_asn_cache "
|
||
"WHERE ip = %s AND fetched_at > now() - interval '30 days'",
|
||
(ip,)
|
||
)
|
||
row = cur.fetchone()
|
||
if row:
|
||
return {"asn": row[0], "name": row[1], "country": row[2]}
|
||
except Exception:
|
||
try: conn.rollback()
|
||
except: pass
|
||
|
||
url = f"http://ip-api.com/json/{ip}?fields=as,countryCode"
|
||
result: dict = {"asn": "", "name": "", "country": ""}
|
||
try:
|
||
req = urllib.request.Request(url, headers={"User-Agent": "alpinux-static/1.0"})
|
||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||
d = json.loads(resp.read())
|
||
as_field = d.get("as", "")
|
||
asn, name = "", as_field
|
||
if as_field.startswith("AS"):
|
||
parts = as_field.split(" ", 1)
|
||
asn = parts[0][2:]
|
||
name = parts[1] if len(parts) > 1 else ""
|
||
result = {"asn": asn, "name": name, "country": d.get("countryCode", "")}
|
||
except Exception as e:
|
||
result["error"] = str(e)
|
||
|
||
with _pg() as conn:
|
||
if conn:
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute("""
|
||
INSERT INTO ip_asn_cache (ip, asn, name, country)
|
||
VALUES (%s, %s, %s, %s)
|
||
ON CONFLICT (ip) DO UPDATE SET
|
||
asn = EXCLUDED.asn, name = EXCLUDED.name,
|
||
country = EXCLUDED.country, fetched_at = now()
|
||
""", (ip, result["asn"], result["name"], result["country"]))
|
||
conn.commit()
|
||
except Exception:
|
||
try: conn.rollback()
|
||
except: pass
|
||
|
||
return result
|
||
|
||
def _lookup_as_prefixes(asn: str) -> list:
|
||
"""Returns IPv4 CIDRs for an ASN via RIPE Stat. Cached 30 days."""
|
||
_AS_CACHE_DIR.mkdir(exist_ok=True)
|
||
cache_file = _AS_CACHE_DIR / f"AS{asn}.json"
|
||
now = time.time()
|
||
if cache_file.exists() and now - cache_file.stat().st_mtime < _AS_CACHE_TTL:
|
||
return json.loads(cache_file.read_text()).get("prefixes", [])
|
||
url = f"https://stat.ripe.net/data/announced-prefixes/data.json?resource=AS{asn}"
|
||
try:
|
||
req = urllib.request.Request(url, headers={"User-Agent": "alpinux-static/1.0"})
|
||
with urllib.request.urlopen(req, timeout=15) as resp:
|
||
data = json.loads(resp.read())
|
||
prefixes = [
|
||
p["prefix"] for p in data.get("data", {}).get("prefixes", [])
|
||
if ":" not in p.get("prefix", "")
|
||
]
|
||
cache_file.write_text(json.dumps({"asn": asn, "prefixes": prefixes}))
|
||
return prefixes
|
||
except Exception:
|
||
return []
|
||
|
||
|
||
# ── Navigateur de fichiers ────────────────────────────────────────────
|
||
|
||
@app.route("/browse/")
|
||
@app.route("/browse/<path:subpath>")
|
||
def browse(subpath=""):
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
|
||
target = _safe_path(subpath)
|
||
if not target.exists():
|
||
abort(404)
|
||
|
||
if subpath:
|
||
top = Path(subpath).parts[0]
|
||
if top in _HIDDEN or top.startswith("."):
|
||
abort(403)
|
||
|
||
if target.is_file():
|
||
return _file_preview(target, subpath)
|
||
|
||
hits = _load_hits()
|
||
|
||
entries = []
|
||
for item in sorted(target.iterdir()):
|
||
if item.name in _HIDDEN or item.name.startswith("."):
|
||
continue
|
||
e = _entry(item)
|
||
e["hits"] = hits.get(e["path"], 0) if not e["is_dir"] else None
|
||
entries.append(e)
|
||
entries.sort(key=lambda e: (0 if e["is_dir"] else 1, e["name"].lower()))
|
||
|
||
parts = Path(subpath).parts if subpath else ()
|
||
breadcrumb = [
|
||
{"name": p, "path": str(Path(*parts[:i + 1]))}
|
||
for i, p in enumerate(parts)
|
||
]
|
||
|
||
return render_template("browse.html",
|
||
entries=entries,
|
||
subpath=subpath,
|
||
breadcrumb=breadcrumb,
|
||
has_hits=STATS_JSON.is_file(),
|
||
)
|
||
|
||
|
||
def _file_preview(path: Path, subpath: str):
|
||
ext = path.suffix.lower()
|
||
stat = path.stat()
|
||
parent = str(Path(subpath).parent) if Path(subpath).parent != Path(".") else ""
|
||
|
||
siblings = sorted(
|
||
[f for f in path.parent.iterdir()
|
||
if f.is_file() and f.name not in _HIDDEN and not f.name.startswith(".")],
|
||
key=lambda f: f.name.lower(),
|
||
)
|
||
idx = next((i for i, f in enumerate(siblings) if f == path), None)
|
||
prev_path = str(siblings[idx - 1].relative_to(ASSETS_ROOT)) if idx else None
|
||
next_path = str(siblings[idx + 1].relative_to(ASSETS_ROOT)) if idx is not None and idx < len(siblings) - 1 else None
|
||
|
||
ctx = dict(
|
||
subpath=subpath,
|
||
filename=path.name,
|
||
filesize=_humansize(stat.st_size),
|
||
mtime=datetime.fromtimestamp(stat.st_mtime),
|
||
raw_url=url_for("raw_file", subpath=subpath),
|
||
parent_path=parent,
|
||
prev_path=prev_path,
|
||
next_path=next_path,
|
||
sibling_pos=f"{idx + 1}/{len(siblings)}" if idx is not None else "",
|
||
ext=ext,
|
||
)
|
||
if ext in IMAGE_EXT:
|
||
ctx["meta"] = _image_meta(path)
|
||
return render_template("preview_image.html", **ctx)
|
||
if ext in TEXT_EXT:
|
||
content = path.read_text(errors="replace")
|
||
return render_template("preview_text.html", content=content, lang=ext.lstrip("."), **ctx)
|
||
return render_template("preview_other.html", **ctx)
|
||
|
||
|
||
# ── Recherche ─────────────────────────────────────────────────────────
|
||
|
||
@app.route("/search")
|
||
def search():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
|
||
q = request.args.get("q", "").strip()
|
||
after_s = request.args.get("after", "").strip()
|
||
before_s = request.args.get("before", "").strip()
|
||
in_content = request.args.get("content") == "1"
|
||
|
||
dt_after = _parse_date(after_s)
|
||
dt_before = _parse_date(before_s)
|
||
|
||
results = []
|
||
searched = bool(q or dt_after or dt_before)
|
||
|
||
if searched:
|
||
q_low = q.lower()
|
||
for path in sorted(ASSETS_ROOT.rglob("*")):
|
||
if not path.is_file():
|
||
continue
|
||
parts = path.relative_to(ASSETS_ROOT).parts
|
||
if any(p in _HIDDEN or p.startswith(".") for p in parts):
|
||
continue
|
||
|
||
stat = path.stat()
|
||
mtime = datetime.fromtimestamp(stat.st_mtime)
|
||
|
||
if dt_after and mtime.date() < dt_after.date():
|
||
continue
|
||
if dt_before and mtime.date() > dt_before.date():
|
||
continue
|
||
|
||
name_match = (not q) or q_low in path.name.lower()
|
||
content_match = None
|
||
|
||
if in_content and q and not name_match and path.suffix.lower() in TEXT_EXT:
|
||
try:
|
||
text = path.read_text(errors="replace")
|
||
idx = text.lower().find(q_low)
|
||
if idx != -1:
|
||
start = max(0, idx - 60)
|
||
end = min(len(text), idx + len(q) + 60)
|
||
snippet = text[start:end].replace("\n", " ")
|
||
content_match = ("…" if start else "") + snippet + ("…" if end < len(text) else "")
|
||
name_match = True
|
||
except Exception:
|
||
pass
|
||
|
||
if not name_match:
|
||
continue
|
||
|
||
e = _entry(path)
|
||
e["content_match"] = content_match
|
||
results.append(e)
|
||
|
||
return render_template("search.html",
|
||
q=q,
|
||
after=after_s,
|
||
before=before_s,
|
||
in_content=in_content,
|
||
results=results,
|
||
searched=searched,
|
||
)
|
||
|
||
|
||
# ── Statistiques GoAccess ─────────────────────────────────────────────
|
||
|
||
@app.route("/stats/")
|
||
def stats():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
return render_template("stats.html",
|
||
has_report=STATS_FILE.is_file(),
|
||
)
|
||
|
||
|
||
@app.route("/stats/report")
|
||
def stats_report():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
if not STATS_FILE.is_file():
|
||
abort(404)
|
||
return send_from_directory(STATS_FILE.parent, STATS_FILE.name)
|
||
|
||
|
||
def _do_generate():
|
||
try:
|
||
if STATS_GENERATE_CMD:
|
||
subprocess.run(STATS_GENERATE_CMD, shell=True, timeout=600)
|
||
else:
|
||
cmd = ["goaccess", STATS_LOG_FILE, "--log-format=COMBINED",
|
||
f"--output={STATS_FILE}"]
|
||
if STATS_JSON:
|
||
cmd.append(f"--output={STATS_JSON}")
|
||
subprocess.run(cmd, timeout=600, check=False)
|
||
finally:
|
||
with _gen_lock:
|
||
_gen_state["generating"] = False
|
||
|
||
|
||
@app.route("/stats/generate", methods=["POST"])
|
||
def stats_generate():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
if STATS_FILE.is_file():
|
||
return jsonify({"status": "exists"})
|
||
with _gen_lock:
|
||
if _gen_state["generating"]:
|
||
return jsonify({"status": "generating"})
|
||
if not STATS_LOG_FILE and not STATS_GENERATE_CMD:
|
||
return jsonify({"status": "error",
|
||
"message": "STATS_LOG_FILE non configuré"}), 500
|
||
_gen_state["generating"] = True
|
||
threading.Thread(target=_do_generate, daemon=True).start()
|
||
return jsonify({"status": "started"})
|
||
|
||
|
||
@app.route("/stats/status")
|
||
def stats_status():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
return jsonify({"ready": STATS_FILE.is_file(),
|
||
"generating": _gen_state["generating"]})
|
||
|
||
|
||
# ── Fichiers bruts (aperçu / téléchargement) ──────────────────────────
|
||
|
||
@app.route("/raw/<path:subpath>")
|
||
def raw_file(subpath):
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
target = _safe_path(subpath)
|
||
if not target.is_file():
|
||
abort(404)
|
||
top = Path(subpath).parts[0]
|
||
if top in _HIDDEN or top.startswith("."):
|
||
abort(403)
|
||
return send_from_directory(ASSETS_ROOT, subpath)
|
||
|
||
|
||
# ── Suppression de fichiers ───────────────────────────────────────────
|
||
|
||
@app.route("/delete", methods=["POST"])
|
||
def delete_file():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
subpath = request.form.get("path", "").strip()
|
||
if not subpath:
|
||
abort(400)
|
||
target = _safe_path(subpath)
|
||
if not target.exists():
|
||
abort(404)
|
||
if target.is_dir():
|
||
abort(400)
|
||
top = Path(subpath).parts[0]
|
||
if top in _HIDDEN or top.startswith("."):
|
||
abort(403)
|
||
parent = str(Path(subpath).parent)
|
||
_trash_move(target, subpath)
|
||
return redirect(url_for("browse", subpath=parent) if parent != "." else url_for("browse"))
|
||
|
||
|
||
# ── Erreurs 404 ───────────────────────────────────────────────────────
|
||
|
||
def _filter_banned_from_entry(info: dict) -> dict | None:
|
||
"""Remove banned IPs from an entry dict. Returns None if no IPs remain."""
|
||
ips_ok = {ip: hits for ip, hits in info["ips"].items() if not _ip_is_banned(ip)}
|
||
if not ips_ok:
|
||
return None
|
||
count = sum(len(h) for h in ips_ok.values())
|
||
last_seen = max(max(h["dt"] for h in hits) for hits in ips_ok.values())
|
||
return {"count": count, "last_seen": last_seen, "ips": ips_ok}
|
||
|
||
|
||
@app.route("/errors/")
|
||
def errors_404():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
data = _parse_404s()
|
||
filtered = {}
|
||
for path, info in data.items():
|
||
entry = _filter_banned_from_entry(info)
|
||
if entry:
|
||
filtered[path] = entry
|
||
entries = sorted(filtered.items(), key=lambda x: x[1]["count"], reverse=True)
|
||
ignored = _load_ignored_ips()
|
||
return render_template("errors_404.html",
|
||
entries=entries,
|
||
ignored_ips=sorted(ignored),
|
||
total=sum(v["count"] for v in filtered.values()),
|
||
log_configured=bool(STATS_LOG_FILE),
|
||
)
|
||
|
||
|
||
@app.route("/errors/detail")
|
||
def errors_detail():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
path = request.args.get("path", "")
|
||
data = _parse_404s()
|
||
raw_entry = data.get(path)
|
||
if not raw_entry:
|
||
return jsonify({"error": "introuvable"}), 404
|
||
entry = _filter_banned_from_entry(raw_entry) or {"count": 0, "last_seen": raw_entry["last_seen"], "ips": {}}
|
||
ignored = _load_ignored_ips()
|
||
ip_list = []
|
||
for ip, hits in sorted(entry["ips"].items(), key=lambda x: len(x[1]), reverse=True):
|
||
last = max(h["dt"] for h in hits)
|
||
ip_list.append({
|
||
"ip": ip,
|
||
"count": len(hits),
|
||
"last_seen": last.strftime("%d/%m/%Y %H:%M"),
|
||
"ignored": ip in ignored,
|
||
"hits": [
|
||
{"dt": h["dt"].strftime("%d/%m/%Y %H:%M"), "referer": h["referer"]}
|
||
for h in sorted(hits, key=lambda x: x["dt"], reverse=True)[:30]
|
||
],
|
||
})
|
||
return jsonify({
|
||
"path": path,
|
||
"count": entry["count"],
|
||
"still_404": _is_still_404(path),
|
||
"last_seen": entry["last_seen"].strftime("%d/%m/%Y %H:%M"),
|
||
"ips": ip_list,
|
||
})
|
||
|
||
|
||
@app.route("/errors/asinfo")
|
||
def errors_asinfo():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
ip = request.args.get("ip", "").strip()
|
||
if not re.match(r"^[\d\.]+$", ip):
|
||
return jsonify({"error": "IP invalide"}), 400
|
||
info = _lookup_ip_asn(ip)
|
||
if info.get("asn"):
|
||
info["prefix_count"] = len(_lookup_as_prefixes(info["asn"]))
|
||
else:
|
||
info["prefix_count"] = 0
|
||
return jsonify(info)
|
||
|
||
|
||
@app.route("/errors/ignore", methods=["POST"])
|
||
def errors_ignore():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
ip = request.form.get("ip", "").strip()
|
||
action = request.form.get("action", "add")
|
||
if not re.match(r"^[\d\.a-fA-F:]+$", ip):
|
||
return jsonify({"error": "IP invalide"}), 400
|
||
ips = _load_ignored_ips()
|
||
ips.add(ip) if action == "add" else ips.discard(ip)
|
||
_save_ignored_ips(ips)
|
||
_invalidate_404_cache()
|
||
return jsonify({"ok": True, "action": action, "ip": ip})
|
||
|
||
|
||
@app.route("/errors/ban", methods=["POST"])
|
||
def errors_ban():
|
||
global _BANNED_CACHE_TS
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
ip = request.form.get("ip", "").strip()
|
||
jail = request.form.get("jail", "global-blacklist")
|
||
ban_as = request.form.get("ban_as") == "1"
|
||
if not re.match(r"^[\d\.a-fA-F:]+$", ip):
|
||
return jsonify({"error": "IP invalide"}), 400
|
||
try:
|
||
if ban_as:
|
||
info = _lookup_ip_asn(ip)
|
||
if not info.get("asn"):
|
||
return jsonify({"error": "ASN introuvable pour cette IP"}), 400
|
||
targets = _lookup_as_prefixes(info["asn"])
|
||
if not targets:
|
||
return jsonify({"error": "Aucun préfixe IPv4 trouvé pour cet AS"}), 400
|
||
else:
|
||
targets = [ip]
|
||
r = subprocess.run(
|
||
["/usr/bin/sudo", "/usr/bin/fail2ban-client", "set", jail, "banip"] + targets,
|
||
capture_output=True, text=True, timeout=60
|
||
)
|
||
if r.returncode == 0:
|
||
_BANNED_CACHE_TS = 0 # invalide le cache pour ce worker
|
||
return jsonify({"ok": True, "ip": ip, "jail": jail, "count": len(targets)})
|
||
return jsonify({"error": r.stderr.strip() or "Erreur fail2ban"}), 500
|
||
except Exception as e:
|
||
return jsonify({"error": str(e)}), 500
|
||
|
||
|
||
# ── Corbeille ────────────────────────────────────────────────────────
|
||
|
||
@app.route("/trash")
|
||
def trash_list():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
_trash_purge(30)
|
||
entries = _trash_list()
|
||
return render_template("trash.html", entries=entries)
|
||
|
||
|
||
@app.route("/trash/raw/<path:subpath>")
|
||
def trash_raw(subpath):
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
target = (TRASH_ROOT / subpath).resolve()
|
||
if not str(target).startswith(str(TRASH_ROOT)):
|
||
abort(400)
|
||
if not target.is_file():
|
||
abort(404)
|
||
return send_from_directory(TRASH_ROOT, subpath)
|
||
|
||
|
||
@app.route("/trash/delete", methods=["POST"])
|
||
def trash_delete():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
trash_rel = request.form.get("path", "").strip()
|
||
if not trash_rel:
|
||
abort(400)
|
||
target = (TRASH_ROOT / trash_rel).resolve()
|
||
if not str(target).startswith(str(TRASH_ROOT)):
|
||
abort(400)
|
||
if target.is_file():
|
||
target.unlink()
|
||
info = Path(str(target) + ".trashinfo")
|
||
if info.exists():
|
||
info.unlink()
|
||
return redirect(url_for("trash_list"))
|
||
|
||
|
||
@app.route("/trash/restore", methods=["POST"])
|
||
def trash_restore():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
trash_rel = request.form.get("path", "").strip()
|
||
conflict = request.form.get("conflict", "").strip()
|
||
if not trash_rel:
|
||
abort(400)
|
||
ok, result = _trash_restore(trash_rel, conflict)
|
||
if not ok:
|
||
if result == "conflict":
|
||
return jsonify({"conflict": True, "path": trash_rel}), 409
|
||
abort(400)
|
||
parent = str(Path(result).parent)
|
||
return redirect(url_for("browse", subpath=parent) if parent != "." else url_for("browse"))
|
||
|
||
|
||
@app.route("/trash/preview/<path:subpath>")
|
||
def trash_preview(subpath):
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
target = (TRASH_ROOT / subpath).resolve()
|
||
if not str(target).startswith(str(TRASH_ROOT)):
|
||
abort(400)
|
||
if not target.is_file():
|
||
abort(404)
|
||
stat = target.stat()
|
||
ext = target.suffix.lower()
|
||
ctx = dict(
|
||
subpath = subpath,
|
||
filename = target.name,
|
||
filesize = _humansize(stat.st_size),
|
||
mtime = datetime.fromtimestamp(stat.st_mtime),
|
||
raw_url = url_for("trash_raw", subpath=subpath),
|
||
parent_path= None,
|
||
prev_path = None,
|
||
next_path = None,
|
||
sibling_pos= "",
|
||
ext = ext,
|
||
from_trash = True,
|
||
)
|
||
if ext in IMAGE_EXT:
|
||
ctx["meta"] = _image_meta(target)
|
||
return render_template("preview_image.html", **ctx)
|
||
if ext in TEXT_EXT:
|
||
return render_template("preview_text.html",
|
||
content=target.read_text(errors="replace"),
|
||
lang=ext.lstrip("."), **ctx)
|
||
return render_template("preview_other.html", **ctx)
|
||
|
||
|
||
@app.route("/trash/empty", methods=["POST"])
|
||
def trash_empty():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
if TRASH_ROOT.exists():
|
||
shutil.rmtree(TRASH_ROOT)
|
||
TRASH_ROOT.mkdir(exist_ok=True)
|
||
return redirect(url_for("trash_list"))
|
||
|
||
|
||
# ── Renommage de fichiers ─────────────────────────────────────────────
|
||
|
||
@app.route("/rename", methods=["POST"])
|
||
def rename_file():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
|
||
subpath = request.form.get("path", "").strip()
|
||
new_name = request.form.get("new_name", "").strip()
|
||
|
||
if not subpath or not new_name:
|
||
return jsonify({"error": "Paramètres manquants"}), 400
|
||
|
||
if "/" in new_name or "\\" in new_name or new_name in (".", ".."):
|
||
return jsonify({"error": "Nom invalide"}), 400
|
||
|
||
new_name = secure_filename(new_name)
|
||
if not new_name:
|
||
return jsonify({"error": "Nom invalide après nettoyage"}), 400
|
||
|
||
target = _safe_path(subpath)
|
||
if not target.is_file():
|
||
return jsonify({"error": "Fichier introuvable"}), 404
|
||
|
||
top = Path(subpath).parts[0]
|
||
if top in _HIDDEN or top.startswith("."):
|
||
return jsonify({"error": "Accès refusé"}), 403
|
||
|
||
dest = target.parent / new_name
|
||
if not dest.is_relative_to(ASSETS_ROOT):
|
||
return jsonify({"error": "Destination invalide"}), 400
|
||
|
||
if dest.exists():
|
||
return jsonify({"error": f"« {new_name} » existe déjà"}), 409
|
||
|
||
target.rename(dest)
|
||
new_path = str(dest.relative_to(ASSETS_ROOT))
|
||
return jsonify({
|
||
"name": new_name,
|
||
"path": new_path,
|
||
"browse_url": url_for("browse", subpath=new_path),
|
||
})
|
||
|
||
|
||
# ── Vérification des conflits avant upload ───────────────────────────
|
||
|
||
@app.route("/check-upload", methods=["POST"])
|
||
def check_upload():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
|
||
subpath = request.form.get("path", "").strip()
|
||
names = request.form.getlist("names")
|
||
|
||
if subpath:
|
||
parts = Path(subpath).parts
|
||
if parts and (parts[0] in _HIDDEN or parts[0].startswith(".")):
|
||
abort(403)
|
||
|
||
dest = _safe_path(subpath) if subpath else ASSETS_ROOT
|
||
if not dest.is_dir():
|
||
abort(400)
|
||
|
||
conflicts = [n for n in names if n and (dest / secure_filename(n)).exists()]
|
||
return jsonify({"conflicts": conflicts})
|
||
|
||
|
||
# ── Upload de fichiers ────────────────────────────────────────────────
|
||
|
||
@app.route("/upload", methods=["POST"])
|
||
def upload_file():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
|
||
subpath = request.form.get("path", "").strip()
|
||
files = request.files.getlist("files")
|
||
|
||
if not files or all(f.filename == "" for f in files):
|
||
abort(400)
|
||
|
||
if subpath:
|
||
parts = Path(subpath).parts
|
||
if parts and (parts[0] in _HIDDEN or parts[0].startswith(".")):
|
||
abort(403)
|
||
|
||
dest = _safe_path(subpath) if subpath else ASSETS_ROOT
|
||
if not dest.is_dir():
|
||
abort(400)
|
||
|
||
conflict = request.form.get("conflict", "overwrite")
|
||
if conflict not in ("backup", "overwrite", "rename", "skip"):
|
||
conflict = "overwrite"
|
||
|
||
for f in files:
|
||
name = secure_filename(f.filename or "")
|
||
if not name:
|
||
continue
|
||
out_path = dest / name
|
||
if out_path.exists():
|
||
if conflict == "skip":
|
||
continue
|
||
elif conflict == "backup":
|
||
try:
|
||
out_path.rename(_backup_path(out_path))
|
||
except Exception:
|
||
continue
|
||
elif conflict == "rename":
|
||
out_path = _auto_rename(out_path)
|
||
f.save(out_path)
|
||
|
||
return redirect(url_for("browse", subpath=subpath) if subpath else url_for("browse"))
|
||
|
||
|
||
# ── Vérification des conflits avant redimensionnement ────────────────
|
||
|
||
@app.route("/check-resize", methods=["POST"])
|
||
def check_resize():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
|
||
subpath = request.form.get("path", "").strip()
|
||
raw_sizes = request.form.getlist("sizes")
|
||
raw_formats = request.form.getlist("formats")
|
||
|
||
if not subpath:
|
||
return jsonify({"conflicts": []})
|
||
|
||
target = _safe_path(subpath)
|
||
if not target.is_file():
|
||
return jsonify({"conflicts": []})
|
||
|
||
try:
|
||
sizes = [int(s) for s in raw_sizes if int(s) in RESIZE_SIZES]
|
||
except (ValueError, TypeError):
|
||
sizes = []
|
||
|
||
formats = [f for f in raw_formats if f in RESIZE_FORMATS]
|
||
|
||
src_ext = target.suffix.lstrip(".")
|
||
stem = re.sub(r"_\d+x\d+$", "", target.stem)
|
||
parent = target.parent
|
||
|
||
all_dims = [(s, s) for s in sizes]
|
||
for cs in request.form.getlist("custom_sizes"):
|
||
try:
|
||
w, h = (int(x.strip()) for x in re.split(r"[x×]", cs.lower()))
|
||
if w >= 1 and h >= 1:
|
||
all_dims.append((w, h))
|
||
except (ValueError, TypeError):
|
||
pass
|
||
|
||
# Pas de dimensions sélectionnées → conserver celles d'origine
|
||
if not all_dims:
|
||
try:
|
||
src = Image.open(target)
|
||
all_dims = [(src.width, src.height)]
|
||
src.close()
|
||
except Exception:
|
||
return jsonify({"conflicts": []})
|
||
|
||
# Pas de format sélectionné → conserver le format d'origine
|
||
if not formats:
|
||
formats = [src_ext]
|
||
|
||
conflicts = []
|
||
for fmt in formats:
|
||
for w, h in all_dims:
|
||
out_name = f"{stem}_{w}x{h}.{fmt}"
|
||
if (parent / out_name).exists():
|
||
conflicts.append(out_name)
|
||
|
||
return jsonify({"conflicts": conflicts})
|
||
|
||
|
||
# ── Redimensionnement d'images ────────────────────────────────────────
|
||
|
||
@app.route("/resize", methods=["POST"])
|
||
def resize_image():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
|
||
subpath = request.form.get("path", "").strip()
|
||
raw_sizes = request.form.getlist("sizes")
|
||
raw_formats = request.form.getlist("formats")
|
||
raw_customs = request.form.getlist("custom_sizes")
|
||
|
||
if not subpath or not raw_formats:
|
||
return jsonify({"error": "Paramètres manquants"}), 400
|
||
|
||
target = _safe_path(subpath)
|
||
if not target.is_file():
|
||
abort(404)
|
||
|
||
ext = target.suffix.lower()
|
||
if ext not in IMAGE_EXT:
|
||
abort(400)
|
||
|
||
top = Path(subpath).parts[0]
|
||
if top in _HIDDEN or top.startswith("."):
|
||
abort(403)
|
||
|
||
try:
|
||
square_dims = [(s, s) for s in sorted({int(s) for s in raw_sizes if int(s) in RESIZE_SIZES})]
|
||
except (ValueError, TypeError):
|
||
square_dims = []
|
||
|
||
formats = [f for f in raw_formats if f in RESIZE_FORMATS]
|
||
|
||
# Dimensions libres — validées contre la résolution source
|
||
src_img = Image.open(target)
|
||
max_w, max_h = src_img.width, src_img.height
|
||
src_img.close()
|
||
|
||
custom_dims = []
|
||
for cs in raw_customs:
|
||
try:
|
||
w, h = (int(x.strip()) for x in re.split(r"[x×]", cs.lower()))
|
||
if w < 1 or h < 1:
|
||
continue
|
||
if w > max_w or h > max_h:
|
||
continue
|
||
custom_dims.append((w, h))
|
||
except (ValueError, TypeError):
|
||
pass
|
||
|
||
all_dims = square_dims + custom_dims
|
||
|
||
# Pas de dimensions sélectionnées → conserver celles d'origine
|
||
if not all_dims:
|
||
all_dims = [(max_w, max_h)]
|
||
|
||
# Pas de format sélectionné → conserver le format d'origine
|
||
if not formats:
|
||
formats = [ext.lstrip(".")]
|
||
|
||
conflict = request.form.get("conflict", "skip")
|
||
if conflict not in ("backup", "overwrite", "rename", "skip"):
|
||
conflict = "skip"
|
||
|
||
is_svg = ext == ".svg"
|
||
stem = re.sub(r"_\d+x\d+$", "", target.stem)
|
||
parent = target.parent
|
||
created, errors = [], []
|
||
|
||
# Signaler les dimensions hors-bornes
|
||
for cs in raw_customs:
|
||
try:
|
||
w, h = (int(x.strip()) for x in re.split(r"[x×]", cs.lower()))
|
||
if w > max_w or h > max_h:
|
||
for f in formats:
|
||
errors.append({"name": f"{stem}_{w}x{h}.{f}",
|
||
"reason": f"{w}×{h} dépasse la résolution source ({max_w}×{max_h})"})
|
||
except (ValueError, TypeError):
|
||
pass
|
||
|
||
for fmt in formats:
|
||
for w, h in all_dims:
|
||
out_name = f"{stem}_{w}x{h}.{fmt}"
|
||
out_path = parent / out_name
|
||
out_rel = str(out_path.relative_to(ASSETS_ROOT))
|
||
|
||
if out_path.exists():
|
||
if conflict == "skip":
|
||
errors.append({"name": out_name, "reason": "Fichier existant, ignoré"})
|
||
continue
|
||
elif conflict == "backup":
|
||
try:
|
||
bak = _backup_path(out_path)
|
||
out_path.rename(bak)
|
||
except Exception as exc:
|
||
errors.append({"name": out_name, "reason": f"Backup impossible : {exc}"})
|
||
continue
|
||
elif conflict == "rename":
|
||
out_path = _auto_rename(out_path)
|
||
out_name = out_path.name
|
||
out_rel = str(out_path.relative_to(ASSETS_ROOT))
|
||
|
||
try:
|
||
if fmt == "svg" and is_svg:
|
||
shutil.copy2(target, out_path)
|
||
created.append({"name": out_name, "path": out_rel})
|
||
|
||
elif fmt == "svg" and not is_svg:
|
||
errors.append({"name": out_name, "reason": "Conversion raster→SVG non supportée"})
|
||
|
||
elif is_svg:
|
||
if not HAS_CAIROSVG:
|
||
errors.append({"name": out_name, "reason": "cairosvg non disponible sur ce serveur"})
|
||
continue
|
||
if fmt in ("png", "ico"):
|
||
buf = io.BytesIO()
|
||
_cairosvg.svg2png(url=str(target), write_to=buf,
|
||
output_width=w, output_height=h)
|
||
buf.seek(0)
|
||
img = Image.open(buf).convert("RGBA")
|
||
if fmt == "ico":
|
||
img.save(out_path, format="ICO", sizes=[(w, h)])
|
||
else:
|
||
img.save(out_path, format="PNG")
|
||
else:
|
||
buf = io.BytesIO()
|
||
_cairosvg.svg2png(url=str(target), write_to=buf,
|
||
output_width=w, output_height=h)
|
||
buf.seek(0)
|
||
img = Image.open(buf).convert("RGB")
|
||
img.save(out_path, format="JPEG", quality=90)
|
||
created.append({"name": out_name, "path": out_rel})
|
||
|
||
else:
|
||
img = Image.open(target)
|
||
# LANCZOS ne supporte pas le mode palette (ICO, GIF…)
|
||
if img.mode not in ("RGB", "RGBA", "L", "LA", "I", "F"):
|
||
img = img.convert("RGBA")
|
||
img = img.resize((w, h), Image.LANCZOS)
|
||
if fmt == "png":
|
||
if img.mode not in ("RGBA", "RGB", "L"):
|
||
img = img.convert("RGBA")
|
||
img.save(out_path, format="PNG")
|
||
elif fmt == "jpg":
|
||
img = img.convert("RGB")
|
||
img.save(out_path, format="JPEG", quality=90)
|
||
elif fmt == "ico":
|
||
img = img.convert("RGBA")
|
||
img.save(out_path, format="ICO", sizes=[(w, h)])
|
||
created.append({"name": out_name, "path": out_rel})
|
||
|
||
except Exception as exc:
|
||
errors.append({"name": out_name, "reason": str(exc)})
|
||
|
||
return jsonify({"created": created, "errors": errors})
|
||
|
||
|
||
# ── Fichiers CDN publics (dev local uniquement) ───────────────────────
|
||
|
||
_PUBLIC_TOP = frozenset({"logo", "wiki", "error"})
|
||
|
||
|
||
@app.route("/favicon.ico")
|
||
def favicon():
|
||
return send_from_directory(ASSETS_ROOT, "favicon.ico")
|
||
|
||
|
||
@app.route("/robots.txt")
|
||
def robots():
|
||
return send_from_directory(ASSETS_ROOT, "robots.txt")
|
||
|
||
|
||
@app.route("/<path:subpath>")
|
||
def cdn_file(subpath):
|
||
top = Path(subpath).parts[0]
|
||
if top not in _PUBLIC_TOP:
|
||
abort(404)
|
||
target = _safe_path(subpath)
|
||
if not target.is_file():
|
||
abort(404)
|
||
return send_from_directory(ASSETS_ROOT, subpath)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
app.run(debug=True, port=5003)
|