Ajout d'un champ W×H dans la carte resize, contraint à la résolution source. Option "carré" synchronise les deux valeurs. Le bouton Générer s'active si au moins un format est sélectionné et une taille valide (prédéfinie ou libre) est renseignée. Supprime le code mort dans la route /resize (errors_pre). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
891 lines
29 KiB
Python
891 lines
29 KiB
Python
import io
|
||
import json
|
||
import os
|
||
import re
|
||
import shutil
|
||
import subprocess
|
||
import threading
|
||
from pathlib import Path
|
||
from datetime import datetime
|
||
|
||
from urllib.parse import urlencode
|
||
from PIL import Image
|
||
|
||
from flask import (Flask, redirect, url_for, session, request,
|
||
render_template, abort, send_from_directory, jsonify)
|
||
from authlib.integrations.flask_client import OAuth
|
||
from werkzeug.middleware.proxy_fix import ProxyFix
|
||
from werkzeug.utils import secure_filename
|
||
|
||
app = Flask(__name__)
|
||
app.secret_key = os.environ["SECRET_KEY"]
|
||
app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1)
|
||
|
||
oauth = OAuth(app)
|
||
oauth.register(
|
||
name="alpid",
|
||
server_metadata_url=os.environ["ALPID_DISCOVERY_URL"],
|
||
client_id=os.environ["ALPID_CLIENT_ID"],
|
||
client_secret=os.environ["ALPID_CLIENT_SECRET"],
|
||
client_kwargs={"scope": "openid profile email"},
|
||
)
|
||
|
||
ADMIN_GROUPS = set(os.environ.get("ADMIN_GROUPS", "admins").split(","))
|
||
ADMIN_EMAILS = set(e.strip() for e in os.environ.get("ADMIN_EMAILS", "").split(",") if e.strip())
|
||
ASSETS_ROOT = Path(os.environ.get("ASSETS_ROOT", ".")).resolve()
|
||
|
||
_alpid_base = os.environ["ALPID_DISCOVERY_URL"].split("/.well-known/")[0]
|
||
ALPID_LOGOUT_URL = _alpid_base + "/protocol/openid-connect/logout"
|
||
STATS_FILE = Path(os.environ.get("STATS_FILE", "/opt/static-cdn/goaccess.html"))
|
||
STATS_JSON = Path(os.environ.get("STATS_JSON", "/opt/static-cdn/goaccess.json"))
|
||
STATS_LOG_FILE = os.environ.get("STATS_LOG_FILE", "")
|
||
STATS_GENERATE_CMD = os.environ.get("STATS_GENERATE_CMD", "")
|
||
|
||
_gen_lock = threading.Lock()
|
||
_gen_state = {"generating": False}
|
||
|
||
|
||
_HIDDEN = frozenset({
|
||
".git", "scripts", "app",
|
||
".env", ".env.example", ".gitignore",
|
||
".htaccess", ".htpasswd_stats",
|
||
"standard_index.html",
|
||
})
|
||
|
||
try:
|
||
import cairosvg as _cairosvg
|
||
HAS_CAIROSVG = True
|
||
except ImportError:
|
||
HAS_CAIROSVG = False
|
||
|
||
IMAGE_EXT = frozenset({".png", ".jpg", ".jpeg", ".gif", ".ico", ".svg", ".webp"})
|
||
|
||
RESIZE_SIZES = frozenset({32, 64, 100, 128, 200, 300, 500, 600, 1024})
|
||
RESIZE_FORMATS = frozenset({"png", "jpg", "ico", "svg"})
|
||
TEXT_EXT = frozenset({".txt", ".html", ".htm", ".md", ".css", ".js", ".json",
|
||
".xml", ".conf", ".sh", ".robots", ".php"})
|
||
PDF_EXT = frozenset({".pdf"})
|
||
|
||
|
||
# ── Helpers ───────────────────────────────────────────────────────────
|
||
|
||
def _user():
|
||
return session.get("user")
|
||
|
||
|
||
def _require_admin():
|
||
u = _user()
|
||
if not u:
|
||
session["next_url"] = request.url
|
||
return redirect(url_for("login"))
|
||
if not u.get("is_admin"):
|
||
abort(403)
|
||
return None
|
||
|
||
|
||
def _safe_path(subpath: str) -> Path:
|
||
target = (ASSETS_ROOT / subpath).resolve()
|
||
if not target.is_relative_to(ASSETS_ROOT):
|
||
abort(400)
|
||
return target
|
||
|
||
|
||
def _humansize(n: int) -> str:
|
||
for unit in ("o", "Ko", "Mo", "Go"):
|
||
if n < 1024:
|
||
return f"{n:.0f} {unit}"
|
||
n /= 1024
|
||
return f"{n:.1f} To"
|
||
|
||
|
||
_EXIF_WANTED = frozenset({
|
||
"Make", "Model", "Software", "DateTime", "DateTimeOriginal", "DateTimeDigitized",
|
||
"ExposureTime", "FNumber", "ISOSpeedRatings", "FocalLength", "Flash",
|
||
"WhiteBalance", "ExposureProgram", "MeteringMode", "Orientation",
|
||
"XResolution", "YResolution", "ResolutionUnit", "ColorSpace",
|
||
"PixelXDimension", "PixelYDimension", "Artist", "Copyright", "GPSInfo",
|
||
})
|
||
|
||
|
||
def _image_meta(path: Path) -> dict:
|
||
try:
|
||
from PIL import ExifTags
|
||
img = Image.open(path)
|
||
meta = {
|
||
"width": img.width,
|
||
"height": img.height,
|
||
"format": img.format or path.suffix.lstrip(".").upper(),
|
||
"mode": img.mode,
|
||
"dpi": img.info.get("dpi"),
|
||
}
|
||
exif = img.getexif()
|
||
if exif:
|
||
rows = {}
|
||
for tag_id, val in exif.items():
|
||
tag = ExifTags.TAGS.get(tag_id, str(tag_id))
|
||
if tag in _EXIF_WANTED and tag != "GPSInfo":
|
||
rows[tag] = str(val)
|
||
sub = exif.get_ifd(0x8769)
|
||
for tag_id, val in sub.items():
|
||
tag = ExifTags.TAGS.get(tag_id, str(tag_id))
|
||
if tag in _EXIF_WANTED and tag != "GPSInfo":
|
||
rows[tag] = str(val)
|
||
gps_ifd = exif.get_ifd(0x8825)
|
||
if gps_ifd:
|
||
rows["GPS"] = _parse_gps(gps_ifd)
|
||
if rows:
|
||
meta["exif"] = rows
|
||
return meta
|
||
except Exception:
|
||
return {}
|
||
|
||
|
||
def _parse_gps(gps_ifd: dict):
|
||
try:
|
||
from PIL import ExifTags
|
||
def _dms(coords):
|
||
d, m, s = coords
|
||
return float(d) + float(m) / 60 + float(s) / 3600
|
||
|
||
lat = _dms(gps_ifd.get(2, (0, 0, 0)))
|
||
lon = _dms(gps_ifd.get(4, (0, 0, 0)))
|
||
latR = gps_ifd.get(1, "N")
|
||
lonR = gps_ifd.get(3, "E")
|
||
if latR == "S": lat = -lat
|
||
if lonR == "W": lon = -lon
|
||
return f"{lat:.6f}, {lon:.6f}"
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def _backup_path(p: Path) -> Path:
|
||
ts = datetime.now().strftime("%Y%m%d%H%M%S")
|
||
return p.parent / f"{p.stem}_bak_{ts}{p.suffix}"
|
||
|
||
|
||
def _auto_rename(p: Path) -> Path:
|
||
i = 1
|
||
while True:
|
||
candidate = p.parent / f"{p.stem}_{i}{p.suffix}"
|
||
if not candidate.exists():
|
||
return candidate
|
||
i += 1
|
||
|
||
|
||
def _folder_stats(path: Path) -> dict:
|
||
files, size = 0, 0
|
||
for f in path.rglob("*"):
|
||
if not f.is_file():
|
||
continue
|
||
if any(p in _HIDDEN or p.startswith(".") for p in f.relative_to(path).parts):
|
||
continue
|
||
files += 1
|
||
size += f.stat().st_size
|
||
return {"files": files, "size": size}
|
||
|
||
|
||
def _parse_date(s: str):
|
||
if not s:
|
||
return None
|
||
try:
|
||
return datetime.strptime(s, "%Y-%m-%d")
|
||
except ValueError:
|
||
return None
|
||
|
||
|
||
def _load_hits() -> dict:
|
||
"""Parse GoAccess JSON report → {relative_path: hit_count}."""
|
||
if not STATS_JSON.is_file():
|
||
return {}
|
||
try:
|
||
data = json.loads(STATS_JSON.read_text())
|
||
hits = {}
|
||
for item in data.get("requests", {}).get("data", []):
|
||
url = item.get("data", "").lstrip("/")
|
||
count = item.get("hits", {}).get("count", 0)
|
||
if url and count:
|
||
hits[url] = count
|
||
return hits
|
||
except Exception:
|
||
return {}
|
||
|
||
|
||
def _entry(item: Path) -> dict:
|
||
stat = item.stat()
|
||
ext = item.suffix.lower()
|
||
return {
|
||
"name": item.name,
|
||
"path": str(item.relative_to(ASSETS_ROOT)),
|
||
"is_dir": item.is_dir(),
|
||
"size": stat.st_size if item.is_file() else None,
|
||
"mtime": datetime.fromtimestamp(stat.st_mtime),
|
||
"ext": ext,
|
||
"is_image": ext in IMAGE_EXT,
|
||
"is_text": ext in TEXT_EXT,
|
||
"is_pdf": ext in PDF_EXT,
|
||
}
|
||
|
||
|
||
# ── Auth ──────────────────────────────────────────────────────────────
|
||
|
||
@app.route("/auth/login")
|
||
def login():
|
||
return oauth.alpid.authorize_redirect(url_for("callback", _external=True))
|
||
|
||
|
||
@app.route("/auth/callback")
|
||
def callback():
|
||
token = oauth.alpid.authorize_access_token()
|
||
info = token.get("userinfo") or oauth.alpid.userinfo(token=token)
|
||
email = info.get("email", "")
|
||
groups = set(info.get("groups", []))
|
||
if groups:
|
||
is_admin = bool(groups & ADMIN_GROUPS)
|
||
elif ADMIN_EMAILS:
|
||
is_admin = email in ADMIN_EMAILS
|
||
else:
|
||
is_admin = True
|
||
session["user"] = {
|
||
"sub": info["sub"],
|
||
"name": info.get("name") or info.get("preferred_username", ""),
|
||
"email": email,
|
||
"is_admin": is_admin,
|
||
}
|
||
session["id_token"] = token.get("id_token", "")
|
||
return redirect(session.pop("next_url", url_for("dashboard")))
|
||
|
||
|
||
@app.route("/auth/logout")
|
||
def logout():
|
||
id_token = session.get("id_token")
|
||
session.clear()
|
||
params = {"post_logout_redirect_uri": url_for("dashboard", _external=True)}
|
||
if id_token:
|
||
params["id_token_hint"] = id_token
|
||
return redirect(ALPID_LOGOUT_URL + "?" + urlencode(params))
|
||
|
||
|
||
# ── Dashboard ─────────────────────────────────────────────────────────
|
||
|
||
@app.route("/")
|
||
def dashboard():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
|
||
folders = {}
|
||
for item in sorted(ASSETS_ROOT.iterdir()):
|
||
if item.name in _HIDDEN or item.name.startswith("."):
|
||
continue
|
||
if item.is_dir():
|
||
folders[item.name] = _folder_stats(item)
|
||
|
||
return render_template("dashboard.html",
|
||
user=_user(),
|
||
folders=folders,
|
||
total_files=sum(v["files"] for v in folders.values()),
|
||
total_size=sum(v["size"] for v in folders.values()),
|
||
humansize=_humansize,
|
||
)
|
||
|
||
|
||
# ── Navigateur de fichiers ────────────────────────────────────────────
|
||
|
||
@app.route("/browse/")
|
||
@app.route("/browse/<path:subpath>")
|
||
def browse(subpath=""):
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
|
||
target = _safe_path(subpath)
|
||
if not target.exists():
|
||
abort(404)
|
||
|
||
if subpath:
|
||
top = Path(subpath).parts[0]
|
||
if top in _HIDDEN or top.startswith("."):
|
||
abort(403)
|
||
|
||
if target.is_file():
|
||
return _file_preview(target, subpath)
|
||
|
||
hits = _load_hits()
|
||
|
||
entries = []
|
||
for item in sorted(target.iterdir()):
|
||
if item.name in _HIDDEN or item.name.startswith("."):
|
||
continue
|
||
e = _entry(item)
|
||
e["hits"] = hits.get(e["path"], 0) if not e["is_dir"] else None
|
||
entries.append(e)
|
||
entries.sort(key=lambda e: (0 if e["is_dir"] else 1, e["name"].lower()))
|
||
|
||
parts = Path(subpath).parts if subpath else ()
|
||
breadcrumb = [
|
||
{"name": p, "path": str(Path(*parts[:i + 1]))}
|
||
for i, p in enumerate(parts)
|
||
]
|
||
|
||
return render_template("browse.html",
|
||
user=_user(),
|
||
entries=entries,
|
||
subpath=subpath,
|
||
breadcrumb=breadcrumb,
|
||
humansize=_humansize,
|
||
has_hits=STATS_JSON.is_file(),
|
||
)
|
||
|
||
|
||
def _file_preview(path: Path, subpath: str):
|
||
ext = path.suffix.lower()
|
||
stat = path.stat()
|
||
parent = str(Path(subpath).parent) if Path(subpath).parent != Path(".") else ""
|
||
|
||
siblings = sorted(
|
||
[f for f in path.parent.iterdir()
|
||
if f.is_file() and f.name not in _HIDDEN and not f.name.startswith(".")],
|
||
key=lambda f: f.name.lower(),
|
||
)
|
||
idx = next((i for i, f in enumerate(siblings) if f == path), None)
|
||
prev_path = str(siblings[idx - 1].relative_to(ASSETS_ROOT)) if idx else None
|
||
next_path = str(siblings[idx + 1].relative_to(ASSETS_ROOT)) if idx is not None and idx < len(siblings) - 1 else None
|
||
|
||
ctx = dict(
|
||
user=_user(),
|
||
subpath=subpath,
|
||
filename=path.name,
|
||
filesize=_humansize(stat.st_size),
|
||
mtime=datetime.fromtimestamp(stat.st_mtime),
|
||
raw_url=url_for("raw_file", subpath=subpath),
|
||
parent_path=parent,
|
||
prev_path=prev_path,
|
||
next_path=next_path,
|
||
sibling_pos=f"{idx + 1}/{len(siblings)}" if idx is not None else "",
|
||
ext=ext,
|
||
)
|
||
if ext in IMAGE_EXT:
|
||
ctx["meta"] = _image_meta(path)
|
||
return render_template("preview_image.html", **ctx)
|
||
if ext in TEXT_EXT:
|
||
content = path.read_text(errors="replace")
|
||
return render_template("preview_text.html", content=content, lang=ext.lstrip("."), **ctx)
|
||
return render_template("preview_other.html", **ctx)
|
||
|
||
|
||
# ── Recherche ─────────────────────────────────────────────────────────
|
||
|
||
@app.route("/search")
|
||
def search():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
|
||
q = request.args.get("q", "").strip()
|
||
after_s = request.args.get("after", "").strip()
|
||
before_s = request.args.get("before", "").strip()
|
||
in_content = request.args.get("content") == "1"
|
||
|
||
dt_after = _parse_date(after_s)
|
||
dt_before = _parse_date(before_s)
|
||
|
||
results = []
|
||
searched = bool(q or dt_after or dt_before)
|
||
|
||
if searched:
|
||
q_low = q.lower()
|
||
for path in sorted(ASSETS_ROOT.rglob("*")):
|
||
if not path.is_file():
|
||
continue
|
||
parts = path.relative_to(ASSETS_ROOT).parts
|
||
if any(p in _HIDDEN or p.startswith(".") for p in parts):
|
||
continue
|
||
|
||
stat = path.stat()
|
||
mtime = datetime.fromtimestamp(stat.st_mtime)
|
||
|
||
if dt_after and mtime.date() < dt_after.date():
|
||
continue
|
||
if dt_before and mtime.date() > dt_before.date():
|
||
continue
|
||
|
||
name_match = (not q) or q_low in path.name.lower()
|
||
content_match = None
|
||
|
||
if in_content and q and not name_match and path.suffix.lower() in TEXT_EXT:
|
||
try:
|
||
text = path.read_text(errors="replace")
|
||
idx = text.lower().find(q_low)
|
||
if idx != -1:
|
||
start = max(0, idx - 60)
|
||
end = min(len(text), idx + len(q) + 60)
|
||
snippet = text[start:end].replace("\n", " ")
|
||
content_match = ("…" if start else "") + snippet + ("…" if end < len(text) else "")
|
||
name_match = True
|
||
except Exception:
|
||
pass
|
||
|
||
if not name_match:
|
||
continue
|
||
|
||
e = _entry(path)
|
||
e["content_match"] = content_match
|
||
results.append(e)
|
||
|
||
return render_template("search.html",
|
||
user=_user(),
|
||
q=q,
|
||
after=after_s,
|
||
before=before_s,
|
||
in_content=in_content,
|
||
results=results,
|
||
searched=searched,
|
||
humansize=_humansize,
|
||
)
|
||
|
||
|
||
# ── Statistiques GoAccess ─────────────────────────────────────────────
|
||
|
||
@app.route("/stats/")
|
||
def stats():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
return render_template("stats.html",
|
||
user=_user(),
|
||
has_report=STATS_FILE.is_file(),
|
||
)
|
||
|
||
|
||
@app.route("/stats/report")
|
||
def stats_report():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
if not STATS_FILE.is_file():
|
||
abort(404)
|
||
return send_from_directory(STATS_FILE.parent, STATS_FILE.name)
|
||
|
||
|
||
def _do_generate():
|
||
try:
|
||
if STATS_GENERATE_CMD:
|
||
subprocess.run(STATS_GENERATE_CMD, shell=True, timeout=600)
|
||
else:
|
||
cmd = ["goaccess", STATS_LOG_FILE, "--log-format=COMBINED",
|
||
f"--output={STATS_FILE}"]
|
||
if STATS_JSON:
|
||
cmd.append(f"--output={STATS_JSON}")
|
||
subprocess.run(cmd, timeout=600, check=False)
|
||
finally:
|
||
with _gen_lock:
|
||
_gen_state["generating"] = False
|
||
|
||
|
||
@app.route("/stats/generate", methods=["POST"])
|
||
def stats_generate():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
if STATS_FILE.is_file():
|
||
return jsonify({"status": "exists"})
|
||
with _gen_lock:
|
||
if _gen_state["generating"]:
|
||
return jsonify({"status": "generating"})
|
||
if not STATS_LOG_FILE and not STATS_GENERATE_CMD:
|
||
return jsonify({"status": "error",
|
||
"message": "STATS_LOG_FILE non configuré"}), 500
|
||
_gen_state["generating"] = True
|
||
threading.Thread(target=_do_generate, daemon=True).start()
|
||
return jsonify({"status": "started"})
|
||
|
||
|
||
@app.route("/stats/status")
|
||
def stats_status():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
return jsonify({"ready": STATS_FILE.is_file(),
|
||
"generating": _gen_state["generating"]})
|
||
|
||
|
||
# ── Fichiers bruts (aperçu / téléchargement) ──────────────────────────
|
||
|
||
@app.route("/raw/<path:subpath>")
|
||
def raw_file(subpath):
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
target = _safe_path(subpath)
|
||
if not target.is_file():
|
||
abort(404)
|
||
top = Path(subpath).parts[0]
|
||
if top in _HIDDEN or top.startswith("."):
|
||
abort(403)
|
||
return send_from_directory(ASSETS_ROOT, subpath)
|
||
|
||
|
||
# ── Suppression de fichiers ───────────────────────────────────────────
|
||
|
||
@app.route("/delete", methods=["POST"])
|
||
def delete_file():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
subpath = request.form.get("path", "").strip()
|
||
if not subpath:
|
||
abort(400)
|
||
target = _safe_path(subpath)
|
||
if not target.exists():
|
||
abort(404)
|
||
if target.is_dir():
|
||
abort(400)
|
||
top = Path(subpath).parts[0]
|
||
if top in _HIDDEN or top.startswith("."):
|
||
abort(403)
|
||
parent = str(Path(subpath).parent)
|
||
target.unlink()
|
||
return redirect(url_for("browse", subpath=parent) if parent != "." else url_for("browse"))
|
||
|
||
|
||
# ── Renommage de fichiers ─────────────────────────────────────────────
|
||
|
||
@app.route("/rename", methods=["POST"])
|
||
def rename_file():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
|
||
subpath = request.form.get("path", "").strip()
|
||
new_name = request.form.get("new_name", "").strip()
|
||
|
||
if not subpath or not new_name:
|
||
return jsonify({"error": "Paramètres manquants"}), 400
|
||
|
||
if "/" in new_name or "\\" in new_name or new_name in (".", ".."):
|
||
return jsonify({"error": "Nom invalide"}), 400
|
||
|
||
new_name = secure_filename(new_name)
|
||
if not new_name:
|
||
return jsonify({"error": "Nom invalide après nettoyage"}), 400
|
||
|
||
target = _safe_path(subpath)
|
||
if not target.is_file():
|
||
return jsonify({"error": "Fichier introuvable"}), 404
|
||
|
||
top = Path(subpath).parts[0]
|
||
if top in _HIDDEN or top.startswith("."):
|
||
return jsonify({"error": "Accès refusé"}), 403
|
||
|
||
dest = target.parent / new_name
|
||
if not dest.is_relative_to(ASSETS_ROOT):
|
||
return jsonify({"error": "Destination invalide"}), 400
|
||
|
||
if dest.exists():
|
||
return jsonify({"error": f"« {new_name} » existe déjà"}), 409
|
||
|
||
target.rename(dest)
|
||
new_path = str(dest.relative_to(ASSETS_ROOT))
|
||
return jsonify({
|
||
"name": new_name,
|
||
"path": new_path,
|
||
"browse_url": url_for("browse", subpath=new_path),
|
||
})
|
||
|
||
|
||
# ── Vérification des conflits avant upload ───────────────────────────
|
||
|
||
@app.route("/check-upload", methods=["POST"])
|
||
def check_upload():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
|
||
subpath = request.form.get("path", "").strip()
|
||
names = request.form.getlist("names")
|
||
|
||
if subpath:
|
||
parts = Path(subpath).parts
|
||
if parts and (parts[0] in _HIDDEN or parts[0].startswith(".")):
|
||
abort(403)
|
||
|
||
dest = _safe_path(subpath) if subpath else ASSETS_ROOT
|
||
if not dest.is_dir():
|
||
abort(400)
|
||
|
||
conflicts = [n for n in names if n and (dest / secure_filename(n)).exists()]
|
||
return jsonify({"conflicts": conflicts})
|
||
|
||
|
||
# ── Upload de fichiers ────────────────────────────────────────────────
|
||
|
||
@app.route("/upload", methods=["POST"])
|
||
def upload_file():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
|
||
subpath = request.form.get("path", "").strip()
|
||
files = request.files.getlist("files")
|
||
|
||
if not files or all(f.filename == "" for f in files):
|
||
abort(400)
|
||
|
||
if subpath:
|
||
parts = Path(subpath).parts
|
||
if parts and (parts[0] in _HIDDEN or parts[0].startswith(".")):
|
||
abort(403)
|
||
|
||
dest = _safe_path(subpath) if subpath else ASSETS_ROOT
|
||
if not dest.is_dir():
|
||
abort(400)
|
||
|
||
conflict = request.form.get("conflict", "overwrite")
|
||
if conflict not in ("backup", "overwrite", "rename", "skip"):
|
||
conflict = "overwrite"
|
||
|
||
for f in files:
|
||
name = secure_filename(f.filename or "")
|
||
if not name:
|
||
continue
|
||
out_path = dest / name
|
||
if out_path.exists():
|
||
if conflict == "skip":
|
||
continue
|
||
elif conflict == "backup":
|
||
try:
|
||
out_path.rename(_backup_path(out_path))
|
||
except Exception:
|
||
continue
|
||
elif conflict == "rename":
|
||
out_path = _auto_rename(out_path)
|
||
f.save(out_path)
|
||
|
||
return redirect(url_for("browse", subpath=subpath) if subpath else url_for("browse"))
|
||
|
||
|
||
# ── Vérification des conflits avant redimensionnement ────────────────
|
||
|
||
@app.route("/check-resize", methods=["POST"])
|
||
def check_resize():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
|
||
subpath = request.form.get("path", "").strip()
|
||
raw_sizes = request.form.getlist("sizes")
|
||
raw_formats = request.form.getlist("formats")
|
||
|
||
if not subpath:
|
||
return jsonify({"conflicts": []})
|
||
|
||
target = _safe_path(subpath)
|
||
if not target.is_file():
|
||
return jsonify({"conflicts": []})
|
||
|
||
try:
|
||
sizes = [int(s) for s in raw_sizes if int(s) in RESIZE_SIZES]
|
||
except (ValueError, TypeError):
|
||
return jsonify({"conflicts": []})
|
||
|
||
formats = [f for f in raw_formats if f in RESIZE_FORMATS]
|
||
stem = re.sub(r"_\d+x\d+$", "", target.stem)
|
||
parent = target.parent
|
||
|
||
all_dims = [(s, s) for s in sizes]
|
||
for cs in request.form.getlist("custom_sizes"):
|
||
try:
|
||
w, h = (int(x.strip()) for x in re.split(r"[x×]", cs.lower()))
|
||
if w >= 1 and h >= 1:
|
||
all_dims.append((w, h))
|
||
except (ValueError, TypeError):
|
||
pass
|
||
|
||
conflicts = []
|
||
for fmt in formats:
|
||
for w, h in all_dims:
|
||
out_name = f"{stem}_{w}x{h}.{fmt}"
|
||
if (parent / out_name).exists():
|
||
conflicts.append(out_name)
|
||
|
||
return jsonify({"conflicts": conflicts})
|
||
|
||
|
||
# ── Redimensionnement d'images ────────────────────────────────────────
|
||
|
||
@app.route("/resize", methods=["POST"])
|
||
def resize_image():
|
||
redir = _require_admin()
|
||
if redir:
|
||
return redir
|
||
|
||
subpath = request.form.get("path", "").strip()
|
||
raw_sizes = request.form.getlist("sizes")
|
||
raw_formats = request.form.getlist("formats")
|
||
raw_customs = request.form.getlist("custom_sizes")
|
||
|
||
if not subpath or not raw_formats:
|
||
return jsonify({"error": "Paramètres manquants"}), 400
|
||
|
||
target = _safe_path(subpath)
|
||
if not target.is_file():
|
||
abort(404)
|
||
|
||
ext = target.suffix.lower()
|
||
if ext not in IMAGE_EXT:
|
||
abort(400)
|
||
|
||
top = Path(subpath).parts[0]
|
||
if top in _HIDDEN or top.startswith("."):
|
||
abort(403)
|
||
|
||
try:
|
||
square_dims = [(s, s) for s in sorted({int(s) for s in raw_sizes if int(s) in RESIZE_SIZES})]
|
||
except (ValueError, TypeError):
|
||
square_dims = []
|
||
|
||
formats = [f for f in raw_formats if f in RESIZE_FORMATS]
|
||
|
||
# Dimensions libres — validées contre la résolution source
|
||
src_img = Image.open(target)
|
||
max_w, max_h = src_img.width, src_img.height
|
||
src_img.close()
|
||
|
||
custom_dims = []
|
||
for cs in raw_customs:
|
||
try:
|
||
w, h = (int(x.strip()) for x in re.split(r"[x×]", cs.lower()))
|
||
if w < 1 or h < 1:
|
||
continue
|
||
if w > max_w or h > max_h:
|
||
continue
|
||
custom_dims.append((w, h))
|
||
except (ValueError, TypeError):
|
||
pass
|
||
|
||
all_dims = square_dims + custom_dims
|
||
if not all_dims or not formats:
|
||
return jsonify({"error": "Aucune dimension ou format valide"}), 400
|
||
|
||
conflict = request.form.get("conflict", "skip")
|
||
if conflict not in ("backup", "overwrite", "rename", "skip"):
|
||
conflict = "skip"
|
||
|
||
is_svg = ext == ".svg"
|
||
stem = re.sub(r"_\d+x\d+$", "", target.stem)
|
||
parent = target.parent
|
||
created, errors = [], []
|
||
|
||
# Signaler les dimensions hors-bornes
|
||
for cs in raw_customs:
|
||
try:
|
||
w, h = (int(x.strip()) for x in re.split(r"[x×]", cs.lower()))
|
||
if w > max_w or h > max_h:
|
||
for f in formats:
|
||
errors.append({"name": f"{stem}_{w}x{h}.{f}",
|
||
"reason": f"{w}×{h} dépasse la résolution source ({max_w}×{max_h})"})
|
||
except (ValueError, TypeError):
|
||
pass
|
||
|
||
for fmt in formats:
|
||
for w, h in all_dims:
|
||
out_name = f"{stem}_{w}x{h}.{fmt}"
|
||
out_path = parent / out_name
|
||
out_rel = str(out_path.relative_to(ASSETS_ROOT))
|
||
|
||
if out_path.exists():
|
||
if conflict == "skip":
|
||
errors.append({"name": out_name, "reason": "Fichier existant, ignoré"})
|
||
continue
|
||
elif conflict == "backup":
|
||
try:
|
||
bak = _backup_path(out_path)
|
||
out_path.rename(bak)
|
||
except Exception as exc:
|
||
errors.append({"name": out_name, "reason": f"Backup impossible : {exc}"})
|
||
continue
|
||
elif conflict == "rename":
|
||
out_path = _auto_rename(out_path)
|
||
out_name = out_path.name
|
||
out_rel = str(out_path.relative_to(ASSETS_ROOT))
|
||
|
||
try:
|
||
if fmt == "svg" and is_svg:
|
||
shutil.copy2(target, out_path)
|
||
created.append({"name": out_name, "path": out_rel})
|
||
|
||
elif fmt == "svg" and not is_svg:
|
||
errors.append({"name": out_name, "reason": "Conversion raster→SVG non supportée"})
|
||
|
||
elif is_svg:
|
||
if not HAS_CAIROSVG:
|
||
errors.append({"name": out_name, "reason": "cairosvg non disponible sur ce serveur"})
|
||
continue
|
||
if fmt in ("png", "ico"):
|
||
buf = io.BytesIO()
|
||
_cairosvg.svg2png(url=str(target), write_to=buf,
|
||
output_width=w, output_height=h)
|
||
buf.seek(0)
|
||
img = Image.open(buf).convert("RGBA")
|
||
if fmt == "ico":
|
||
img.save(out_path, format="ICO", sizes=[(w, h)])
|
||
else:
|
||
img.save(out_path, format="PNG")
|
||
else:
|
||
buf = io.BytesIO()
|
||
_cairosvg.svg2png(url=str(target), write_to=buf,
|
||
output_width=w, output_height=h)
|
||
buf.seek(0)
|
||
img = Image.open(buf).convert("RGB")
|
||
img.save(out_path, format="JPEG", quality=90)
|
||
created.append({"name": out_name, "path": out_rel})
|
||
|
||
else:
|
||
img = Image.open(target)
|
||
img = img.resize((w, h), Image.LANCZOS)
|
||
if fmt == "png":
|
||
if img.mode not in ("RGBA", "RGB", "L"):
|
||
img = img.convert("RGBA")
|
||
img.save(out_path, format="PNG")
|
||
elif fmt == "jpg":
|
||
img = img.convert("RGB")
|
||
img.save(out_path, format="JPEG", quality=90)
|
||
elif fmt == "ico":
|
||
img = img.convert("RGBA")
|
||
img.save(out_path, format="ICO", sizes=[(w, h)])
|
||
created.append({"name": out_name, "path": out_rel})
|
||
|
||
except Exception as exc:
|
||
errors.append({"name": out_name, "reason": str(exc)})
|
||
|
||
return jsonify({"created": created, "errors": errors})
|
||
|
||
|
||
# ── Fichiers CDN publics (dev local uniquement) ───────────────────────
|
||
|
||
_PUBLIC_TOP = frozenset({"logo", "wiki", "error"})
|
||
|
||
|
||
@app.route("/favicon.ico")
|
||
def favicon():
|
||
return send_from_directory(ASSETS_ROOT, "favicon.ico")
|
||
|
||
|
||
@app.route("/robots.txt")
|
||
def robots():
|
||
return send_from_directory(ASSETS_ROOT, "robots.txt")
|
||
|
||
|
||
@app.route("/<path:subpath>")
|
||
def cdn_file(subpath):
|
||
top = Path(subpath).parts[0]
|
||
if top not in _PUBLIC_TOP:
|
||
abort(404)
|
||
target = _safe_path(subpath)
|
||
if not target.is_file():
|
||
abort(404)
|
||
return send_from_directory(ASSETS_ROOT, subpath)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
app.run(debug=True, port=5003)
|