Ajoute une carte de propriétés au-dessus de l'image : - Dimensions (largeur × hauteur), format, mode couleur, résolution DPI - Données EXIF complètes si présentes (appareil, exposition, ISO, focale, balance des blancs, auteur, copyright…) - Coordonnées GPS avec lien OpenStreetMap si le champ est renseigné Nouvelle fonction _image_meta() et _parse_gps() dans app.py (Pillow). Grille CSS responsive, n'apparaît pas si aucune métadonnée disponible. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
813 lines
26 KiB
Python
813 lines
26 KiB
Python
import io
|
|
import json
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import threading
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
from urllib.parse import urlencode
|
|
from PIL import Image
|
|
|
|
from flask import (Flask, redirect, url_for, session, request,
|
|
render_template, abort, send_from_directory, jsonify)
|
|
from authlib.integrations.flask_client import OAuth
|
|
from werkzeug.middleware.proxy_fix import ProxyFix
|
|
from werkzeug.utils import secure_filename
|
|
|
|
# Flask application. The session-signing key must be present in the
# environment; a missing SECRET_KEY fails at import time (KeyError).
app = Flask(__name__)
app.secret_key = os.environ["SECRET_KEY"]
# Wrap the WSGI app so the X-Forwarded-Proto header set by one proxy hop is
# honoured.
app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1)

# OpenID Connect client registered under the name "alpid". Endpoints come
# from the discovery document; credentials come from the environment.
oauth = OAuth(app)
oauth.register(
    name="alpid",
    server_metadata_url=os.environ["ALPID_DISCOVERY_URL"],
    client_id=os.environ["ALPID_CLIENT_ID"],
    client_secret=os.environ["ALPID_CLIENT_SECRET"],
    client_kwargs={"scope": "openid profile email"},
)
|
|
|
|
# Comma-separated allow-lists read from the environment. Entries are stripped
# and empty items dropped, so "admins, staff" matches the group "staff" and
# an empty variable yields an empty set rather than {""}.
ADMIN_GROUPS = {
    g.strip()
    for g in os.environ.get("ADMIN_GROUPS", "admins").split(",")
    if g.strip()
}
ADMIN_EMAILS = {
    e.strip()
    for e in os.environ.get("ADMIN_EMAILS", "").split(",")
    if e.strip()
}
# Absolute, symlink-resolved root of the managed asset tree.
ASSETS_ROOT = Path(os.environ.get("ASSETS_ROOT", ".")).resolve()
|
|
|
|
# Identity-provider base URL: the discovery URL up to "/.well-known/".
_alpid_base = os.environ["ALPID_DISCOVERY_URL"].split("/.well-known/")[0]
ALPID_LOGOUT_URL = _alpid_base + "/protocol/openid-connect/logout"
# Locations of the GoAccess HTML and JSON reports, and the inputs used to
# generate them (an access-log path or a full custom command).
STATS_FILE = Path(os.environ.get("STATS_FILE", "/opt/static-cdn/goaccess.html"))
STATS_JSON = Path(os.environ.get("STATS_JSON", "/opt/static-cdn/goaccess.json"))
STATS_LOG_FILE = os.environ.get("STATS_LOG_FILE", "")
STATS_GENERATE_CMD = os.environ.get("STATS_GENERATE_CMD", "")

# State of the background report generation; writers hold _gen_lock.
_gen_lock = threading.Lock()
_gen_state = {"generating": False}
|
|
|
|
|
|
# File and directory names that the dashboard, browser, search and folder
# statistics never show (dot-prefixed names are filtered separately).
_HIDDEN = frozenset({
    ".git", "scripts", "app",
    ".env", ".env.example", ".gitignore",
    ".htaccess", ".htpasswd_stats",
    "standard_index.html",
})
|
|
|
|
# cairosvg is optional: HAS_CAIROSVG records whether it could be imported so
# SVG rasterisation can be refused cleanly when it is missing.
try:
    import cairosvg as _cairosvg
    HAS_CAIROSVG = True
except ImportError:
    HAS_CAIROSVG = False
|
|
|
|
# Lower-case suffixes previewed as images.
IMAGE_EXT = frozenset({".png", ".jpg", ".jpeg", ".gif", ".ico", ".svg", ".webp"})

# Edge lengths (pixels) and output formats accepted by the /resize endpoint.
RESIZE_SIZES = frozenset({32, 64, 100, 128, 200, 300, 500, 600, 1024})
RESIZE_FORMATS = frozenset({"png", "jpg", "ico", "svg"})
# Lower-case suffixes previewed (and content-searched) as text.
TEXT_EXT = frozenset({".txt", ".html", ".htm", ".md", ".css", ".js", ".json",
                      ".xml", ".conf", ".sh", ".robots", ".php"})
# Lower-case suffixes flagged as PDF in listings.
PDF_EXT = frozenset({".pdf"})
|
|
|
|
|
|
# ── Helpers ───────────────────────────────────────────────────────────
|
|
|
|
def _user():
    """Return the user dict stored in the session, or None when absent."""
    current = session.get("user")
    return current
|
|
|
|
|
|
def _require_admin():
    """Gate a view on an authenticated admin session.

    Returns a redirect to the login view (after remembering the requested
    URL in the session) when nobody is logged in, aborts with 403 when the
    user is not an admin, and returns None when access is granted.
    """
    current = _user()
    if current:
        if not current.get("is_admin"):
            abort(403)
        return None
    # Anonymous visitor: remember where they were headed, then log in.
    session["next_url"] = request.url
    return redirect(url_for("login"))
|
|
|
|
|
|
def _safe_path(subpath: str) -> Path:
    """Resolve *subpath* under ASSETS_ROOT, aborting with 400 if it escapes."""
    resolved = ASSETS_ROOT.joinpath(subpath).resolve()
    if resolved.is_relative_to(ASSETS_ROOT):
        return resolved
    abort(400)
|
|
|
|
|
|
def _humansize(n: int) -> str:
|
|
for unit in ("o", "Ko", "Mo", "Go"):
|
|
if n < 1024:
|
|
return f"{n:.0f} {unit}"
|
|
n /= 1024
|
|
return f"{n:.1f} To"
|
|
|
|
|
|
# EXIF tag names that _image_meta() copies into the metadata card. "GPSInfo"
# is listed but handled separately through _parse_gps().
_EXIF_WANTED = frozenset({
    "Make", "Model", "Software", "DateTime", "DateTimeOriginal", "DateTimeDigitized",
    "ExposureTime", "FNumber", "ISOSpeedRatings", "FocalLength", "Flash",
    "WhiteBalance", "ExposureProgram", "MeteringMode", "Orientation",
    "XResolution", "YResolution", "ResolutionUnit", "ColorSpace",
    "PixelXDimension", "PixelYDimension", "Artist", "Copyright", "GPSInfo",
})
|
|
|
|
|
|
def _image_meta(path: Path) -> dict:
|
|
try:
|
|
from PIL import ExifTags
|
|
img = Image.open(path)
|
|
meta = {
|
|
"width": img.width,
|
|
"height": img.height,
|
|
"format": img.format or path.suffix.lstrip(".").upper(),
|
|
"mode": img.mode,
|
|
"dpi": img.info.get("dpi"),
|
|
}
|
|
exif = img.getexif()
|
|
if exif:
|
|
rows = {}
|
|
for tag_id, val in exif.items():
|
|
tag = ExifTags.TAGS.get(tag_id, str(tag_id))
|
|
if tag in _EXIF_WANTED and tag != "GPSInfo":
|
|
rows[tag] = str(val)
|
|
sub = exif.get_ifd(0x8769)
|
|
for tag_id, val in sub.items():
|
|
tag = ExifTags.TAGS.get(tag_id, str(tag_id))
|
|
if tag in _EXIF_WANTED and tag != "GPSInfo":
|
|
rows[tag] = str(val)
|
|
gps_ifd = exif.get_ifd(0x8825)
|
|
if gps_ifd:
|
|
rows["GPS"] = _parse_gps(gps_ifd)
|
|
if rows:
|
|
meta["exif"] = rows
|
|
return meta
|
|
except Exception:
|
|
return {}
|
|
|
|
|
|
def _parse_gps(gps_ifd: dict):
|
|
try:
|
|
from PIL import ExifTags
|
|
def _dms(coords):
|
|
d, m, s = coords
|
|
return float(d) + float(m) / 60 + float(s) / 3600
|
|
|
|
lat = _dms(gps_ifd.get(2, (0, 0, 0)))
|
|
lon = _dms(gps_ifd.get(4, (0, 0, 0)))
|
|
latR = gps_ifd.get(1, "N")
|
|
lonR = gps_ifd.get(3, "E")
|
|
if latR == "S": lat = -lat
|
|
if lonR == "W": lon = -lon
|
|
return f"{lat:.6f}, {lon:.6f}"
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _backup_path(p: Path) -> Path:
|
|
ts = datetime.now().strftime("%Y%m%d%H%M%S")
|
|
return p.parent / f"{p.stem}_bak_{ts}{p.suffix}"
|
|
|
|
|
|
def _auto_rename(p: Path) -> Path:
|
|
i = 1
|
|
while True:
|
|
candidate = p.parent / f"{p.stem}_{i}{p.suffix}"
|
|
if not candidate.exists():
|
|
return candidate
|
|
i += 1
|
|
|
|
|
|
def _folder_stats(path: Path) -> dict:
|
|
files, size = 0, 0
|
|
for f in path.rglob("*"):
|
|
if not f.is_file():
|
|
continue
|
|
if any(p in _HIDDEN or p.startswith(".") for p in f.relative_to(path).parts):
|
|
continue
|
|
files += 1
|
|
size += f.stat().st_size
|
|
return {"files": files, "size": size}
|
|
|
|
|
|
def _parse_date(s: str):
|
|
if not s:
|
|
return None
|
|
try:
|
|
return datetime.strptime(s, "%Y-%m-%d")
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def _load_hits() -> dict:
    """Parse the GoAccess JSON report into {relative_path: hit_count}.

    Reads STATS_JSON as UTF-8 (JSON's interchange encoding) instead of the
    locale default. Entries with an empty URL or a zero count are dropped.
    Returns an empty dict when the report is missing, unreadable or does not
    have the expected structure.
    """
    if not STATS_JSON.is_file():
        return {}
    try:
        data = json.loads(STATS_JSON.read_text(encoding="utf-8"))
        hits = {}
        for item in data.get("requests", {}).get("data", []):
            # Report URLs start with "/"; listing paths are root-relative.
            url = item.get("data", "").lstrip("/")
            count = item.get("hits", {}).get("count", 0)
            if url and count:
                hits[url] = count
        return hits
    except (OSError, ValueError, AttributeError, TypeError):
        # I/O failure, invalid JSON/encoding, or unexpected structure:
        # hit counts are optional decoration.
        return {}
|
|
|
|
|
|
def _entry(item: Path) -> dict:
    """Describe a filesystem item as the dict consumed by listing templates.

    "path" is relative to ASSETS_ROOT, "size" is None for non-files, and the
    is_image / is_text / is_pdf flags are derived from the lower-cased suffix.
    """
    suffix = item.suffix.lower()
    info = item.stat()
    relative = item.relative_to(ASSETS_ROOT)
    size = info.st_size if item.is_file() else None
    record = {
        "name": item.name,
        "path": str(relative),
        "is_dir": item.is_dir(),
        "size": size,
        "mtime": datetime.fromtimestamp(info.st_mtime),
        "ext": suffix,
        "is_image": suffix in IMAGE_EXT,
        "is_text": suffix in TEXT_EXT,
        "is_pdf": suffix in PDF_EXT,
    }
    return record
|
|
|
|
|
|
# ── Auth ──────────────────────────────────────────────────────────────
|
|
|
|
@app.route("/auth/login")
def login():
    """Start the OpenID Connect flow by redirecting to the provider."""
    callback_url = url_for("callback", _external=True)
    return oauth.alpid.authorize_redirect(callback_url)
|
|
|
|
|
|
@app.route("/auth/callback")
def callback():
    """Complete the OpenID Connect login and populate the session.

    Admin status is decided in this order: membership in ADMIN_GROUPS when
    the provider sends a "groups" claim, otherwise membership of the e-mail
    in ADMIN_EMAILS when that list is configured, otherwise granted.
    Finally redirects to the URL remembered before login, or the dashboard.
    """
    token = oauth.alpid.authorize_access_token()
    info = token.get("userinfo") or oauth.alpid.userinfo(token=token)
    email = info.get("email", "")

    # The claim may be absent, null, or a single string; normalise it so that
    # None does not crash set() and "admins" is not split into characters.
    raw_groups = info.get("groups") or []
    if isinstance(raw_groups, str):
        raw_groups = [raw_groups]
    groups = set(raw_groups)

    if groups:
        is_admin = bool(groups & ADMIN_GROUPS)
    elif ADMIN_EMAILS:
        is_admin = email in ADMIN_EMAILS
    else:
        # NOTE(review): fail-open — with no groups claim and no ADMIN_EMAILS
        # configured, every authenticated user becomes admin. Confirm this is
        # intended for the deployment.
        is_admin = True

    session["user"] = {
        "sub": info["sub"],
        "name": info.get("name") or info.get("preferred_username", ""),
        "email": email,
        "is_admin": is_admin,
    }
    # Kept so logout can pass it as id_token_hint.
    session["id_token"] = token.get("id_token", "")
    return redirect(session.pop("next_url", url_for("dashboard")))
|
|
|
|
|
|
@app.route("/auth/logout")
def logout():
    """Clear the local session and redirect to the provider's logout URL."""
    hint = session.get("id_token")
    session.clear()
    query = {"post_logout_redirect_uri": url_for("dashboard", _external=True)}
    if hint:
        query["id_token_hint"] = hint
    return redirect(f"{ALPID_LOGOUT_URL}?{urlencode(query)}")
|
|
|
|
|
|
# ── Dashboard ─────────────────────────────────────────────────────────
|
|
|
|
@app.route("/")
def dashboard():
    """Render per-folder file counts and sizes for the visible top-level dirs."""
    denied = _require_admin()
    if denied:
        return denied

    folders = {
        child.name: _folder_stats(child)
        for child in sorted(ASSETS_ROOT.iterdir())
        if not (child.name in _HIDDEN or child.name.startswith("."))
        and child.is_dir()
    }
    file_total = sum(stats["files"] for stats in folders.values())
    size_total = sum(stats["size"] for stats in folders.values())

    return render_template(
        "dashboard.html",
        user=_user(),
        folders=folders,
        total_files=file_total,
        total_size=size_total,
        humansize=_humansize,
    )
|
|
|
|
|
|
# ── Navigateur de fichiers ────────────────────────────────────────────
|
|
|
|
@app.route("/browse/")
@app.route("/browse/<path:subpath>")
def browse(subpath=""):
    """List a directory of the asset tree, or preview a single file.

    Aborts with 404 when the path does not exist and 403 when any component
    of the resolved path is hidden (listed in _HIDDEN or dot-prefixed).
    """
    redir = _require_admin()
    if redir:
        return redir

    target = _safe_path(subpath)
    if not target.exists():
        abort(404)

    # Check every component of the *resolved* path, as search() and
    # _folder_stats() do. Looking only at the first component of the raw URL
    # lets "logo/.env" or "logo/../.env" through to the preview.
    rel_parts = target.relative_to(ASSETS_ROOT).parts
    if any(p in _HIDDEN or p.startswith(".") for p in rel_parts):
        abort(403)

    if target.is_file():
        return _file_preview(target, subpath)

    hits = _load_hits()

    entries = []
    for item in target.iterdir():
        if item.name in _HIDDEN or item.name.startswith("."):
            continue
        e = _entry(item)
        # Hit counts only make sense for files.
        e["hits"] = hits.get(e["path"], 0) if not e["is_dir"] else None
        entries.append(e)
    # Directories first, then case-insensitive name; the raw name breaks ties
    # deterministically, so a single sort suffices.
    entries.sort(key=lambda e: (0 if e["is_dir"] else 1, e["name"].lower(), e["name"]))

    parts = Path(subpath).parts if subpath else ()
    breadcrumb = [
        {"name": p, "path": str(Path(*parts[:i + 1]))}
        for i, p in enumerate(parts)
    ]

    return render_template(
        "browse.html",
        user=_user(),
        entries=entries,
        subpath=subpath,
        breadcrumb=breadcrumb,
        humansize=_humansize,
        has_hits=STATS_JSON.is_file(),
    )
|
|
|
|
|
|
def _file_preview(path: Path, subpath: str):
    """Render the preview page for one file, with previous/next navigation.

    Siblings are the visible files of the same directory ordered by
    lower-cased name. The template depends on the extension: image (with
    metadata), text (with content), or a generic fallback.
    """
    ext = path.suffix.lower()
    info = path.stat()

    parent_dir = Path(subpath).parent
    parent = "" if parent_dir == Path(".") else str(parent_dir)

    siblings = [
        f for f in path.parent.iterdir()
        if f.is_file() and f.name not in _HIDDEN and not f.name.startswith(".")
    ]
    siblings.sort(key=lambda f: f.name.lower())

    idx = None
    for pos, candidate in enumerate(siblings):
        if candidate == path:
            idx = pos
            break

    prev_path = None
    next_path = None
    sibling_pos = ""
    if idx is not None:
        if idx > 0:
            prev_path = str(siblings[idx - 1].relative_to(ASSETS_ROOT))
        if idx < len(siblings) - 1:
            next_path = str(siblings[idx + 1].relative_to(ASSETS_ROOT))
        sibling_pos = f"{idx + 1}/{len(siblings)}"

    ctx = {
        "user": _user(),
        "subpath": subpath,
        "filename": path.name,
        "filesize": _humansize(info.st_size),
        "mtime": datetime.fromtimestamp(info.st_mtime),
        "raw_url": url_for("raw_file", subpath=subpath),
        "parent_path": parent,
        "prev_path": prev_path,
        "next_path": next_path,
        "sibling_pos": sibling_pos,
        "ext": ext,
    }

    if ext in IMAGE_EXT:
        ctx["meta"] = _image_meta(path)
        return render_template("preview_image.html", **ctx)
    if ext in TEXT_EXT:
        body = path.read_text(errors="replace")
        return render_template("preview_text.html", content=body, lang=ext.lstrip("."), **ctx)
    return render_template("preview_other.html", **ctx)
|
|
|
|
|
|
# ── Recherche ─────────────────────────────────────────────────────────
|
|
|
|
@app.route("/search")
def search():
    """Search visible files by name, modification date and optionally content.

    Query parameters: q (substring, case-insensitive), after / before
    (YYYY-MM-DD, inclusive bounds on the modification date) and content=1 to
    also look inside text files whose name does not match. Content matches
    carry a snippet of up to 60 characters on each side of the hit.
    """
    denied = _require_admin()
    if denied:
        return denied

    args = request.args
    q = args.get("q", "").strip()
    after_s = args.get("after", "").strip()
    before_s = args.get("before", "").strip()
    in_content = args.get("content") == "1"

    dt_after = _parse_date(after_s)
    dt_before = _parse_date(before_s)

    searched = bool(q or dt_after or dt_before)
    results = []
    needle = q.lower()

    # Nothing is walked unless at least one criterion was supplied.
    candidates = sorted(ASSETS_ROOT.rglob("*")) if searched else []
    for path in candidates:
        if not path.is_file():
            continue
        rel_parts = path.relative_to(ASSETS_ROOT).parts
        if any(part in _HIDDEN or part.startswith(".") for part in rel_parts):
            continue

        modified = datetime.fromtimestamp(path.stat().st_mtime).date()
        if dt_after and modified < dt_after.date():
            continue
        if dt_before and modified > dt_before.date():
            continue

        matched = (not q) or needle in path.name.lower()
        excerpt = None

        if not matched and in_content and q and path.suffix.lower() in TEXT_EXT:
            try:
                text = path.read_text(errors="replace")
                pos = text.lower().find(needle)
                if pos != -1:
                    lo = max(0, pos - 60)
                    hi = min(len(text), pos + len(q) + 60)
                    body = text[lo:hi].replace("\n", " ")
                    excerpt = ("…" if lo else "") + body + ("…" if hi < len(text) else "")
                    matched = True
            except Exception:
                # Unreadable file: treat as no content match.
                pass

        if matched:
            entry = _entry(path)
            entry["content_match"] = excerpt
            results.append(entry)

    return render_template(
        "search.html",
        user=_user(),
        q=q,
        after=after_s,
        before=before_s,
        in_content=in_content,
        results=results,
        searched=searched,
        humansize=_humansize,
    )
|
|
|
|
|
|
# ── Statistiques GoAccess ─────────────────────────────────────────────
|
|
|
|
@app.route("/stats/")
def stats():
    """Render the statistics page, telling it whether a report file exists."""
    denied = _require_admin()
    if denied:
        return denied
    return render_template(
        "stats.html",
        user=_user(),
        has_report=STATS_FILE.is_file(),
    )
|
|
|
|
|
|
@app.route("/stats/report")
def stats_report():
    """Serve the generated GoAccess HTML report, or 404 when it is missing."""
    denied = _require_admin()
    if denied:
        return denied
    if STATS_FILE.is_file():
        return send_from_directory(STATS_FILE.parent, STATS_FILE.name)
    abort(404)
|
|
|
|
|
|
def _do_generate():
    """Run report generation (thread target) and always clear the busy flag.

    Uses STATS_GENERATE_CMD through the shell when set; otherwise invokes
    goaccess on STATS_LOG_FILE. Both calls are limited to 600 seconds.
    """
    try:
        if not STATS_GENERATE_CMD:
            argv = [
                "goaccess",
                STATS_LOG_FILE,
                "--log-format=COMBINED",
                f"--output={STATS_FILE}",
            ]
            # NOTE(review): STATS_JSON is a Path, which is always truthy, so
            # the JSON output is always requested.
            if STATS_JSON:
                argv.append(f"--output={STATS_JSON}")
            subprocess.run(argv, timeout=600, check=False)
        else:
            # Operator-supplied command from the environment, run as-is.
            subprocess.run(STATS_GENERATE_CMD, shell=True, timeout=600)
    finally:
        with _gen_lock:
            _gen_state["generating"] = False
|
|
|
|
|
|
@app.route("/stats/generate", methods=["POST"])
def stats_generate():
    """Kick off background report generation unless one exists or is running.

    JSON status is one of "exists", "generating", "started", or "error"
    (HTTP 500) when neither a log file nor a custom command is configured.
    """
    denied = _require_admin()
    if denied:
        return denied
    if STATS_FILE.is_file():
        return jsonify({"status": "exists"})

    with _gen_lock:
        if _gen_state["generating"]:
            return jsonify({"status": "generating"})
        if not (STATS_LOG_FILE or STATS_GENERATE_CMD):
            payload = {"status": "error",
                       "message": "STATS_LOG_FILE non configuré"}
            return jsonify(payload), 500
        # Flag is set under the lock so two requests cannot both start a run.
        _gen_state["generating"] = True
        worker = threading.Thread(target=_do_generate, daemon=True)
        worker.start()
    return jsonify({"status": "started"})
|
|
|
|
|
|
@app.route("/stats/status")
def stats_status():
    """Report whether the report file exists and whether a run is in progress."""
    denied = _require_admin()
    if denied:
        return denied
    status = {
        "ready": STATS_FILE.is_file(),
        "generating": _gen_state["generating"],
    }
    return jsonify(status)
|
|
|
|
|
|
# ── Fichiers bruts (aperçu / téléchargement) ──────────────────────────
|
|
|
|
@app.route("/raw/<path:subpath>")
def raw_file(subpath):
    """Send the raw bytes of a visible file to an authenticated admin.

    Aborts with 404 when the path is not a file and 403 when any component
    of the resolved path is hidden (listed in _HIDDEN or dot-prefixed).
    """
    redir = _require_admin()
    if redir:
        return redir
    target = _safe_path(subpath)
    if not target.is_file():
        abort(404)
    # Every component is checked, not just the first, so that files such as
    # "logo/.htaccess" stay as hidden here as they are in listings.
    rel_parts = target.relative_to(ASSETS_ROOT).parts
    if any(p in _HIDDEN or p.startswith(".") for p in rel_parts):
        abort(403)
    return send_from_directory(ASSETS_ROOT, subpath)
|
|
|
|
|
|
# ── Suppression de fichiers ───────────────────────────────────────────
|
|
|
|
@app.route("/delete", methods=["POST"])
def delete_file():
    """Delete one visible file named by the "path" form field.

    Aborts with 400 for a missing path or a directory, 404 when the target
    does not exist, and 403 when any component of the resolved path is
    hidden. Redirects to the listing of the parent directory afterwards.
    """
    redir = _require_admin()
    if redir:
        return redir
    subpath = request.form.get("path", "").strip()
    if not subpath:
        abort(400)
    target = _safe_path(subpath)
    if not target.exists():
        abort(404)
    if target.is_dir():
        abort(400)
    # The form field is free text: validate the resolved path component by
    # component so "logo/../.env" or "logo/.htaccess" cannot be deleted.
    rel_parts = target.relative_to(ASSETS_ROOT).parts
    if any(p in _HIDDEN or p.startswith(".") for p in rel_parts):
        abort(403)
    parent = str(Path(subpath).parent)
    target.unlink()
    return redirect(url_for("browse", subpath=parent) if parent != "." else url_for("browse"))
|
|
|
|
|
|
# ── Renommage de fichiers ─────────────────────────────────────────────
|
|
|
|
@app.route("/rename", methods=["POST"])
def rename_file():
    """Rename a visible file within its directory; responds with JSON.

    Form fields: "path" (current file) and "new_name" (bare file name, no
    separators). The name is sanitised with secure_filename(). Errors are
    JSON objects with an "error" key: 400 (bad input), 403 (hidden path),
    404 (not a file) or 409 (destination exists).
    """
    redir = _require_admin()
    if redir:
        return redir

    subpath = request.form.get("path", "").strip()
    new_name = request.form.get("new_name", "").strip()

    if not subpath or not new_name:
        return jsonify({"error": "Paramètres manquants"}), 400

    if "/" in new_name or "\\" in new_name or new_name in (".", ".."):
        return jsonify({"error": "Nom invalide"}), 400

    new_name = secure_filename(new_name)
    if not new_name:
        return jsonify({"error": "Nom invalide après nettoyage"}), 400

    target = _safe_path(subpath)
    if not target.is_file():
        return jsonify({"error": "Fichier introuvable"}), 404

    # Validate every component of the resolved path; checking only the first
    # component of the submitted string misses nested or ".."-reached files.
    rel_parts = target.relative_to(ASSETS_ROOT).parts
    if any(p in _HIDDEN or p.startswith(".") for p in rel_parts):
        return jsonify({"error": "Accès refusé"}), 403

    dest = target.parent / new_name
    if not dest.is_relative_to(ASSETS_ROOT):
        return jsonify({"error": "Destination invalide"}), 400

    if dest.exists():
        return jsonify({"error": f"« {new_name} » existe déjà"}), 409

    target.rename(dest)
    new_path = str(dest.relative_to(ASSETS_ROOT))
    return jsonify({
        "name": new_name,
        "path": new_path,
        "browse_url": url_for("browse", subpath=new_path),
    })
|
|
|
|
|
|
# ── Vérification des conflits avant upload ───────────────────────────
|
|
|
|
@app.route("/check-upload", methods=["POST"])
def check_upload():
    """Report which of the submitted file names already exist in a directory.

    Form fields: "path" (destination directory, empty for the root) and
    repeated "names". Returns {"conflicts": [...]} with the original names
    whose sanitised form exists in the destination. Aborts with 403 for a
    hidden destination and 400 when it is not a directory.
    """
    redir = _require_admin()
    if redir:
        return redir

    subpath = request.form.get("path", "").strip()
    names = request.form.getlist("names")

    dest = _safe_path(subpath) if subpath else ASSETS_ROOT
    # Hidden check on every component of the resolved destination.
    rel_parts = dest.relative_to(ASSETS_ROOT).parts
    if any(p in _HIDDEN or p.startswith(".") for p in rel_parts):
        abort(403)
    if not dest.is_dir():
        abort(400)

    conflicts = []
    for n in names:
        safe = secure_filename(n) if n else ""
        # A name that sanitises to "" is skipped by the upload itself; without
        # this guard `dest / ""` is dest, which always exists.
        if safe and (dest / safe).exists():
            conflicts.append(n)
    return jsonify({"conflicts": conflicts})
|
|
|
|
|
|
# ── Upload de fichiers ────────────────────────────────────────────────
|
|
|
|
@app.route("/upload", methods=["POST"])
def upload_file():
    """Store uploaded files in a visible directory of the asset tree.

    Form fields: "path" (destination directory, empty for the root), "files"
    and "conflict" — one of "overwrite" (default), "backup", "rename" or
    "skip" — applied when a file of the same sanitised name exists.
    Aborts with 400 (no files / not a directory) or 403 (hidden destination);
    redirects to the destination listing on completion.
    """
    redir = _require_admin()
    if redir:
        return redir

    subpath = request.form.get("path", "").strip()
    files = request.files.getlist("files")

    if not files or all(f.filename == "" for f in files):
        abort(400)

    dest = _safe_path(subpath) if subpath else ASSETS_ROOT
    # Hidden check on every component of the resolved destination.
    rel_parts = dest.relative_to(ASSETS_ROOT).parts
    if any(p in _HIDDEN or p.startswith(".") for p in rel_parts):
        abort(403)
    if not dest.is_dir():
        abort(400)

    conflict = request.form.get("conflict", "overwrite")
    if conflict not in ("backup", "overwrite", "rename", "skip"):
        conflict = "overwrite"

    for f in files:
        name = secure_filename(f.filename or "")
        if not name:
            continue
        out_path = dest / name
        if out_path.exists():
            if conflict == "skip":
                continue
            # A directory of the same name must neither be renamed away by
            # "backup" nor written over by "overwrite".
            if out_path.is_dir() and conflict != "rename":
                continue
            if conflict == "backup":
                try:
                    out_path.rename(_backup_path(out_path))
                except OSError:
                    # Could not move the existing file aside: keep it.
                    continue
            elif conflict == "rename":
                out_path = _auto_rename(out_path)
        f.save(out_path)

    return redirect(url_for("browse", subpath=subpath) if subpath else url_for("browse"))
|
|
|
|
|
|
# ── Redimensionnement d'images ────────────────────────────────────────
|
|
|
|
def _render_resized(target: Path, out_path: Path, fmt: str, size: int, is_svg: bool) -> None:
    """Write a size x size rendition of *target* to *out_path* as *fmt*.

    *fmt* is "png", "jpg", "ico", or "svg" (the latter only for SVG sources,
    which are copied unchanged). SVG sources are rasterised with cairosvg at
    the requested size; raster sources are resized with LANCZOS.
    """
    if is_svg and fmt == "svg":
        shutil.copy2(target, out_path)
        return

    if is_svg:
        buf = io.BytesIO()
        _cairosvg.svg2png(url=str(target), write_to=buf,
                          output_width=size, output_height=size)
        buf.seek(0)
        source = Image.open(buf)
    else:
        source = Image.open(target)

    # The context manager releases the file handle opened by Image.open().
    with source:
        img = source if is_svg else source.resize((size, size), Image.LANCZOS)
        if fmt == "jpg":
            img.convert("RGB").save(out_path, format="JPEG", quality=90)
        elif fmt == "ico":
            img.convert("RGBA").save(out_path, format="ICO", sizes=[(size, size)])
        else:  # png
            if is_svg or img.mode not in ("RGBA", "RGB", "L"):
                img = img.convert("RGBA")
            img.save(out_path, format="PNG")


@app.route("/resize", methods=["POST"])
def resize_image():
    """Generate square renditions of an image in the requested sizes/formats.

    Form fields: "path" (source image), repeated "sizes" (kept only if in
    RESIZE_SIZES), repeated "formats" (kept only if in RESIZE_FORMATS) and
    "conflict" — "skip" (default), "backup", "overwrite" or "rename".
    Outputs are written next to the source as "<stem>_<n>x<n>.<fmt>".

    Returns JSON {"created": [{name, path}], "errors": [{name, reason}]}, or
    a JSON error with status 400. Aborts with 404 (not a file), 400 (not an
    image) or 403 (hidden path).
    """
    redir = _require_admin()
    if redir:
        return redir

    subpath = request.form.get("path", "").strip()
    raw_sizes = request.form.getlist("sizes")
    raw_formats = request.form.getlist("formats")

    if not subpath or not raw_sizes or not raw_formats:
        return jsonify({"error": "Paramètres manquants"}), 400

    target = _safe_path(subpath)
    if not target.is_file():
        abort(404)

    ext = target.suffix.lower()
    if ext not in IMAGE_EXT:
        abort(400)

    # Hidden check on every component of the resolved path.
    rel_parts = target.relative_to(ASSETS_ROOT).parts
    if any(p in _HIDDEN or p.startswith(".") for p in rel_parts):
        abort(403)

    try:
        sizes = sorted({int(s) for s in raw_sizes} & RESIZE_SIZES)
    except (ValueError, TypeError):
        return jsonify({"error": "Tailles invalides"}), 400

    # De-duplicate while keeping the submitted order, so a repeated format is
    # not rendered (and reported) twice.
    formats = list(dict.fromkeys(f for f in raw_formats if f in RESIZE_FORMATS))
    if not sizes or not formats:
        return jsonify({"error": "Tailles ou formats invalides"}), 400

    conflict = request.form.get("conflict", "skip")
    if conflict not in ("backup", "overwrite", "rename", "skip"):
        conflict = "skip"

    is_svg = ext == ".svg"
    stem = target.stem
    parent = target.parent
    created, errors = [], []

    for fmt in formats:
        for size in sizes:
            out_path = parent / f"{stem}_{size}x{size}.{fmt}"
            out_name = out_path.name

            # Feasibility first: an existing file must not be moved to a
            # backup for an output that can never be produced.
            if fmt == "svg" and not is_svg:
                errors.append({"name": out_name, "reason": "Conversion raster→SVG non supportée"})
                continue
            if is_svg and fmt != "svg" and not HAS_CAIROSVG:
                errors.append({"name": out_name, "reason": "cairosvg non disponible sur ce serveur"})
                continue

            if out_path.exists():
                if conflict == "skip":
                    errors.append({"name": out_name, "reason": "Fichier existant, ignoré"})
                    continue
                if out_path.is_dir() and conflict != "rename":
                    errors.append({"name": out_name, "reason": "Un dossier porte déjà ce nom"})
                    continue
                if conflict == "backup":
                    try:
                        out_path.rename(_backup_path(out_path))
                    except OSError as exc:
                        errors.append({"name": out_name, "reason": f"Backup impossible : {exc}"})
                        continue
                elif conflict == "rename":
                    out_path = _auto_rename(out_path)
                    out_name = out_path.name
                # conflict == "overwrite": fall through and write over it.

            try:
                _render_resized(target, out_path, fmt, size, is_svg)
            except Exception as exc:
                # Per-output failure is reported; the other outputs continue.
                errors.append({"name": out_name, "reason": str(exc)})
            else:
                created.append({
                    "name": out_name,
                    "path": str(out_path.relative_to(ASSETS_ROOT)),
                })

    return jsonify({"created": created, "errors": errors})
|
|
|
|
|
|
# ── Fichiers CDN publics (dev local uniquement) ───────────────────────
|
|
|
|
# Top-level directories that cdn_file() serves without authentication.
_PUBLIC_TOP = frozenset({"logo", "wiki", "error"})
|
|
|
|
|
|
@app.route("/favicon.ico")
def favicon():
    """Serve favicon.ico from the asset root (no authentication)."""
    icon_name = "favicon.ico"
    return send_from_directory(ASSETS_ROOT, icon_name)
|
|
|
|
|
|
@app.route("/robots.txt")
def robots():
    """Serve robots.txt from the asset root (no authentication)."""
    robots_name = "robots.txt"
    return send_from_directory(ASSETS_ROOT, robots_name)
|
|
|
|
|
|
@app.route("/<path:subpath>")
def cdn_file(subpath):
    """Serve a file from one of the public top-level directories.

    Responds 404 when the first path component is not in _PUBLIC_TOP or the
    resolved target is not a file.
    """
    first = Path(subpath).parts[0]
    if first in _PUBLIC_TOP:
        if _safe_path(subpath).is_file():
            return send_from_directory(ASSETS_ROOT, subpath)
    abort(404)
|
|
|
|
|
|
if __name__ == "__main__":
    # Direct execution starts Flask's built-in server on port 5003 with
    # debug mode enabled.
    app.run(debug=True, port=5003)
|