"""Flask admin application for browsing and managing files under ASSETS_ROOT.

This section holds the imports, the Flask/OAuth wiring and every
environment-driven setting read at import time.
"""
import io
import json
import os
import re
import shutil
import subprocess
import threading
from pathlib import Path
from datetime import datetime, timedelta
from urllib.parse import urlencode

from PIL import Image
from flask import (Flask, redirect, url_for, session, request,
                   render_template, abort, send_from_directory, jsonify)
from authlib.integrations.flask_client import OAuth
from werkzeug.middleware.proxy_fix import ProxyFix
from werkzeug.utils import secure_filename

app = Flask(__name__)
# SECRET_KEY is mandatory: a missing variable raises KeyError at import time.
app.secret_key = os.environ["SECRET_KEY"]
app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1)

oauth = OAuth(app)
oauth.register(
    name="alpid",
    server_metadata_url=os.environ["ALPID_DISCOVERY_URL"],
    client_id=os.environ["ALPID_CLIENT_ID"],
    client_secret=os.environ["ALPID_CLIENT_SECRET"],
    client_kwargs={"scope": "openid profile email"},
)

# Comma-separated lists. Entries are stripped and empty items dropped so that
# "admins, staff" matches the group "staff" (same parsing as ADMIN_EMAILS).
ADMIN_GROUPS = {
    g.strip()
    for g in os.environ.get("ADMIN_GROUPS", "admins").split(",")
    if g.strip()
}
ADMIN_EMAILS = {
    e.strip()
    for e in os.environ.get("ADMIN_EMAILS", "").split(",")
    if e.strip()
}

ASSETS_ROOT = Path(os.environ.get("ASSETS_ROOT", ".")).resolve()
TRASH_ROOT = ASSETS_ROOT / ".trash"

# The logout endpoint is derived from the discovery URL by dropping the
# "/.well-known/..." suffix.
_alpid_base = os.environ["ALPID_DISCOVERY_URL"].split("/.well-known/")[0]
ALPID_LOGOUT_URL = _alpid_base + "/protocol/openid-connect/logout"

STATS_FILE = Path(os.environ.get("STATS_FILE", "/opt/static-cdn/goaccess.html"))
STATS_JSON = Path(os.environ.get("STATS_JSON", "/opt/static-cdn/goaccess.json"))
STATS_LOG_FILE = os.environ.get("STATS_LOG_FILE", "")
STATS_GENERATE_CMD = os.environ.get("STATS_GENERATE_CMD", "")

# Guards the "generating" flag shared between request handlers and the
# background report-generation thread.
_gen_lock = threading.Lock()
_gen_state = {"generating": False}

try:
    _APP_VERSION = (Path(__file__).parent / "VERSION").read_text().strip()
except (OSError, ValueError):
    # Missing or undecodable VERSION file: fall back to a placeholder.
    _APP_VERSION = "—"

_CHANGELOG_FILE = Path(__file__).parent / "CHANGELOG.md"

# Names that are never listed, searched or counted by the UI.
_HIDDEN = frozenset({
    ".git", "scripts", "app", ".env", ".env.example",
    ".gitignore", ".htaccess", ".htpasswd_stats", "standard_index.html",
})

# cairosvg is optional; it is only needed to rasterise SVG sources.
try:
    import cairosvg as _cairosvg
    HAS_CAIROSVG = True
except ImportError:
    HAS_CAIROSVG = False
IMAGE_EXT = frozenset({".png", ".jpg", ".jpeg", ".gif", ".ico", ".svg", ".webp"})
RESIZE_SIZES = frozenset({32, 64, 100, 128, 200, 300, 500, 600, 1024})
RESIZE_FORMATS = frozenset({"png", "jpg", "ico", "svg"})
TEXT_EXT = frozenset({".txt", ".html", ".htm", ".md", ".css", ".js", ".json",
                      ".xml", ".conf", ".sh", ".robots", ".php"})
PDF_EXT = frozenset({".pdf"})

_TRASHINFO_SUFFIX = ".trashinfo"
_DIM_SPLIT_RE = re.compile(r"[x×]")
_DIM_SUFFIX_RE = re.compile(r"_\d+x\d+$")


# ── Helpers ───────────────────────────────────────────────────────────

def _user():
    """Return the user dict stored in the session, or None."""
    return session.get("user")


def _require_admin():
    """Gate a view on an authenticated admin.

    Returns a redirect response to the login route when nobody is logged in
    (remembering the requested URL in the session), aborts with 403 for a
    non-admin user, and returns None when access is granted.
    """
    u = _user()
    if not u:
        session["next_url"] = request.url
        return redirect(url_for("login"))
    if not u.get("is_admin"):
        abort(403)
    return None


def _safe_path(subpath: str) -> Path:
    """Resolve *subpath* under ASSETS_ROOT; abort 400 if it escapes the root."""
    target = (ASSETS_ROOT / subpath).resolve()
    if not target.is_relative_to(ASSETS_ROOT):
        abort(400)
    return target


def _is_hidden(target: Path) -> bool:
    """Tell whether a resolved path under ASSETS_ROOT is hidden.

    Every component of the path relative to ASSETS_ROOT is tested (name in
    _HIDDEN or dot-prefixed), the same rule the listings and the search use.
    Working on the resolved path rather than the raw request string means a
    value such as ``logo/../.env`` cannot slip past the check.
    """
    parts = target.relative_to(ASSETS_ROOT).parts
    return any(p in _HIDDEN or p.startswith(".") for p in parts)


def _trash_target(trash_rel: str):
    """Resolve *trash_rel* under TRASH_ROOT; return None if it escapes it.

    Uses a component-wise containment test: a plain string-prefix test would
    also accept sibling directories whose name merely starts with ".trash".
    """
    target = (TRASH_ROOT / trash_rel).resolve()
    if not target.is_relative_to(TRASH_ROOT):
        return None
    return target


def _browse_redirect(rel_dir: str):
    """Redirect to the browse view of *rel_dir* ("" or "." means the root)."""
    if rel_dir and rel_dir != ".":
        return redirect(url_for("browse", subpath=rel_dir))
    return redirect(url_for("browse"))


def _humansize(n: int) -> str:
    """Format a byte count with French unit labels (o, Ko, Mo, Go, To)."""
    for unit in ("o", "Ko", "Mo", "Go"):
        if n < 1024:
            return f"{n:.0f} {unit}"
        n /= 1024
    return f"{n:.1f} To"


_EXIF_WANTED = frozenset({
    "Make", "Model", "Software", "DateTime", "DateTimeOriginal",
    "DateTimeDigitized", "ExposureTime", "FNumber", "ISOSpeedRatings",
    "FocalLength", "Flash", "WhiteBalance", "ExposureProgram", "MeteringMode",
    "Orientation", "XResolution", "YResolution", "ResolutionUnit",
    "ColorSpace", "PixelXDimension", "PixelYDimension", "Artist", "Copyright",
    "GPSInfo",
})


def _image_meta(path: Path) -> dict:
    """Return basic image properties plus selected EXIF rows for *path*.

    The result holds width, height, format, mode and dpi, and an "exif" dict
    when at least one wanted tag is present. Any failure (unreadable or
    unsupported file) yields an empty dict.
    """
    try:
        from PIL import ExifTags

        def _collect(rows, ifd):
            # Keep only whitelisted tags; GPS is rendered separately.
            for tag_id, val in ifd.items():
                tag = ExifTags.TAGS.get(tag_id, str(tag_id))
                if tag in _EXIF_WANTED and tag != "GPSInfo":
                    rows[tag] = str(val)

        # The context manager releases the file handle once we are done.
        with Image.open(path) as img:
            meta = {
                "width": img.width,
                "height": img.height,
                "format": img.format or path.suffix.lstrip(".").upper(),
                "mode": img.mode,
                "dpi": img.info.get("dpi"),
            }
            exif = img.getexif()
            if exif:
                rows = {}
                _collect(rows, exif)
                _collect(rows, exif.get_ifd(0x8769))  # Exif sub-IFD
                gps_ifd = exif.get_ifd(0x8825)        # GPS IFD
                if gps_ifd:
                    gps = _parse_gps(gps_ifd)
                    if gps is not None:
                        rows["GPS"] = gps
                if rows:
                    meta["exif"] = rows
        return meta
    except Exception:
        # Best effort: metadata is optional decoration for the preview page.
        return {}


def _parse_gps(gps_ifd: dict):
    """Convert a GPS IFD to a "lat, lon" decimal string.

    Returns None when the latitude (tag 2) or longitude (tag 4) is absent or
    malformed, instead of reporting a bogus 0, 0 position.
    """
    def _dms(coords):
        d, m, s = coords
        return float(d) + float(m) / 60 + float(s) / 3600

    try:
        raw_lat = gps_ifd.get(2)
        raw_lon = gps_ifd.get(4)
        if raw_lat is None or raw_lon is None:
            return None
        lat = _dms(raw_lat)
        lon = _dms(raw_lon)
        if gps_ifd.get(1, "N") == "S":
            lat = -lat
        if gps_ifd.get(3, "E") == "W":
            lon = -lon
        return f"{lat:.6f}, {lon:.6f}"
    except Exception:
        return None


def _backup_path(p: Path) -> Path:
    """Return a sibling path "<stem>_bak_<timestamp><suffix>" for *p*."""
    ts = datetime.now().strftime("%Y%m%d%H%M%S")
    return p.parent / f"{p.stem}_bak_{ts}{p.suffix}"


def _auto_rename(p: Path) -> Path:
    """Return the first non-existing sibling "<stem>_<i><suffix>", i >= 1."""
    i = 1
    while True:
        candidate = p.parent / f"{p.stem}_{i}{p.suffix}"
        if not candidate.exists():
            return candidate
        i += 1


def _trash_move(target: Path, subpath: str) -> None:
    """Move *target* into the trash, mirroring *subpath* under TRASH_ROOT.

    A ".trashinfo" JSON side-car records the original relative path and the
    deletion time. If the trash slot is already taken, a timestamp is
    appended to the stored file name.
    """
    rel = Path(subpath)
    trash_dest = TRASH_ROOT / rel
    trash_dest.parent.mkdir(parents=True, exist_ok=True)
    if trash_dest.exists():
        ts = datetime.now().strftime("%Y%m%d%H%M%S")
        trash_dest = trash_dest.parent / f"{trash_dest.stem}_{ts}{trash_dest.suffix}"
    shutil.move(str(target), trash_dest)
    Path(str(trash_dest) + _TRASHINFO_SUFFIX).write_text(json.dumps({
        "original_path": str(rel),
        "deleted_at": datetime.now().isoformat(timespec="seconds"),
    }))


def _trash_restore(trash_rel: str, conflict: str = "") -> tuple:
    """Restore a trashed file to its recorded original location.

    Returns ``(True, restored_relative_path)`` on success, otherwise
    ``(False, reason)`` with reason in "forbidden", "not_found", "conflict".
    When the destination exists: an empty *conflict* reports "conflict",
    "rename" restores under an auto-numbered name, and any other value moves
    the file onto the existing destination.
    """
    target = _trash_target(trash_rel)
    if target is None:
        return False, "forbidden"
    if not target.is_file():
        return False, "not_found"

    info_path = Path(str(target) + _TRASHINFO_SUFFIX)
    try:
        original = json.loads(info_path.read_text())["original_path"]
    except Exception:
        # No usable side-car: fall back to the path inside the trash.
        original = trash_rel

    dest = (ASSETS_ROOT / original).resolve()
    if not dest.is_relative_to(ASSETS_ROOT):
        return False, "forbidden"

    if dest.exists():
        if not conflict:
            return False, "conflict"
        if conflict == "rename":
            dest = _auto_rename(dest)

    dest.parent.mkdir(parents=True, exist_ok=True)
    shutil.move(str(target), str(dest))
    if info_path.exists():
        info_path.unlink()
    return True, str(dest.relative_to(ASSETS_ROOT))


def _trash_purge(days: int = 30) -> int:
    """Delete trashed files older than *days*; return how many were purged.

    Empty directories left behind under TRASH_ROOT are removed afterwards.
    """
    if not TRASH_ROOT.exists():
        return 0
    cutoff = datetime.now() - timedelta(days=days)
    count = 0
    # Materialise the listing first: entries are deleted while we walk it.
    for info_file in list(TRASH_ROOT.rglob("*" + _TRASHINFO_SUFFIX)):
        try:
            deleted_at = datetime.fromisoformat(
                json.loads(info_file.read_text())["deleted_at"]
            )
            if deleted_at < cutoff:
                f = Path(str(info_file).removesuffix(_TRASHINFO_SUFFIX))
                if f.exists():
                    f.unlink()
                info_file.unlink()
                count += 1
        except (OSError, ValueError, KeyError, TypeError):
            # Unreadable or malformed side-car: leave the entry in place.
            pass
    # Reverse-sorted order visits children before their parents.
    for d in sorted(TRASH_ROOT.rglob("*"), reverse=True):
        if d.is_dir() and d != TRASH_ROOT:
            try:
                d.rmdir()
            except OSError:
                pass  # directory not empty
    return count


def _trash_list() -> list:
    """Return trash entries (dicts) sorted by deletion time, newest first.

    Side-car files whose payload file is gone are removed on the way.
    """
    if not TRASH_ROOT.exists():
        return []
    entries = []
    for info_file in TRASH_ROOT.rglob("*" + _TRASHINFO_SUFFIX):
        try:
            data = json.loads(info_file.read_text())
            fpath = Path(str(info_file).removesuffix(_TRASHINFO_SUFFIX))
            if not fpath.exists():
                info_file.unlink()
                continue
            rel = fpath.relative_to(TRASH_ROOT)
            ext = fpath.suffix.lower()
            entries.append({
                "name": fpath.name,
                "path": str(rel),
                "original_path": data.get("original_path", str(rel)),
                "deleted_at": datetime.fromisoformat(data["deleted_at"]),
                "size": fpath.stat().st_size,
                "is_dir": False,
                "is_image": ext in IMAGE_EXT,
                "is_text": ext in TEXT_EXT,
                "is_pdf": ext in PDF_EXT,
                "ext": ext,
            })
        except Exception:
            # Best effort: a broken entry must not hide the rest of the trash.
            pass
    entries.sort(key=lambda e: e["deleted_at"], reverse=True)
    return entries


def _trash_count() -> int:
    """Count payload files in the trash (side-car files excluded)."""
    if not TRASH_ROOT.exists():
        return 0
    return sum(
        1 for f in TRASH_ROOT.rglob("*")
        if f.is_file() and not f.name.endswith(_TRASHINFO_SUFFIX)
    )


def _trash_stats() -> dict:
    """Return file count, total size and oldest deletion time of the trash."""
    if not TRASH_ROOT.exists():
        return {"files": 0, "size": 0, "oldest": None}
    files, size, oldest = 0, 0, None
    for f in TRASH_ROOT.rglob("*"):
        if not f.is_file() or f.name.endswith(_TRASHINFO_SUFFIX):
            continue
        files += 1
        size += f.stat().st_size
        info = Path(str(f) + _TRASHINFO_SUFFIX)
        try:
            dt = datetime.fromisoformat(json.loads(info.read_text())["deleted_at"])
        except Exception:
            continue  # no usable deletion date for this file
        if oldest is None or dt < oldest:
            oldest = dt
    return {"files": files, "size": size, "oldest": oldest}


def _folder_stats(path: Path) -> dict:
    """Count visible files under *path* (recursively) and sum their sizes."""
    files, size = 0, 0
    for f in path.rglob("*"):
        if not f.is_file():
            continue
        if any(p in _HIDDEN or p.startswith(".") for p in f.relative_to(path).parts):
            continue
        files += 1
        size += f.stat().st_size
    return {"files": files, "size": size}


def _parse_date(s: str):
    """Parse "YYYY-MM-DD" into a datetime; return None if empty or invalid."""
    if not s:
        return None
    try:
        return datetime.strptime(s, "%Y-%m-%d")
    except ValueError:
        return None


def _load_hits() -> dict:
    """Parse GoAccess JSON report → {relative_path: hit_count}."""
    if not STATS_JSON.is_file():
        return {}
    try:
        data = json.loads(STATS_JSON.read_text())
        hits = {}
        for item in data.get("requests", {}).get("data", []):
            url = item.get("data", "").lstrip("/")
            count = item.get("hits", {}).get("count", 0)
            if url and count:
                hits[url] = count
        return hits
    except Exception:
        # The report is produced by an external tool; treat any surprise in
        # its content as "no statistics available".
        return {}


def _entry(item: Path) -> dict:
    """Build the template-facing description of a file or directory."""
    stat = item.stat()
    ext = item.suffix.lower()
    return {
        "name": item.name,
        "path": str(item.relative_to(ASSETS_ROOT)),
        "is_dir": item.is_dir(),
        "size": stat.st_size if item.is_file() else None,
        "mtime": datetime.fromtimestamp(stat.st_mtime),
        "ext": ext,
        "is_image": ext in IMAGE_EXT,
        "is_text": ext in TEXT_EXT,
        "is_pdf": ext in PDF_EXT,
    }


def _parse_dims(raw: str):
    """Parse a "WxH" (or "W×H") string into ``(w, h)``; None if malformed."""
    try:
        w, h = (int(x.strip()) for x in _DIM_SPLIT_RE.split(raw.lower()))
    except (ValueError, TypeError):
        return None
    return w, h


def _parse_square_sizes(raw_sizes) -> list:
    """Return the entries of *raw_sizes* that are allowed square sizes.

    Each entry is parsed on its own, so one malformed value does not discard
    the valid ones.
    """
    sizes = []
    for raw in raw_sizes:
        try:
            value = int(raw)
        except (ValueError, TypeError):
            continue
        if value in RESIZE_SIZES:
            sizes.append(value)
    return sizes


# ── Context processor ────────────────────────────────────────────────

@app.context_processor
def _inject_globals():
    """Expose user, trash count, size formatter and version to templates."""
    u = _user()
    return {
        "user": u,
        "trash_count": _trash_count() if u else 0,
        "humansize": _humansize,
        "app_version": _APP_VERSION,
    }


# ── Auth ──────────────────────────────────────────────────────────────

@app.route("/auth/login")
def login():
    """Start the OAuth authorization flow."""
    return oauth.alpid.authorize_redirect(url_for("callback", _external=True))


@app.route("/auth/callback")
def callback():
    """Finish the OAuth flow and store the user in the session."""
    token = oauth.alpid.authorize_access_token()
    info = token.get("userinfo") or oauth.alpid.userinfo(token=token)
    email = info.get("email", "")
    groups = set(info.get("groups", []))
    if groups:
        is_admin = bool(groups & ADMIN_GROUPS)
    elif ADMIN_EMAILS:
        is_admin = email in ADMIN_EMAILS
    else:
        # NOTE(review): with no groups claim and no ADMIN_EMAILS configured,
        # every authenticated user becomes admin — confirm this is intended.
        is_admin = True
    session["user"] = {
        "sub": info["sub"],
        "name": info.get("name") or info.get("preferred_username", ""),
        "email": email,
        "is_admin": is_admin,
    }
    session["id_token"] = token.get("id_token", "")
    return redirect(session.pop("next_url", url_for("dashboard")))


@app.route("/auth/logout")
def logout():
    """Clear the session and redirect to the identity provider's logout."""
    id_token = session.get("id_token")
    session.clear()
    params = {"post_logout_redirect_uri": url_for("dashboard", _external=True)}
    if id_token:
        params["id_token_hint"] = id_token
    return redirect(ALPID_LOGOUT_URL + "?" + urlencode(params))


# ── Dashboard ─────────────────────────────────────────────────────────

@app.route("/")
def dashboard():
    """Show per-folder statistics for the visible top-level directories."""
    redir = _require_admin()
    if redir:
        return redir
    folders = {}
    for item in sorted(ASSETS_ROOT.iterdir()):
        if item.name in _HIDDEN or item.name.startswith("."):
            continue
        if item.is_dir():
            folders[item.name] = _folder_stats(item)
    return render_template(
        "dashboard.html",
        folders=folders,
        total_files=sum(v["files"] for v in folders.values()),
        total_size=sum(v["size"] for v in folders.values()),
        trash=_trash_stats(),
    )


@app.route("/changelog")
def changelog():
    """Render the parsed changelog."""
    redir = _require_admin()
    if redir:
        return redir
    sections = _parse_changelog()
    return render_template("changelog.html", sections=sections)


def _parse_changelog():
    """Parse CHANGELOG.md into a list of version dicts.

    "## " lines open a version ({"version", "date", "groups"}), "### " lines
    open a group ({"title", "entries"}) and "- " lines add an entry to the
    current group. Returns [] when the file cannot be read.
    """
    try:
        # The heading pattern contains an em dash, so decode explicitly.
        text = _CHANGELOG_FILE.read_text(encoding="utf-8")
    except (OSError, ValueError):
        return []

    sections = []
    current = None
    current_group = None

    def _close_current():
        # Flush the open group, then the open version.
        if current:
            if current_group:
                current["groups"].append(current_group)
            sections.append(current)

    for line in text.splitlines():
        if line.startswith("## "):
            _close_current()
            m = re.match(r"## \[(.+?)\] — (.+)", line)
            current = {
                "version": m.group(1) if m else line[3:],
                "date": m.group(2) if m else "",
                "groups": [],
            }
            current_group = None
        elif line.startswith("### ") and current is not None:
            if current_group:
                current["groups"].append(current_group)
            current_group = {"title": line[4:], "entries": []}
        elif line.startswith("- ") and current_group is not None:
            current_group["entries"].append(line[2:])
    _close_current()
    return sections


# ── File browser ──────────────────────────────────────────────────────

@app.route("/browse/")
@app.route("/browse/<path:subpath>")
def browse(subpath=""):
    """List a directory, or preview the file, designated by *subpath*."""
    redir = _require_admin()
    if redir:
        return redir
    target = _safe_path(subpath)
    if not target.exists():
        abort(404)
    if _is_hidden(target):
        abort(403)
    if target.is_file():
        return _file_preview(target, subpath)

    hits = _load_hits()
    entries = []
    for item in sorted(target.iterdir()):
        if item.name in _HIDDEN or item.name.startswith("."):
            continue
        e = _entry(item)
        e["hits"] = hits.get(e["path"], 0) if not e["is_dir"] else None
        entries.append(e)
    # Directories first, then case-insensitive name order.
    entries.sort(key=lambda e: (0 if e["is_dir"] else 1, e["name"].lower()))

    parts = Path(subpath).parts if subpath else ()
    breadcrumb = [
        {"name": p, "path": str(Path(*parts[:i + 1]))}
        for i, p in enumerate(parts)
    ]
    return render_template(
        "browse.html",
        entries=entries,
        subpath=subpath,
        breadcrumb=breadcrumb,
        has_hits=STATS_JSON.is_file(),
    )


def _file_preview(path: Path, subpath: str):
    """Render the preview page for a file, with previous/next siblings."""
    ext = path.suffix.lower()
    stat = path.stat()
    parent_rel = Path(subpath).parent
    parent = str(parent_rel) if parent_rel != Path(".") else ""

    siblings = sorted(
        [f for f in path.parent.iterdir()
         if f.is_file() and f.name not in _HIDDEN and not f.name.startswith(".")],
        key=lambda f: f.name.lower(),
    )
    idx = next((i for i, f in enumerate(siblings) if f == path), None)
    # idx == 0 and idx is None both mean "no previous file".
    prev_path = str(siblings[idx - 1].relative_to(ASSETS_ROOT)) if idx else None
    next_path = (
        str(siblings[idx + 1].relative_to(ASSETS_ROOT))
        if idx is not None and idx < len(siblings) - 1 else None
    )

    ctx = dict(
        subpath=subpath,
        filename=path.name,
        filesize=_humansize(stat.st_size),
        mtime=datetime.fromtimestamp(stat.st_mtime),
        raw_url=url_for("raw_file", subpath=subpath),
        parent_path=parent,
        prev_path=prev_path,
        next_path=next_path,
        sibling_pos=f"{idx + 1}/{len(siblings)}" if idx is not None else "",
        ext=ext,
    )
    if ext in IMAGE_EXT:
        ctx["meta"] = _image_meta(path)
        return render_template("preview_image.html", **ctx)
    if ext in TEXT_EXT:
        content = path.read_text(errors="replace")
        return render_template("preview_text.html", content=content,
                               lang=ext.lstrip("."), **ctx)
    return render_template("preview_other.html", **ctx)


# ── Search ────────────────────────────────────────────────────────────

@app.route("/search")
def search():
    """Search visible files by name, date range and optionally content."""
    redir = _require_admin()
    if redir:
        return redir
    q = request.args.get("q", "").strip()
    after_s = request.args.get("after", "").strip()
    before_s = request.args.get("before", "").strip()
    in_content = request.args.get("content") == "1"
    dt_after = _parse_date(after_s)
    dt_before = _parse_date(before_s)

    results = []
    searched = bool(q or dt_after or dt_before)
    if searched:
        q_low = q.lower()
        for path in sorted(ASSETS_ROOT.rglob("*")):
            if not path.is_file():
                continue
            parts = path.relative_to(ASSETS_ROOT).parts
            if any(p in _HIDDEN or p.startswith(".") for p in parts):
                continue
            mtime = datetime.fromtimestamp(path.stat().st_mtime)
            if dt_after and mtime.date() < dt_after.date():
                continue
            if dt_before and mtime.date() > dt_before.date():
                continue

            name_match = (not q) or q_low in path.name.lower()
            content_match = None
            # Content is only scanned when the name did not already match.
            if in_content and q and not name_match and path.suffix.lower() in TEXT_EXT:
                try:
                    text = path.read_text(errors="replace")
                    idx = text.lower().find(q_low)
                    if idx != -1:
                        start = max(0, idx - 60)
                        end = min(len(text), idx + len(q) + 60)
                        snippet = text[start:end].replace("\n", " ")
                        content_match = (
                            ("…" if start else "") + snippet
                            + ("…" if end < len(text) else "")
                        )
                        name_match = True
                except Exception:
                    pass  # unreadable file: simply not a match
            if not name_match:
                continue
            e = _entry(path)
            e["content_match"] = content_match
            results.append(e)

    return render_template(
        "search.html",
        q=q,
        after=after_s,
        before=before_s,
        in_content=in_content,
        results=results,
        searched=searched,
    )


# ── GoAccess statistics ───────────────────────────────────────────────

@app.route("/stats/")
def stats():
    """Render the statistics page."""
    redir = _require_admin()
    if redir:
        return redir
    return render_template("stats.html", has_report=STATS_FILE.is_file())


@app.route("/stats/report")
def stats_report():
    """Serve the generated HTML report."""
    redir = _require_admin()
    if redir:
        return redir
    if not STATS_FILE.is_file():
        abort(404)
    return send_from_directory(STATS_FILE.parent, STATS_FILE.name)


def _do_generate():
    """Run the report generation (background thread) and clear the flag."""
    try:
        if STATS_GENERATE_CMD:
            # NOTE(security): the command comes from the environment and is
            # run through a shell; it must only be set by the operator.
            subprocess.run(STATS_GENERATE_CMD, shell=True, timeout=600)
        else:
            # STATS_JSON is always a Path (it has a default), so both outputs
            # are always requested.
            cmd = [
                "goaccess", STATS_LOG_FILE, "--log-format=COMBINED",
                f"--output={STATS_FILE}", f"--output={STATS_JSON}",
            ]
            subprocess.run(cmd, timeout=600, check=False)
    finally:
        with _gen_lock:
            _gen_state["generating"] = False


@app.route("/stats/generate", methods=["POST"])
def stats_generate():
    """Start report generation unless a report exists or one is running."""
    redir = _require_admin()
    if redir:
        return redir
    if STATS_FILE.is_file():
        return jsonify({"status": "exists"})
    with _gen_lock:
        if _gen_state["generating"]:
            return jsonify({"status": "generating"})
        if not STATS_LOG_FILE and not STATS_GENERATE_CMD:
            return jsonify({"status": "error",
                            "message": "STATS_LOG_FILE non configuré"}), 500
        _gen_state["generating"] = True
    threading.Thread(target=_do_generate, daemon=True).start()
    return jsonify({"status": "started"})


@app.route("/stats/status")
def stats_status():
    """Report whether the report exists and whether generation is running."""
    redir = _require_admin()
    if redir:
        return redir
    return jsonify({"ready": STATS_FILE.is_file(),
                    "generating": _gen_state["generating"]})


# ── Raw files (preview / download) ────────────────────────────────────

@app.route("/raw/<path:subpath>")
def raw_file(subpath):
    """Serve a visible file from ASSETS_ROOT as-is."""
    redir = _require_admin()
    if redir:
        return redir
    target = _safe_path(subpath)
    if not target.is_file():
        abort(404)
    if _is_hidden(target):
        abort(403)
    return send_from_directory(ASSETS_ROOT, subpath)


# ── File deletion ─────────────────────────────────────────────────────

@app.route("/delete", methods=["POST"])
def delete_file():
    """Move a file to the trash and go back to its parent directory."""
    redir = _require_admin()
    if redir:
        return redir
    subpath = request.form.get("path", "").strip()
    if not subpath:
        abort(400)
    target = _safe_path(subpath)
    if not target.exists():
        abort(404)
    if target.is_dir():
        abort(400)
    if _is_hidden(target):
        abort(403)
    # Use the normalised relative path so the trash layout and the recorded
    # original path never contain ".." components.
    rel = target.relative_to(ASSETS_ROOT)
    _trash_move(target, str(rel))
    return _browse_redirect(str(rel.parent))


# ── Trash ─────────────────────────────────────────────────────────────

@app.route("/trash")
def trash_list():
    """Purge entries older than 30 days, then list the trash."""
    redir = _require_admin()
    if redir:
        return redir
    _trash_purge(30)
    entries = _trash_list()
    return render_template("trash.html", entries=entries)


@app.route("/trash/raw/<path:subpath>")
def trash_raw(subpath):
    """Serve a trashed file as-is."""
    redir = _require_admin()
    if redir:
        return redir
    target = _trash_target(subpath)
    if target is None:
        abort(400)
    if not target.is_file():
        abort(404)
    return send_from_directory(TRASH_ROOT, subpath)


@app.route("/trash/delete", methods=["POST"])
def trash_delete():
    """Permanently delete one trashed file and its side-car."""
    redir = _require_admin()
    if redir:
        return redir
    trash_rel = request.form.get("path", "").strip()
    if not trash_rel:
        abort(400)
    target = _trash_target(trash_rel)
    if target is None:
        abort(400)
    if target.is_file():
        target.unlink()
    info = Path(str(target) + _TRASHINFO_SUFFIX)
    if info.exists():
        info.unlink()
    return redirect(url_for("trash_list"))


@app.route("/trash/restore", methods=["POST"])
def trash_restore():
    """Restore a trashed file; answer 409 with JSON on a name conflict."""
    redir = _require_admin()
    if redir:
        return redir
    trash_rel = request.form.get("path", "").strip()
    conflict = request.form.get("conflict", "").strip()
    if not trash_rel:
        abort(400)
    ok, result = _trash_restore(trash_rel, conflict)
    if not ok:
        if result == "conflict":
            return jsonify({"conflict": True, "path": trash_rel}), 409
        abort(400)
    return _browse_redirect(str(Path(result).parent))


@app.route("/trash/preview/<path:subpath>")
def trash_preview(subpath):
    """Render the preview page for a trashed file (no sibling navigation)."""
    redir = _require_admin()
    if redir:
        return redir
    target = _trash_target(subpath)
    if target is None:
        abort(400)
    if not target.is_file():
        abort(404)
    stat = target.stat()
    ext = target.suffix.lower()
    ctx = dict(
        subpath=subpath,
        filename=target.name,
        filesize=_humansize(stat.st_size),
        mtime=datetime.fromtimestamp(stat.st_mtime),
        raw_url=url_for("trash_raw", subpath=subpath),
        parent_path=None,
        prev_path=None,
        next_path=None,
        sibling_pos="",
        ext=ext,
        from_trash=True,
    )
    if ext in IMAGE_EXT:
        ctx["meta"] = _image_meta(target)
        return render_template("preview_image.html", **ctx)
    if ext in TEXT_EXT:
        return render_template("preview_text.html",
                               content=target.read_text(errors="replace"),
                               lang=ext.lstrip("."), **ctx)
    return render_template("preview_other.html", **ctx)


@app.route("/trash/empty", methods=["POST"])
def trash_empty():
    """Delete the whole trash directory and recreate it empty."""
    redir = _require_admin()
    if redir:
        return redir
    if TRASH_ROOT.exists():
        shutil.rmtree(TRASH_ROOT)
    TRASH_ROOT.mkdir(exist_ok=True)
    return redirect(url_for("trash_list"))


# ── File renaming ─────────────────────────────────────────────────────

@app.route("/rename", methods=["POST"])
def rename_file():
    """Rename a file within its directory; answers JSON."""
    redir = _require_admin()
    if redir:
        return redir
    subpath = request.form.get("path", "").strip()
    new_name = request.form.get("new_name", "").strip()
    if not subpath or not new_name:
        return jsonify({"error": "Paramètres manquants"}), 400
    if "/" in new_name or "\\" in new_name or new_name in (".", ".."):
        return jsonify({"error": "Nom invalide"}), 400
    new_name = secure_filename(new_name)
    if not new_name:
        return jsonify({"error": "Nom invalide après nettoyage"}), 400
    target = _safe_path(subpath)
    if not target.is_file():
        return jsonify({"error": "Fichier introuvable"}), 404
    if _is_hidden(target):
        return jsonify({"error": "Accès refusé"}), 403
    dest = target.parent / new_name
    if not dest.is_relative_to(ASSETS_ROOT):
        return jsonify({"error": "Destination invalide"}), 400
    if dest.exists():
        return jsonify({"error": f"« {new_name} » existe déjà"}), 409
    target.rename(dest)
    new_path = str(dest.relative_to(ASSETS_ROOT))
    return jsonify({
        "name": new_name,
        "path": new_path,
        "browse_url": url_for("browse", subpath=new_path),
    })


# ── Conflict check before upload ──────────────────────────────────────

@app.route("/check-upload", methods=["POST"])
def check_upload():
    """Report which of the submitted names already exist in the target dir."""
    redir = _require_admin()
    if redir:
        return redir
    subpath = request.form.get("path", "").strip()
    names = request.form.getlist("names")
    dest = _safe_path(subpath) if subpath else ASSETS_ROOT
    if _is_hidden(dest):
        abort(403)
    if not dest.is_dir():
        abort(400)
    conflicts = [n for n in names if n and (dest / secure_filename(n)).exists()]
    return jsonify({"conflicts": conflicts})


# ── File upload ───────────────────────────────────────────────────────

@app.route("/upload", methods=["POST"])
def upload_file():
    """Save uploaded files into a directory, applying the conflict policy.

    Policy (form field "conflict"): "skip", "backup" (rename the existing
    file first), "rename" (auto-number the new file) or "overwrite"
    (default, also used for unknown values).
    """
    redir = _require_admin()
    if redir:
        return redir
    subpath = request.form.get("path", "").strip()
    files = request.files.getlist("files")
    if not files or all(f.filename == "" for f in files):
        abort(400)
    dest = _safe_path(subpath) if subpath else ASSETS_ROOT
    if _is_hidden(dest):
        abort(403)
    if not dest.is_dir():
        abort(400)
    conflict = request.form.get("conflict", "overwrite")
    if conflict not in ("backup", "overwrite", "rename", "skip"):
        conflict = "overwrite"
    for f in files:
        name = secure_filename(f.filename or "")
        if not name:
            continue
        out_path = dest / name
        if out_path.exists():
            if conflict == "skip":
                continue
            elif conflict == "backup":
                try:
                    out_path.rename(_backup_path(out_path))
                except Exception:
                    # Could not set the existing file aside: keep it intact.
                    continue
            elif conflict == "rename":
                out_path = _auto_rename(out_path)
        f.save(out_path)
    return _browse_redirect(subpath)


# ── Conflict check before resizing ────────────────────────────────────

@app.route("/check-resize", methods=["POST"])
def check_resize():
    """Report which resize outputs would collide with existing files."""
    redir = _require_admin()
    if redir:
        return redir
    subpath = request.form.get("path", "").strip()
    raw_sizes = request.form.getlist("sizes")
    raw_formats = request.form.getlist("formats")
    if not subpath:
        return jsonify({"conflicts": []})
    target = _safe_path(subpath)
    if not target.is_file():
        return jsonify({"conflicts": []})

    formats = [f for f in raw_formats if f in RESIZE_FORMATS]
    # Lower-cased, like the extension used when actually resizing.
    src_ext = target.suffix.lower().lstrip(".")
    stem = _DIM_SUFFIX_RE.sub("", target.stem)
    parent = target.parent

    all_dims = [(s, s) for s in _parse_square_sizes(raw_sizes)]
    for cs in request.form.getlist("custom_sizes"):
        dims = _parse_dims(cs)
        if dims and dims[0] >= 1 and dims[1] >= 1:
            all_dims.append(dims)

    # No dimensions selected → keep the source's own dimensions.
    if not all_dims:
        try:
            with Image.open(target) as src:
                all_dims = [(src.width, src.height)]
        except Exception:
            return jsonify({"conflicts": []})

    # No format selected → keep the source's own format.
    if not formats:
        formats = [src_ext]

    conflicts = []
    for fmt in formats:
        for w, h in all_dims:
            out_name = f"{stem}_{w}x{h}.{fmt}"
            if (parent / out_name).exists():
                conflicts.append(out_name)
    return jsonify({"conflicts": conflicts})


# ── Image resizing ────────────────────────────────────────────────────

def _write_variant(target: Path, out_path: Path, fmt: str, w: int, h: int,
                   is_svg: bool):
    """Write one resized/converted copy of *target* to *out_path*.

    Returns None on success, or a user-facing reason string when the
    requested conversion is not possible. Pillow/cairosvg errors propagate
    to the caller.
    """
    if fmt == "svg":
        if not is_svg:
            return "Conversion raster→SVG non supportée"
        # SVG → SVG: vector data has no pixel size, so the file is copied.
        shutil.copy2(target, out_path)
        return None

    if is_svg:
        if not HAS_CAIROSVG:
            return "cairosvg non disponible sur ce serveur"
        buf = io.BytesIO()
        _cairosvg.svg2png(url=str(target), write_to=buf,
                          output_width=w, output_height=h)
        buf.seek(0)
        with Image.open(buf) as rendered:
            if fmt == "ico":
                rendered.convert("RGBA").save(out_path, format="ICO", sizes=[(w, h)])
            elif fmt == "png":
                rendered.convert("RGBA").save(out_path, format="PNG")
            else:
                rendered.convert("RGB").save(out_path, format="JPEG", quality=90)
        return None

    with Image.open(target) as src:
        img = src
        # LANCZOS does not support palette mode (ICO, GIF…).
        if img.mode not in ("RGB", "RGBA", "L", "LA", "I", "F"):
            img = img.convert("RGBA")
        img = img.resize((w, h), Image.LANCZOS)

    if fmt == "png":
        if img.mode not in ("RGBA", "RGB", "L"):
            img = img.convert("RGBA")
        img.save(out_path, format="PNG")
    elif fmt == "jpg":
        img.convert("RGB").save(out_path, format="JPEG", quality=90)
    elif fmt == "ico":
        img.convert("RGBA").save(out_path, format="ICO", sizes=[(w, h)])
    else:
        # Fallback format taken from the source extension (gif, webp, jpeg):
        # Pillow picks the encoder from the output suffix. Without this
        # branch nothing was written yet the file was reported as created.
        if fmt == "jpeg":
            img = img.convert("RGB")
        img.save(out_path)
    return None


@app.route("/resize", methods=["POST"])
def resize_image():
    """Create resized/converted copies of an image next to the source.

    Form fields: "path", "sizes" (allowed square sizes), "custom_sizes"
    ("WxH"), "formats" and "conflict" ("skip" by default, "backup",
    "overwrite", "rename"). Answers JSON ``{"created": [...], "errors":
    [...]}``; parameter problems answer ``{"error": ...}`` with status 400.
    """
    redir = _require_admin()
    if redir:
        return redir
    subpath = request.form.get("path", "").strip()
    raw_sizes = request.form.getlist("sizes")
    raw_formats = request.form.getlist("formats")
    raw_customs = request.form.getlist("custom_sizes")
    if not subpath or not raw_formats:
        return jsonify({"error": "Paramètres manquants"}), 400
    target = _safe_path(subpath)
    if not target.is_file():
        abort(404)
    ext = target.suffix.lower()
    if ext not in IMAGE_EXT:
        abort(400)
    if _is_hidden(target):
        abort(403)

    is_svg = ext == ".svg"
    square_dims = [(s, s) for s in sorted(set(_parse_square_sizes(raw_sizes)))]
    formats = [f for f in raw_formats if f in RESIZE_FORMATS]

    # Source resolution bounds the free-form dimensions. Pillow cannot open
    # SVG files, so a vector source has no bound (max_w/max_h stay None).
    max_w = max_h = None
    try:
        with Image.open(target) as src_img:
            max_w, max_h = src_img.width, src_img.height
    except Exception:
        if not is_svg:
            return jsonify({"error": "Image source illisible"}), 400
    bounded = max_w is not None

    custom_dims, oversized = [], []
    for cs in raw_customs:
        dims = _parse_dims(cs)
        if dims is None:
            continue
        w, h = dims
        if bounded and (w > max_w or h > max_h):
            oversized.append(dims)
            continue
        if w < 1 or h < 1:
            continue
        custom_dims.append(dims)

    all_dims = square_dims + custom_dims
    # No dimensions selected → keep the source's own dimensions.
    if not all_dims:
        if not bounded:
            return jsonify({"error": "Dimensions requises pour un fichier SVG"}), 400
        all_dims = [(max_w, max_h)]
    # No format selected → keep the source's own format.
    if not formats:
        formats = [ext.lstrip(".")]

    conflict = request.form.get("conflict", "skip")
    if conflict not in ("backup", "overwrite", "rename", "skip"):
        conflict = "skip"

    stem = _DIM_SUFFIX_RE.sub("", target.stem)
    parent = target.parent
    created, errors = [], []

    # Report the free-form dimensions that exceed the source resolution.
    for w, h in oversized:
        for f in formats:
            errors.append({
                "name": f"{stem}_{w}x{h}.{f}",
                "reason": f"{w}×{h} dépasse la résolution source ({max_w}×{max_h})",
            })

    for fmt in formats:
        for w, h in all_dims:
            out_name = f"{stem}_{w}x{h}.{fmt}"
            out_path = parent / out_name
            if out_path.exists():
                if conflict == "skip":
                    errors.append({"name": out_name,
                                   "reason": "Fichier existant, ignoré"})
                    continue
                elif conflict == "backup":
                    try:
                        out_path.rename(_backup_path(out_path))
                    except Exception as exc:
                        errors.append({"name": out_name,
                                       "reason": f"Backup impossible : {exc}"})
                        continue
                elif conflict == "rename":
                    out_path = _auto_rename(out_path)
                    out_name = out_path.name
            out_rel = str(out_path.relative_to(ASSETS_ROOT))
            try:
                reason = _write_variant(target, out_path, fmt, w, h, is_svg)
            except Exception as exc:
                errors.append({"name": out_name, "reason": str(exc)})
                continue
            if reason is None:
                created.append({"name": out_name, "path": out_rel})
            else:
                errors.append({"name": out_name, "reason": reason})

    return jsonify({"created": created, "errors": errors})


# ── Public CDN files (local development only) ─────────────────────────

_PUBLIC_TOP = frozenset({"logo", "wiki", "error"})


@app.route("/favicon.ico")
def favicon():
    """Serve favicon.ico from ASSETS_ROOT."""
    return send_from_directory(ASSETS_ROOT, "favicon.ico")


@app.route("/robots.txt")
def robots():
    """Serve robots.txt from ASSETS_ROOT."""
    return send_from_directory(ASSETS_ROOT, "robots.txt")


@app.route("/<path:subpath>")
def cdn_file(subpath):
    """Serve a file from one of the public top-level folders, unauthenticated."""
    parts = Path(subpath).parts
    # An empty component list (e.g. ".") is simply not a public file.
    if not parts or parts[0] not in _PUBLIC_TOP:
        abort(404)
    target = _safe_path(subpath)
    if not target.is_file():
        abort(404)
    return send_from_directory(ASSETS_ROOT, subpath)


if __name__ == "__main__":
    app.run(debug=True, port=5003)