grp.items en Jinja2 résolvait la méthode dict.items() au lieu de la clé. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1208 lines
39 KiB
Python
1208 lines
39 KiB
Python
import io
|
||
import json
|
||
import os
|
||
import re
|
||
import shutil
|
||
import subprocess
|
||
import threading
|
||
from pathlib import Path
|
||
from datetime import datetime, timedelta
|
||
|
||
from urllib.parse import urlencode
|
||
from PIL import Image
|
||
|
||
from flask import (Flask, redirect, url_for, session, request,
|
||
render_template, abort, send_from_directory, jsonify)
|
||
from authlib.integrations.flask_client import OAuth
|
||
from werkzeug.middleware.proxy_fix import ProxyFix
|
||
from werkzeug.utils import secure_filename
|
||
|
||
# Flask application object. SECRET_KEY is mandatory: a missing variable
# raises KeyError at import time.
app = Flask(__name__)
app.secret_key = os.environ["SECRET_KEY"]
# Wrap the WSGI app with ProxyFix, trusting X-Forwarded-Proto from one hop.
app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1)

# OAuth/OIDC client registered under the name "alpid". Discovery URL, client
# id and client secret are all mandatory environment variables.
oauth = OAuth(app)
oauth.register(
    name="alpid",
    server_metadata_url=os.environ["ALPID_DISCOVERY_URL"],
    client_id=os.environ["ALPID_CLIENT_ID"],
    client_secret=os.environ["ALPID_CLIENT_SECRET"],
    client_kwargs={"scope": "openid profile email"},
)
|
||
|
||
# Comma-separated allow-lists read from the environment. Every entry is
# stripped and empty entries are dropped, so a value such as "admins, staff"
# matches the group "staff" (both lists are now parsed the same way).
ADMIN_GROUPS = {
    g.strip()
    for g in os.environ.get("ADMIN_GROUPS", "admins").split(",")
    if g.strip()
}
ADMIN_EMAILS = {
    e.strip()
    for e in os.environ.get("ADMIN_EMAILS", "").split(",")
    if e.strip()
}
|
||
# Root of the managed asset tree (defaults to the current directory), resolved
# once at import time. The trash lives in a ".trash" folder directly under it.
ASSETS_ROOT = Path(os.environ.get("ASSETS_ROOT", ".")).resolve()
TRASH_ROOT = ASSETS_ROOT / ".trash"

# Logout endpoint derived from the discovery URL by cutting at "/.well-known/".
# NOTE(review): the "/protocol/openid-connect/logout" suffix looks
# Keycloak-style - confirm against the identity provider.
_alpid_base = os.environ["ALPID_DISCOVERY_URL"].split("/.well-known/")[0]
ALPID_LOGOUT_URL = _alpid_base + "/protocol/openid-connect/logout"
# GoAccess report locations and generation settings.
STATS_FILE = Path(os.environ.get("STATS_FILE", "/opt/static-cdn/goaccess.html"))
STATS_JSON = Path(os.environ.get("STATS_JSON", "/opt/static-cdn/goaccess.json"))
STATS_LOG_FILE = os.environ.get("STATS_LOG_FILE", "")
STATS_GENERATE_CMD = os.environ.get("STATS_GENERATE_CMD", "")

# Lock and flag shared between the request handlers and the background
# report-generation thread.
_gen_lock = threading.Lock()
_gen_state = {"generating": False}

# Application version read from the VERSION file next to this module; falls
# back to a dash when the file cannot be read.
try:
    _APP_VERSION = (Path(__file__).parent / "VERSION").read_text().strip()
except Exception:
    _APP_VERSION = "—"

_CHANGELOG_FILE = Path(__file__).parent / "CHANGELOG.md"
|
||
|
||
|
||
# Names that are never listed, counted or served by the views in this file.
_HIDDEN = frozenset({
    ".git", "scripts", "app",
    ".env", ".env.example", ".gitignore",
    ".htaccess", ".htpasswd_stats",
    "standard_index.html",
})

# cairosvg is optional; HAS_CAIROSVG records whether the import succeeded.
try:
    import cairosvg as _cairosvg
    HAS_CAIROSVG = True
except ImportError:
    HAS_CAIROSVG = False

# Lower-case file suffixes (with leading dot) used to pick a preview template.
IMAGE_EXT = frozenset({".png", ".jpg", ".jpeg", ".gif", ".ico", ".svg", ".webp"})

# Square sizes and output formats accepted by the resize endpoints.
RESIZE_SIZES = frozenset({32, 64, 100, 128, 200, 300, 500, 600, 1024})
RESIZE_FORMATS = frozenset({"png", "jpg", "ico", "svg"})
TEXT_EXT = frozenset({".txt", ".html", ".htm", ".md", ".css", ".js", ".json",
                      ".xml", ".conf", ".sh", ".robots", ".php"})
PDF_EXT = frozenset({".pdf"})
|
||
|
||
|
||
# ── Helpers ───────────────────────────────────────────────────────────
|
||
|
||
def _user():
    """Return the "user" entry of the session, or None when it is absent."""
    current = session.get("user")
    return current
|
||
|
||
|
||
def _require_admin():
    """Gate a view behind an authenticated admin session.

    Returns None when the session user is flagged ``is_admin``. When nobody
    is logged in, the requested URL is remembered in the session and a
    redirect to the login route is returned. A logged-in non-admin triggers
    ``abort(403)``.
    """
    account = _user()
    if account and account.get("is_admin"):
        return None
    if account:
        abort(403)
        return None
    session["next_url"] = request.url
    return redirect(url_for("login"))
|
||
|
||
|
||
def _safe_path(subpath: str) -> Path:
    """Resolve *subpath* under ASSETS_ROOT; abort with 400 if it escapes."""
    resolved = ASSETS_ROOT.joinpath(subpath).resolve()
    inside = resolved.is_relative_to(ASSETS_ROOT)
    if not inside:
        abort(400)
    return resolved
|
||
|
||
|
||
def _humansize(n: int) -> str:
|
||
for unit in ("o", "Ko", "Mo", "Go"):
|
||
if n < 1024:
|
||
return f"{n:.0f} {unit}"
|
||
n /= 1024
|
||
return f"{n:.1f} To"
|
||
|
||
|
||
# EXIF tag names that _image_meta keeps; every other tag is dropped.
# "GPSInfo" is listed here but handled separately through the GPS IFD.
_EXIF_WANTED = frozenset({
    "Make", "Model", "Software", "DateTime", "DateTimeOriginal", "DateTimeDigitized",
    "ExposureTime", "FNumber", "ISOSpeedRatings", "FocalLength", "Flash",
    "WhiteBalance", "ExposureProgram", "MeteringMode", "Orientation",
    "XResolution", "YResolution", "ResolutionUnit", "ColorSpace",
    "PixelXDimension", "PixelYDimension", "Artist", "Copyright", "GPSInfo",
})
|
||
|
||
|
||
def _image_meta(path: Path) -> dict:
    """Collect basic properties and selected EXIF tags of an image.

    Returns a dict with ``width``, ``height``, ``format``, ``mode`` and
    ``dpi``. When EXIF data is present, an ``exif`` dict is added holding the
    tags listed in ``_EXIF_WANTED`` (as strings) and, when coordinates could
    be parsed, a ``GPS`` entry. Any failure yields an empty dict so that a
    broken image never breaks the preview page.
    """
    try:
        from PIL import ExifTags

        # The context manager closes the underlying file handle, which the
        # bare Image.open() call used to leak on every preview.
        with Image.open(path) as img:
            meta = {
                "width": img.width,
                "height": img.height,
                "format": img.format or path.suffix.lstrip(".").upper(),
                "mode": img.mode,
                "dpi": img.info.get("dpi"),
            }
            exif = img.getexif()
            if exif:
                rows = {}
                # Base IFD first, then the Exif sub-IFD (0x8769); later
                # values override earlier ones for the same tag name.
                for ifd in (exif, exif.get_ifd(0x8769)):
                    for tag_id, val in ifd.items():
                        tag = ExifTags.TAGS.get(tag_id, str(tag_id))
                        if tag in _EXIF_WANTED and tag != "GPSInfo":
                            rows[tag] = str(val)
                gps_ifd = exif.get_ifd(0x8825)
                if gps_ifd:
                    gps = _parse_gps(gps_ifd)
                    # Skip the entry instead of exposing a literal None.
                    if gps is not None:
                        rows["GPS"] = gps
                if rows:
                    meta["exif"] = rows
        return meta
    except Exception:
        # Deliberate best-effort: metadata is optional decoration.
        return {}
|
||
|
||
|
||
def _parse_gps(gps_ifd: dict):
|
||
try:
|
||
from PIL import ExifTags
|
||
def _dms(coords):
|
||
d, m, s = coords
|
||
return float(d) + float(m) / 60 + float(s) / 3600
|
||
|
||
lat = _dms(gps_ifd.get(2, (0, 0, 0)))
|
||
lon = _dms(gps_ifd.get(4, (0, 0, 0)))
|
||
latR = gps_ifd.get(1, "N")
|
||
lonR = gps_ifd.get(3, "E")
|
||
if latR == "S": lat = -lat
|
||
if lonR == "W": lon = -lon
|
||
return f"{lat:.6f}, {lon:.6f}"
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def _backup_path(p: Path) -> Path:
|
||
ts = datetime.now().strftime("%Y%m%d%H%M%S")
|
||
return p.parent / f"{p.stem}_bak_{ts}{p.suffix}"
|
||
|
||
|
||
def _auto_rename(p: Path) -> Path:
|
||
i = 1
|
||
while True:
|
||
candidate = p.parent / f"{p.stem}_{i}{p.suffix}"
|
||
if not candidate.exists():
|
||
return candidate
|
||
i += 1
|
||
|
||
|
||
def _trash_move(target: Path, subpath: str) -> None:
    """Move *target* into the trash, mirroring *subpath* under TRASH_ROOT.

    If the mirrored destination already exists, a timestamp is appended to
    the stem. A ``.trashinfo`` JSON sidecar next to the trashed file records
    the original relative path and the deletion time.
    """
    original_rel = Path(subpath)
    destination = TRASH_ROOT / original_rel
    destination.parent.mkdir(parents=True, exist_ok=True)
    if destination.exists():
        stamp = datetime.now().strftime("%Y%m%d%H%M%S")
        destination = destination.parent / f"{destination.stem}_{stamp}{destination.suffix}"
    shutil.move(str(target), destination)

    record = {
        "original_path": str(original_rel),
        "deleted_at": datetime.now().isoformat(timespec="seconds"),
    }
    sidecar = Path(str(destination) + ".trashinfo")
    sidecar.write_text(json.dumps(record))
|
||
|
||
|
||
def _trash_restore(trash_rel: str, conflict: str = "") -> tuple:
    """Move a trashed file back to its original location.

    Args:
        trash_rel: Path of the trashed file relative to TRASH_ROOT.
        conflict: Behaviour when the destination exists. An empty string
            reports the conflict, "rename" picks a free ``_<n>`` name, and
            any other value lets the move replace the destination.

    Returns:
        ``(True, restored_relative_path)`` on success, otherwise
        ``(False, reason)`` with reason in "forbidden", "not_found" or
        "conflict".
    """
    target = (TRASH_ROOT / trash_rel).resolve()
    # Path-aware containment test. A string prefix check would also accept
    # sibling folders whose name merely starts with ".trash".
    if not target.is_relative_to(TRASH_ROOT):
        return False, "forbidden"
    if not target.is_file():
        return False, "not_found"

    info_path = Path(str(target) + ".trashinfo")
    try:
        original = json.loads(info_path.read_text())["original_path"]
    except (OSError, ValueError, KeyError, TypeError):
        # Missing or unreadable sidecar: restore to the mirrored location.
        original = trash_rel

    dest = (ASSETS_ROOT / original).resolve()
    if not dest.is_relative_to(ASSETS_ROOT):
        return False, "forbidden"
    if dest.exists():
        if not conflict:
            return False, "conflict"
        if conflict == "rename":
            dest = _auto_rename(dest)

    dest.parent.mkdir(parents=True, exist_ok=True)
    shutil.move(str(target), str(dest))
    if info_path.exists():
        info_path.unlink()
    return True, str(dest.relative_to(ASSETS_ROOT))
|
||
|
||
|
||
def _trash_purge(days: int = 30) -> int:
    """Delete trashed files older than *days* and prune empty folders.

    The age comes from the ``deleted_at`` field of each ``.trashinfo``
    sidecar. Sidecars that cannot be read or parsed are skipped. Returns the
    number of sidecars removed.
    """
    if not TRASH_ROOT.exists():
        return 0

    threshold = datetime.now() - timedelta(days=days)
    marker_len = len(".trashinfo")
    purged = 0
    # Materialise the listing first: entries are deleted while looping.
    for sidecar in list(TRASH_ROOT.rglob("*.trashinfo")):
        try:
            stamp = json.loads(sidecar.read_text())["deleted_at"]
            if not datetime.fromisoformat(stamp) < threshold:
                continue
            payload = Path(str(sidecar)[:-marker_len])
            if payload.exists():
                payload.unlink()
            sidecar.unlink()
            purged += 1
        except Exception:
            pass

    # Reverse-sorted order visits children before their parents, so nested
    # empty folders can be removed in one pass.
    for folder in reversed(sorted(TRASH_ROOT.rglob("*"))):
        if folder == TRASH_ROOT or not folder.is_dir():
            continue
        try:
            folder.rmdir()
        except OSError:
            pass
    return purged
|
||
|
||
|
||
def _trash_list() -> list:
    """List trashed files, most recently deleted first.

    Each entry is built from a ``.trashinfo`` sidecar. Sidecars whose file no
    longer exists are removed, and unreadable sidecars are skipped.
    """
    if not TRASH_ROOT.exists():
        return []

    marker_len = len(".trashinfo")
    listing = []
    for sidecar in TRASH_ROOT.rglob("*.trashinfo"):
        try:
            record = json.loads(sidecar.read_text())
            trashed = Path(str(sidecar)[:-marker_len])
            if not trashed.exists():
                sidecar.unlink()
                continue
            rel = trashed.relative_to(TRASH_ROOT)
            suffix = trashed.suffix.lower()
            info = trashed.stat()
            listing.append({
                "name": trashed.name,
                "path": str(rel),
                "original_path": record.get("original_path", str(rel)),
                "deleted_at": datetime.fromisoformat(record["deleted_at"]),
                "size": info.st_size,
                "is_dir": False,
                "is_image": suffix in IMAGE_EXT,
                "is_text": suffix in TEXT_EXT,
                "is_pdf": suffix in PDF_EXT,
                "ext": suffix,
            })
        except Exception:
            pass

    listing.sort(key=lambda item: item["deleted_at"], reverse=True)
    return listing
|
||
|
||
|
||
def _trash_count() -> int:
    """Count files in the trash, ignoring ``.trashinfo`` sidecars."""
    if not TRASH_ROOT.exists():
        return 0
    total = 0
    for item in TRASH_ROOT.rglob("*"):
        if item.is_file() and not item.name.endswith(".trashinfo"):
            total += 1
    return total
|
||
|
||
|
||
def _trash_stats() -> dict:
    """Summarise the trash: file count, total size and oldest deletion time.

    ``oldest`` stays None when no sidecar could be read.
    """
    summary = {"files": 0, "size": 0, "oldest": None}
    if not TRASH_ROOT.exists():
        return summary

    for item in TRASH_ROOT.rglob("*"):
        if item.name.endswith(".trashinfo") or not item.is_file():
            continue
        summary["files"] += 1
        summary["size"] += item.stat().st_size
        sidecar = Path(str(item) + ".trashinfo")
        try:
            when = datetime.fromisoformat(json.loads(sidecar.read_text())["deleted_at"])
            if summary["oldest"] is None or when < summary["oldest"]:
                summary["oldest"] = when
        except Exception:
            pass
    return summary
|
||
|
||
|
||
def _folder_stats(path: Path) -> dict:
    """Count visible files under *path* (recursively) and sum their sizes.

    A file is skipped when any component of its path relative to *path* is
    listed in ``_HIDDEN`` or starts with a dot.
    """
    def _concealed(parts):
        return any(part in _HIDDEN or part.startswith(".") for part in parts)

    sizes = [
        item.stat().st_size
        for item in path.rglob("*")
        if item.is_file() and not _concealed(item.relative_to(path).parts)
    ]
    return {"files": len(sizes), "size": sum(sizes)}
|
||
|
||
|
||
def _parse_date(s: str):
|
||
if not s:
|
||
return None
|
||
try:
|
||
return datetime.strptime(s, "%Y-%m-%d")
|
||
except ValueError:
|
||
return None
|
||
|
||
|
||
def _load_hits() -> dict:
    """Parse the GoAccess JSON report into ``{relative_path: hit_count}``.

    Returns an empty dict when the report is missing or cannot be parsed.
    Entries with an empty URL or a zero count are left out.
    """
    if not STATS_JSON.is_file():
        return {}
    try:
        report = json.loads(STATS_JSON.read_text())
        rows = report.get("requests", {}).get("data", [])
        counts = {}
        for row in rows:
            rel_url = row.get("data", "").lstrip("/")
            total = row.get("hits", {}).get("count", 0)
            if not (rel_url and total):
                continue
            counts[rel_url] = total
        return counts
    except Exception:
        return {}
|
||
|
||
|
||
def _entry(item: Path) -> dict:
    """Describe one filesystem item for the listing templates.

    ``path`` is relative to ASSETS_ROOT, and ``size`` is None for anything
    that is not a regular file.
    """
    info = item.stat()
    suffix = item.suffix.lower()
    record = {
        "name": item.name,
        "path": str(item.relative_to(ASSETS_ROOT)),
        "is_dir": item.is_dir(),
    }
    record["size"] = info.st_size if item.is_file() else None
    record["mtime"] = datetime.fromtimestamp(info.st_mtime)
    record["ext"] = suffix
    record["is_image"] = suffix in IMAGE_EXT
    record["is_text"] = suffix in TEXT_EXT
    record["is_pdf"] = suffix in PDF_EXT
    return record
|
||
|
||
|
||
# ── Context processor ────────────────────────────────────────────────
|
||
|
||
@app.context_processor
def _inject_globals():
    """Expose the session user, trash count, size formatter and version to templates."""
    account = _user()
    if account:
        pending = _trash_count()
    else:
        # The trash is not scanned for anonymous visitors.
        pending = 0
    return dict(
        user=account,
        trash_count=pending,
        humansize=_humansize,
        app_version=_APP_VERSION,
    )
|
||
|
||
|
||
# ── Auth ──────────────────────────────────────────────────────────────
|
||
|
||
@app.route("/auth/login")
def login():
    """Redirect the browser to the identity provider's authorization page."""
    callback_url = url_for("callback", _external=True)
    return oauth.alpid.authorize_redirect(callback_url)
|
||
|
||
|
||
@app.route("/auth/callback")
def callback():
    """Finish the login flow and store the user in the session.

    Admin status is decided in this order: group membership when the
    provider returns groups, else the email allow-list when one is
    configured, else everyone is treated as admin.
    """
    token = oauth.alpid.authorize_access_token()
    claims = token.get("userinfo") or oauth.alpid.userinfo(token=token)
    email = claims.get("email", "")
    groups = set(claims.get("groups", []))

    if groups:
        is_admin = not groups.isdisjoint(ADMIN_GROUPS)
    elif ADMIN_EMAILS:
        is_admin = email in ADMIN_EMAILS
    else:
        # NOTE(review): fail-open default - with no groups claim and no
        # ADMIN_EMAILS configured, every authenticated user becomes admin.
        # Confirm this is intended for the deployment.
        is_admin = True

    session["user"] = dict(
        sub=claims["sub"],
        name=claims.get("name") or claims.get("preferred_username", ""),
        email=email,
        is_admin=is_admin,
    )
    session["id_token"] = token.get("id_token", "")
    fallback = url_for("dashboard")
    return redirect(session.pop("next_url", fallback))
|
||
|
||
|
||
@app.route("/auth/logout")
def logout():
    """Clear the local session and redirect to the provider's logout URL.

    The stored ID token, when present, is sent as ``id_token_hint``.
    """
    hint = session.get("id_token")
    session.clear()
    query = {"post_logout_redirect_uri": url_for("dashboard", _external=True)}
    if hint:
        query["id_token_hint"] = hint
    return redirect(f"{ALPID_LOGOUT_URL}?{urlencode(query)}")
|
||
|
||
|
||
# ── Dashboard ─────────────────────────────────────────────────────────
|
||
|
||
@app.route("/")
def dashboard():
    """Render per-folder statistics for the visible top-level folders."""
    redir = _require_admin()
    if redir:
        return redir

    visible_dirs = [
        child for child in sorted(ASSETS_ROOT.iterdir())
        if not (child.name in _HIDDEN or child.name.startswith("."))
        and child.is_dir()
    ]
    folders = {child.name: _folder_stats(child) for child in visible_dirs}
    file_total = sum(stats["files"] for stats in folders.values())
    byte_total = sum(stats["size"] for stats in folders.values())

    return render_template(
        "dashboard.html",
        folders=folders,
        total_files=file_total,
        total_size=byte_total,
        trash=_trash_stats(),
    )
|
||
|
||
|
||
@app.route("/changelog")
def changelog():
    """Render the parsed changelog for admins."""
    redir = _require_admin()
    if redir:
        return redir
    return render_template("changelog.html", sections=_parse_changelog())
|
||
|
||
|
||
def _parse_changelog():
    """Parse CHANGELOG.md into a list of version dicts.

    Recognised lines:
      * ``## [version] — date`` opens a version section; any other ``## ``
        heading is kept verbatim as the version with an empty date.
      * ``### title`` opens a group inside the current version.
      * ``- text`` adds an entry to the current group.

    Returns a list of ``{"version", "date", "groups"}`` dicts where each
    group is ``{"title", "entries"}``. Returns an empty list when the file
    cannot be read.
    """
    try:
        # Decode explicitly: the heading pattern contains a literal em dash,
        # which a non-UTF-8 locale default would mis-decode.
        text = _CHANGELOG_FILE.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        return []

    sections = []
    current = None
    current_group = None

    def _close_group():
        # Attach the group being built, if any, to the current version.
        if current is not None and current_group is not None:
            current["groups"].append(current_group)

    for line in text.splitlines():
        if line.startswith("## "):
            if current is not None:
                _close_group()
                sections.append(current)
            m = re.match(r"## \[(.+?)\] — (.+)", line)
            current = {
                "version": m.group(1) if m else line[3:],
                "date": m.group(2) if m else "",
                "groups": [],
            }
            current_group = None
        elif line.startswith("### ") and current is not None:
            _close_group()
            current_group = {"title": line[4:], "entries": []}
        elif line.startswith("- ") and current_group is not None:
            current_group["entries"].append(line[2:])

    if current is not None:
        _close_group()
        sections.append(current)

    return sections
|
||
|
||
|
||
# ── Navigateur de fichiers ────────────────────────────────────────────
|
||
|
||
@app.route("/browse/")
@app.route("/browse/<path:subpath>")
def browse(subpath=""):
    """List a folder of the asset tree, or preview a file.

    Aborts with 404 when the path does not exist and with 403 when it lies
    under a hidden top-level entry. Folders are listed directories-first,
    then by case-insensitive name, with GoAccess hit counts for files.
    """
    redir = _require_admin()
    if redir:
        return redir

    target = _safe_path(subpath)
    if not target.exists():
        abort(404)

    # Check the top-level component of both the requested and the resolved
    # path. Looking only at the raw subpath let "x/../scripts/a.sh" slip
    # past the hidden-folder filter.
    tops = (*Path(subpath).parts[:1], *target.relative_to(ASSETS_ROOT).parts[:1])
    if any(top in _HIDDEN or top.startswith(".") for top in tops):
        abort(403)

    if target.is_file():
        return _file_preview(target, subpath)

    hits = _load_hits()

    entries = []
    for item in sorted(target.iterdir()):
        if item.name in _HIDDEN or item.name.startswith("."):
            continue
        e = _entry(item)
        e["hits"] = None if e["is_dir"] else hits.get(e["path"], 0)
        entries.append(e)
    entries.sort(key=lambda e: (0 if e["is_dir"] else 1, e["name"].lower()))

    parts = Path(subpath).parts if subpath else ()
    breadcrumb = [
        {"name": p, "path": str(Path(*parts[:i + 1]))}
        for i, p in enumerate(parts)
    ]

    return render_template(
        "browse.html",
        entries=entries,
        subpath=subpath,
        breadcrumb=breadcrumb,
        has_hits=STATS_JSON.is_file(),
    )
|
||
|
||
|
||
def _file_preview(path: Path, subpath: str):
    """Render the preview page for one file.

    Picks the image, text or generic template from the file suffix and
    supplies previous/next links among the visible files of the same folder
    (sorted by case-insensitive name).
    """
    ext = path.suffix.lower()
    info = path.stat()

    parent_rel = Path(subpath).parent
    parent = "" if parent_rel == Path(".") else str(parent_rel)

    neighbours = [
        f for f in path.parent.iterdir()
        if f.is_file() and f.name not in _HIDDEN and not f.name.startswith(".")
    ]
    neighbours.sort(key=lambda f: f.name.lower())
    try:
        position = neighbours.index(path)
    except ValueError:
        position = None

    prev_path = None
    next_path = None
    position_label = ""
    if position is not None:
        if position > 0:
            prev_path = str(neighbours[position - 1].relative_to(ASSETS_ROOT))
        if position < len(neighbours) - 1:
            next_path = str(neighbours[position + 1].relative_to(ASSETS_ROOT))
        position_label = f"{position + 1}/{len(neighbours)}"

    ctx = {
        "subpath": subpath,
        "filename": path.name,
        "filesize": _humansize(info.st_size),
        "mtime": datetime.fromtimestamp(info.st_mtime),
        "raw_url": url_for("raw_file", subpath=subpath),
        "parent_path": parent,
        "prev_path": prev_path,
        "next_path": next_path,
        "sibling_pos": position_label,
        "ext": ext,
    }
    if ext in IMAGE_EXT:
        ctx["meta"] = _image_meta(path)
        return render_template("preview_image.html", **ctx)
    if ext in TEXT_EXT:
        body = path.read_text(errors="replace")
        return render_template("preview_text.html", content=body, lang=ext.lstrip("."), **ctx)
    return render_template("preview_other.html", **ctx)
|
||
|
||
|
||
# ── Recherche ─────────────────────────────────────────────────────────
|
||
|
||
@app.route("/search")
def search():
    """Search visible files by name, modification date and optionally content.

    Query parameters: ``q`` (case-insensitive substring), ``after`` and
    ``before`` (inclusive YYYY-MM-DD bounds on the modification date) and
    ``content=1`` to also look inside text files whose name did not match.
    Content hits carry a short excerpt in ``content_match``.
    """
    redir = _require_admin()
    if redir:
        return redir

    args = request.args
    q = args.get("q", "").strip()
    after_s = args.get("after", "").strip()
    before_s = args.get("before", "").strip()
    in_content = args.get("content") == "1"

    dt_after = _parse_date(after_s)
    dt_before = _parse_date(before_s)

    searched = bool(q or dt_after or dt_before)
    results = []

    if searched:
        needle = q.lower()
        for candidate in sorted(ASSETS_ROOT.rglob("*")):
            if not candidate.is_file():
                continue
            rel_parts = candidate.relative_to(ASSETS_ROOT).parts
            if any(part in _HIDDEN or part.startswith(".") for part in rel_parts):
                continue

            modified = datetime.fromtimestamp(candidate.stat().st_mtime).date()
            if dt_after and modified < dt_after.date():
                continue
            if dt_before and modified > dt_before.date():
                continue

            matched = not q or needle in candidate.name.lower()
            excerpt = None

            # Content is only read when the name did not already match.
            if in_content and q and not matched and candidate.suffix.lower() in TEXT_EXT:
                try:
                    body = candidate.read_text(errors="replace")
                    pos = body.lower().find(needle)
                    if pos != -1:
                        lo = max(0, pos - 60)
                        hi = min(len(body), pos + len(q) + 60)
                        lead = "…" if lo else ""
                        tail = "…" if hi < len(body) else ""
                        excerpt = lead + body[lo:hi].replace("\n", " ") + tail
                        matched = True
                except Exception:
                    pass

            if not matched:
                continue

            hit = _entry(candidate)
            hit["content_match"] = excerpt
            results.append(hit)

    return render_template(
        "search.html",
        q=q,
        after=after_s,
        before=before_s,
        in_content=in_content,
        results=results,
        searched=searched,
    )
|
||
|
||
|
||
# ── Statistiques GoAccess ─────────────────────────────────────────────
|
||
|
||
@app.route("/stats/")
def stats():
    """Render the statistics page, telling it whether a report file exists."""
    redir = _require_admin()
    if redir:
        return redir
    report_present = STATS_FILE.is_file()
    return render_template("stats.html", has_report=report_present)
|
||
|
||
|
||
@app.route("/stats/report")
def stats_report():
    """Serve the generated GoAccess HTML report, or 404 when it is missing."""
    redir = _require_admin()
    if redir:
        return redir
    if STATS_FILE.is_file():
        return send_from_directory(STATS_FILE.parent, STATS_FILE.name)
    abort(404)
|
||
|
||
|
||
def _do_generate():
    """Run the report generator and always clear the "generating" flag.

    Uses STATS_GENERATE_CMD through the shell when it is set. Otherwise it
    invokes ``goaccess`` on STATS_LOG_FILE with the HTML and JSON outputs.
    """
    try:
        if STATS_GENERATE_CMD:
            subprocess.run(STATS_GENERATE_CMD, shell=True, timeout=600)
            return
        argv = [
            "goaccess",
            STATS_LOG_FILE,
            "--log-format=COMBINED",
            f"--output={STATS_FILE}",
        ]
        if STATS_JSON:
            argv.append(f"--output={STATS_JSON}")
        subprocess.run(argv, timeout=600, check=False)
    finally:
        with _gen_lock:
            _gen_state["generating"] = False
|
||
|
||
|
||
@app.route("/stats/generate", methods=["POST"])
def stats_generate():
    """Start report generation in a background thread.

    Responds with a JSON ``status`` of "exists", "generating", "error"
    (HTTP 500, when neither a log file nor a command is configured) or
    "started".
    """
    redir = _require_admin()
    if redir:
        return redir
    if STATS_FILE.is_file():
        return jsonify({"status": "exists"})

    with _gen_lock:
        if _gen_state["generating"]:
            return jsonify({"status": "generating"})
        if not (STATS_LOG_FILE or STATS_GENERATE_CMD):
            payload = {"status": "error", "message": "STATS_LOG_FILE non configuré"}
            return jsonify(payload), 500
        _gen_state["generating"] = True

    worker = threading.Thread(target=_do_generate, daemon=True)
    worker.start()
    return jsonify({"status": "started"})
|
||
|
||
|
||
@app.route("/stats/status")
def stats_status():
    """Report as JSON whether the report exists and whether one is being built."""
    redir = _require_admin()
    if redir:
        return redir
    payload = {
        "ready": STATS_FILE.is_file(),
        "generating": _gen_state["generating"],
    }
    return jsonify(payload)
|
||
|
||
|
||
# ── Fichiers bruts (aperçu / téléchargement) ──────────────────────────
|
||
|
||
@app.route("/raw/<path:subpath>")
def raw_file(subpath):
    """Serve a file of the asset tree as-is (preview or download).

    Aborts with 404 when the path is not a regular file and with 403 when it
    lies under a hidden top-level entry.
    """
    redir = _require_admin()
    if redir:
        return redir
    target = _safe_path(subpath)
    if not target.is_file():
        abort(404)
    # Check the resolved location too: a raw subpath such as "x/../.env"
    # has a harmless first component but is normalised to ".env" when the
    # file is served.
    tops = (*Path(subpath).parts[:1], *target.relative_to(ASSETS_ROOT).parts[:1])
    if any(top in _HIDDEN or top.startswith(".") for top in tops):
        abort(403)
    return send_from_directory(ASSETS_ROOT, subpath)
|
||
|
||
|
||
# ── Suppression de fichiers ───────────────────────────────────────────
|
||
|
||
@app.route("/delete", methods=["POST"])
def delete_file():
    """Move one file to the trash and return to its parent folder.

    Form field ``path`` is the file path relative to the asset root.
    Aborts with 400 for a missing path or a directory, 404 for a missing
    file and 403 for anything under a hidden top-level entry.
    """
    redir = _require_admin()
    if redir:
        return redir
    subpath = request.form.get("path", "").strip()
    if not subpath:
        abort(400)
    target = _safe_path(subpath)
    if not target.exists():
        abort(404)
    if target.is_dir():
        abort(400)

    # The form value is not normalised, so "x/../.env" would pass a check on
    # the raw first component. Validate the resolved location as well.
    resolved_rel = target.relative_to(ASSETS_ROOT)
    tops = (*Path(subpath).parts[:1], *resolved_rel.parts[:1])
    if any(top in _HIDDEN or top.startswith(".") for top in tops):
        abort(403)

    # Use the normalised relative path so the trash mirrors the real
    # location and the sidecar records a clean original path.
    clean_rel = resolved_rel.as_posix()
    parent = resolved_rel.parent.as_posix()
    _trash_move(target, clean_rel)
    return redirect(url_for("browse", subpath=parent) if parent != "." else url_for("browse"))
|
||
|
||
|
||
# ── Corbeille ────────────────────────────────────────────────────────
|
||
|
||
@app.route("/trash")
def trash_list():
    """Purge trash entries older than 30 days, then render the trash listing."""
    redir = _require_admin()
    if redir:
        return redir
    _trash_purge(30)
    return render_template("trash.html", entries=_trash_list())
|
||
|
||
|
||
@app.route("/trash/raw/<path:subpath>")
def trash_raw(subpath):
    """Serve a trashed file as-is; 400 outside the trash, 404 if missing."""
    redir = _require_admin()
    if redir:
        return redir
    target = (TRASH_ROOT / subpath).resolve()
    # Path-aware containment test. A string prefix check would also accept
    # sibling folders whose name merely starts with ".trash".
    if not target.is_relative_to(TRASH_ROOT):
        abort(400)
    if not target.is_file():
        abort(404)
    return send_from_directory(TRASH_ROOT, subpath)
|
||
|
||
|
||
@app.route("/trash/delete", methods=["POST"])
def trash_delete():
    """Permanently delete one trashed file and its ``.trashinfo`` sidecar.

    Form field ``path`` is relative to the trash root. Aborts with 400 when
    it is missing or points outside the trash.
    """
    redir = _require_admin()
    if redir:
        return redir
    trash_rel = request.form.get("path", "").strip()
    if not trash_rel:
        abort(400)
    target = (TRASH_ROOT / trash_rel).resolve()
    # Path-aware containment test. A string prefix check would let
    # "../.trash-old/x" delete files in a sibling folder.
    if not target.is_relative_to(TRASH_ROOT):
        abort(400)
    if target.is_file():
        target.unlink()
    # The sidecar is removed whenever present, which also cleans up orphans.
    info = Path(str(target) + ".trashinfo")
    if info.exists():
        info.unlink()
    return redirect(url_for("trash_list"))
|
||
|
||
|
||
@app.route("/trash/restore", methods=["POST"])
def trash_restore():
    """Restore a trashed file and redirect to the folder it landed in.

    Responds with HTTP 409 and a JSON body when the destination exists and
    no ``conflict`` strategy was posted. Any other failure aborts with 400.
    """
    redir = _require_admin()
    if redir:
        return redir

    form = request.form
    trash_rel = form.get("path", "").strip()
    conflict = form.get("conflict", "").strip()
    if not trash_rel:
        abort(400)

    ok, result = _trash_restore(trash_rel, conflict)
    if ok:
        parent = str(Path(result).parent)
        if parent == ".":
            return redirect(url_for("browse"))
        return redirect(url_for("browse", subpath=parent))
    if result == "conflict":
        return jsonify({"conflict": True, "path": trash_rel}), 409
    abort(400)
|
||
|
||
|
||
@app.route("/trash/preview/<path:subpath>")
def trash_preview(subpath):
    """Render the preview page for a trashed file.

    Uses the same templates as the regular preview, without sibling
    navigation and with ``from_trash=True``. Aborts with 400 outside the
    trash and 404 when the file is missing.
    """
    redir = _require_admin()
    if redir:
        return redir
    target = (TRASH_ROOT / subpath).resolve()
    # Path-aware containment test. A string prefix check would also accept
    # sibling folders whose name merely starts with ".trash".
    if not target.is_relative_to(TRASH_ROOT):
        abort(400)
    if not target.is_file():
        abort(404)

    stat = target.stat()
    ext = target.suffix.lower()
    ctx = dict(
        subpath=subpath,
        filename=target.name,
        filesize=_humansize(stat.st_size),
        mtime=datetime.fromtimestamp(stat.st_mtime),
        raw_url=url_for("trash_raw", subpath=subpath),
        parent_path=None,
        prev_path=None,
        next_path=None,
        sibling_pos="",
        ext=ext,
        from_trash=True,
    )
    if ext in IMAGE_EXT:
        ctx["meta"] = _image_meta(target)
        return render_template("preview_image.html", **ctx)
    if ext in TEXT_EXT:
        return render_template(
            "preview_text.html",
            content=target.read_text(errors="replace"),
            lang=ext.lstrip("."),
            **ctx,
        )
    return render_template("preview_other.html", **ctx)
|
||
|
||
|
||
@app.route("/trash/empty", methods=["POST"])
def trash_empty():
    """Delete the whole trash folder, recreate it empty and show the listing."""
    redir = _require_admin()
    if redir:
        return redir
    trash_present = TRASH_ROOT.exists()
    if trash_present:
        shutil.rmtree(TRASH_ROOT)
    TRASH_ROOT.mkdir(exist_ok=True)
    return redirect(url_for("trash_list"))
|
||
|
||
|
||
# ── Renommage de fichiers ─────────────────────────────────────────────
|
||
|
||
@app.route("/rename", methods=["POST"])
def rename_file():
    """Rename a file within its folder and return its new location as JSON.

    Form fields: ``path`` (file relative to the asset root) and ``new_name``
    (bare file name, sanitised with ``secure_filename``). Errors are returned
    as JSON with status 400, 403, 404 or 409 (name already taken).
    """
    redir = _require_admin()
    if redir:
        return redir

    subpath = request.form.get("path", "").strip()
    new_name = request.form.get("new_name", "").strip()

    if not subpath or not new_name:
        return jsonify({"error": "Paramètres manquants"}), 400

    if "/" in new_name or "\\" in new_name or new_name in (".", ".."):
        return jsonify({"error": "Nom invalide"}), 400

    new_name = secure_filename(new_name)
    if not new_name:
        return jsonify({"error": "Nom invalide après nettoyage"}), 400

    target = _safe_path(subpath)
    if not target.is_file():
        return jsonify({"error": "Fichier introuvable"}), 404

    # The form value is not normalised, so "x/../scripts/a.sh" would pass a
    # check on the raw first component. Validate the resolved location too.
    tops = (*Path(subpath).parts[:1], *target.relative_to(ASSETS_ROOT).parts[:1])
    if any(top in _HIDDEN or top.startswith(".") for top in tops):
        return jsonify({"error": "Accès refusé"}), 403

    dest = target.parent / new_name
    if not dest.is_relative_to(ASSETS_ROOT):
        return jsonify({"error": "Destination invalide"}), 400

    if dest.exists():
        return jsonify({"error": f"« {new_name} » existe déjà"}), 409

    target.rename(dest)
    new_path = str(dest.relative_to(ASSETS_ROOT))
    return jsonify({
        "name": new_name,
        "path": new_path,
        "browse_url": url_for("browse", subpath=new_path),
    })
|
||
|
||
|
||
# ── Vérification des conflits avant upload ───────────────────────────
|
||
|
||
@app.route("/check-upload", methods=["POST"])
def check_upload():
    """Report which of the posted file names already exist in the target folder.

    Form fields: ``path`` (destination folder, empty for the root) and
    repeated ``names``. Returns ``{"conflicts": [...]}`` with the names as
    posted. Aborts with 403 for hidden folders and 400 when the destination
    is not a directory.
    """
    redir = _require_admin()
    if redir:
        return redir

    subpath = request.form.get("path", "").strip()
    names = request.form.getlist("names")

    def _hidden(parts):
        return any(top in _HIDDEN or top.startswith(".") for top in parts[:1])

    if subpath and _hidden(Path(subpath).parts):
        abort(403)

    dest = _safe_path(subpath) if subpath else ASSETS_ROOT
    # Re-check on the resolved folder: "x/../app" has a harmless first
    # component but resolves into a hidden folder.
    if _hidden(dest.relative_to(ASSETS_ROOT).parts):
        abort(403)
    if not dest.is_dir():
        abort(400)

    conflicts = [n for n in names if n and (dest / secure_filename(n)).exists()]
    return jsonify({"conflicts": conflicts})
|
||
|
||
|
||
# ── Upload de fichiers ────────────────────────────────────────────────
|
||
|
||
@app.route("/upload", methods=["POST"])
def upload_file():
    """Save uploaded files into a folder of the asset tree.

    Form fields: ``path`` (destination folder, empty for the root), repeated
    ``files`` and ``conflict``, one of "overwrite" (default), "backup",
    "rename" or "skip", applied when a file of the same name exists.
    Aborts with 400 when no file was sent or the destination is not a
    directory, and with 403 for hidden folders.
    """
    redir = _require_admin()
    if redir:
        return redir

    subpath = request.form.get("path", "").strip()
    files = request.files.getlist("files")

    if not files or all(f.filename == "" for f in files):
        abort(400)

    def _hidden(parts):
        return any(top in _HIDDEN or top.startswith(".") for top in parts[:1])

    if subpath and _hidden(Path(subpath).parts):
        abort(403)

    dest = _safe_path(subpath) if subpath else ASSETS_ROOT
    # Re-check on the resolved folder: "x/../app" has a harmless first
    # component but would let an upload overwrite files in a hidden folder.
    if _hidden(dest.relative_to(ASSETS_ROOT).parts):
        abort(403)
    if not dest.is_dir():
        abort(400)

    conflict = request.form.get("conflict", "overwrite")
    if conflict not in ("backup", "overwrite", "rename", "skip"):
        conflict = "overwrite"

    for f in files:
        name = secure_filename(f.filename or "")
        if not name:
            continue
        out_path = dest / name
        if out_path.exists():
            if conflict == "skip":
                continue
            elif conflict == "backup":
                try:
                    out_path.rename(_backup_path(out_path))
                except Exception:
                    # Could not move the existing file aside: leave it
                    # untouched and skip this upload.
                    continue
            elif conflict == "rename":
                out_path = _auto_rename(out_path)
        f.save(out_path)

    return redirect(url_for("browse", subpath=subpath) if subpath else url_for("browse"))
|
||
|
||
|
||
# ── Vérification des conflits avant redimensionnement ────────────────
|
||
|
||
@app.route("/check-resize", methods=["POST"])
def check_resize():
    """Report which resize outputs would collide with existing files.

    Form fields: ``path``, repeated ``sizes`` (square sizes from
    RESIZE_SIZES), ``formats`` (from RESIZE_FORMATS) and ``custom_sizes``
    ("WxH"). Output names follow ``<stem>_<w>x<h>.<fmt>`` in the source
    file's folder. Returns ``{"conflicts": [...]}``.
    """
    redir = _require_admin()
    if redir:
        return redir

    form = request.form
    subpath = form.get("path", "").strip()
    raw_sizes = form.getlist("sizes")
    raw_formats = form.getlist("formats")

    no_conflicts = {"conflicts": []}
    if not subpath:
        return jsonify(no_conflicts)

    target = _safe_path(subpath)
    if not target.is_file():
        return jsonify(no_conflicts)

    # One unparsable size value discards the whole list of preset sizes.
    try:
        sizes = [int(raw) for raw in raw_sizes if int(raw) in RESIZE_SIZES]
    except (ValueError, TypeError):
        sizes = []

    formats = [fmt for fmt in raw_formats if fmt in RESIZE_FORMATS]

    src_ext = target.suffix.lstrip(".")
    # Drop an existing "_<w>x<h>" suffix so names are not stacked.
    stem = re.sub(r"_\d+x\d+$", "", target.stem)
    folder = target.parent

    all_dims = [(side, side) for side in sizes]
    for spec in form.getlist("custom_sizes"):
        try:
            width, height = (int(piece.strip()) for piece in re.split(r"[x×]", spec.lower()))
        except (ValueError, TypeError):
            continue
        if width >= 1 and height >= 1:
            all_dims.append((width, height))

    # No dimensions selected: keep the original ones.
    if not all_dims:
        try:
            src = Image.open(target)
            all_dims = [(src.width, src.height)]
            src.close()
        except Exception:
            return jsonify({"conflicts": []})

    # No format selected: keep the original format.
    if not formats:
        formats = [src_ext]

    candidates = (
        f"{stem}_{width}x{height}.{fmt}"
        for fmt in formats
        for width, height in all_dims
    )
    conflicts = [name for name in candidates if (folder / name).exists()]

    return jsonify({"conflicts": conflicts})
|
||
|
||
|
||
# ── Redimensionnement d'images ────────────────────────────────────────
|
||
|
||
def _resize_parse_dims(spec):
    """Parse a ``WxH`` / ``W×H`` string into ``(w, h)`` ints, or ``None`` if malformed."""
    try:
        w, h = (int(part.strip()) for part in re.split(r"[x×]", spec.lower()))
    except (ValueError, TypeError):
        return None
    return w, h


def _resize_save(img, out_path, fmt, w, h):
    """Encode *img* (already sized ``w``×``h``) as *fmt* and write it to *out_path*."""
    if fmt == "png":
        if img.mode not in ("RGBA", "RGB", "L"):
            img = img.convert("RGBA")
        img.save(out_path, format="PNG")
    elif fmt in ("jpg", "jpeg"):
        # JPEG has no alpha channel.
        img.convert("RGB").save(out_path, format="JPEG", quality=90)
    elif fmt == "ico":
        img.convert("RGBA").save(out_path, format="ICO", sizes=[(w, h)])
    else:
        # Any other extension: let Pillow pick the encoder from the file
        # name. It raises if it has no writer for it, which the caller
        # reports as an error instead of claiming the file was created.
        img.save(out_path)


def _resize_render(target, out_path, fmt, w, h, is_svg):
    """Produce one ``w``×``h`` raster variant of *target* at *out_path*; raises on failure."""
    if is_svg:
        buf = io.BytesIO()
        _cairosvg.svg2png(url=str(target), write_to=buf,
                          output_width=w, output_height=h)
        buf.seek(0)
        with Image.open(buf) as rendered:
            _resize_save(rendered, out_path, fmt, w, h)
        return

    with Image.open(target) as src:
        # LANCZOS does not support palette mode (ICO, GIF, ...).
        if src.mode in ("RGB", "RGBA", "L", "LA", "I", "F"):
            base = src
        else:
            base = src.convert("RGBA")
        # resize() loads the pixel data, so the source can be closed before
        # the output is written (the output may be the source file itself).
        resized = base.resize((w, h), Image.LANCZOS)
    _resize_save(resized, out_path, fmt, w, h)


@app.route("/resize", methods=["POST"])
def resize_image():
    """Create resized / converted variants of an image and report the outcome as JSON.

    Form fields read from the POST body:
      * ``path``         – source image, passed to ``_safe_path``.
      * ``sizes``        – integers; only values in ``RESIZE_SIZES`` are kept
                           (deduplicated, ascending), each giving a square output.
      * ``formats``      – required; only values in ``RESIZE_FORMATS`` are kept.
                           If none survive, the source's own extension is used.
      * ``custom_sizes`` – ``WxH`` / ``W×H`` strings; entries larger than the
                           source resolution are rejected and listed in ``errors``.
      * ``conflict``     – ``backup`` | ``overwrite`` | ``rename`` | ``skip``
                           (anything else is treated as ``skip``).

    Outputs are named ``<stem>_<w>x<h>.<fmt>`` in the source's directory.

    Returns JSON ``{"created": [{"name", "path"}], "errors": [{"name", "reason"}]}``.
    Responds 400 (JSON) when ``path`` or ``formats`` is missing, when the
    source cannot be read, or when an SVG source is given without dimensions.
    Aborts with 404 if the path is not a file, 400 if its extension is not in
    ``IMAGE_EXT``, and 403 if its first path component is hidden. If
    ``_require_admin()`` returns a truthy value, that value is returned instead.
    """
    redir = _require_admin()
    if redir:
        return redir

    subpath = request.form.get("path", "").strip()
    raw_sizes = request.form.getlist("sizes")
    raw_formats = request.form.getlist("formats")
    raw_customs = request.form.getlist("custom_sizes")

    if not subpath or not raw_formats:
        return jsonify({"error": "Paramètres manquants"}), 400

    target = _safe_path(subpath)
    if not target.is_file():
        abort(404)

    ext = target.suffix.lower()
    if ext not in IMAGE_EXT:
        abort(400)

    top = Path(subpath).parts[0]
    if top in _HIDDEN or top.startswith("."):
        abort(403)

    # A single non-numeric entry discards every preset size.
    try:
        square_dims = [(s, s) for s in sorted({n for n in map(int, raw_sizes) if n in RESIZE_SIZES})]
    except (ValueError, TypeError):
        square_dims = []

    # No valid format selected -> keep the source format.
    formats = [f for f in raw_formats if f in RESIZE_FORMATS] or [ext.lstrip(".")]

    is_svg = ext == ".svg"

    # Source resolution is the upper bound for free-form dimensions.
    # NOTE(review): Pillow ships no SVG decoder, so this probe is expected to
    # fail for SVG sources; they are treated as unbounded — confirm no SVG
    # plugin is registered elsewhere.
    try:
        with Image.open(target) as probe:
            max_w, max_h = probe.width, probe.height
    except Exception:
        if not is_svg:
            return jsonify({"error": "Image source illisible"}), 400
        max_w = max_h = None

    stem = re.sub(r"_\d+x\d+$", "", target.stem)
    parent = target.parent
    created, errors = [], []

    # Free-form dimensions: out-of-bounds ones are reported, valid ones kept.
    custom_dims = []
    for spec in raw_customs:
        dims = _resize_parse_dims(spec)
        if dims is None:
            continue
        w, h = dims
        if max_w is not None and (w > max_w or h > max_h):
            reason = f"{w}×{h} dépasse la résolution source ({max_w}×{max_h})"
            errors.extend({"name": f"{stem}_{w}x{h}.{f}", "reason": reason} for f in formats)
        elif w >= 1 and h >= 1:
            custom_dims.append((w, h))

    all_dims = square_dims + custom_dims

    # No dimensions selected -> keep the source's own dimensions.
    if not all_dims:
        if max_w is None:
            return jsonify({"error": "Dimensions requises pour une source SVG"}), 400
        all_dims = [(max_w, max_h)]

    conflict = request.form.get("conflict", "skip")
    if conflict not in ("backup", "overwrite", "rename", "skip"):
        conflict = "skip"

    for fmt in formats:
        for w, h in all_dims:
            out_name = f"{stem}_{w}x{h}.{fmt}"
            out_path = parent / out_name

            # Feasibility checks come BEFORE conflict handling so an existing
            # file is never moved aside for an output that cannot be produced.
            if fmt == "svg" and not is_svg:
                errors.append({"name": out_name, "reason": "Conversion raster→SVG non supportée"})
                continue
            if is_svg and fmt != "svg" and not HAS_CAIROSVG:
                errors.append({"name": out_name, "reason": "cairosvg non disponible sur ce serveur"})
                continue

            if out_path.exists():
                if conflict == "skip":
                    errors.append({"name": out_name, "reason": "Fichier existant, ignoré"})
                    continue
                if conflict == "backup":
                    try:
                        out_path.rename(_backup_path(out_path))
                    except Exception as exc:
                        errors.append({"name": out_name, "reason": f"Backup impossible : {exc}"})
                        continue
                elif conflict == "rename":
                    out_path = _auto_rename(out_path)
                    out_name = out_path.name
                # "overwrite": fall through and write over the existing file.

            out_rel = str(out_path.relative_to(ASSETS_ROOT))

            try:
                if fmt == "svg":
                    # SVG -> SVG: vector data is size-independent, plain copy.
                    shutil.copy2(target, out_path)
                else:
                    _resize_render(target, out_path, fmt, w, h, is_svg)
            except Exception as exc:
                errors.append({"name": out_name, "reason": str(exc)})
            else:
                created.append({"name": out_name, "path": out_rel})

    return jsonify({"created": created, "errors": errors})
|
||
|
||
|
||
# ── Fichiers CDN publics (dev local uniquement) ───────────────────────
|
||
|
||
# First path components that cdn_file accepts; a request whose first
# component is not in this set is answered with 404.
_PUBLIC_TOP = frozenset({"logo", "wiki", "error"})
|
||
|
||
|
||
@app.route("/favicon.ico")
def favicon():
    """Serve ``favicon.ico`` from the assets root directory."""
    icon_name = "favicon.ico"
    return send_from_directory(ASSETS_ROOT, icon_name)
|
||
|
||
|
||
@app.route("/robots.txt")
def robots():
    """Serve ``robots.txt`` from the assets root directory."""
    robots_name = "robots.txt"
    return send_from_directory(ASSETS_ROOT, robots_name)
|
||
|
||
|
||
@app.route("/<path:subpath>")
def cdn_file(subpath):
    """Serve a file from the assets root if it lives under a public top-level directory.

    Responds 404 when the first component of the normalised path is not in
    ``_PUBLIC_TOP`` or when the resolved path is not an existing file.
    """
    # Collapse "." / ".." segments BEFORE checking the allowlist. Otherwise
    # "logo/../private/x" passes on its first component ("logo") and is then
    # normalised to "private/x" when the file is actually served.
    parts = Path(os.path.normpath(subpath)).parts
    # ``parts`` is empty for "." — treat it as not found instead of raising.
    if not parts or parts[0] not in _PUBLIC_TOP:
        abort(404)
    target = _safe_path(subpath)
    if not target.is_file():
        abort(404)
    return send_from_directory(ASSETS_ROOT, subpath)
|
||
|
||
|
||
if __name__ == "__main__":
    # Direct execution only: start Flask's built-in server on port 5003
    # with debug mode switched on.
    app.run(port=5003, debug=True)
|