session.clear() seul ne déconnectait pas la session Keycloak, provoquant une reconnexion automatique immédiate. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
521 lines
16 KiB
Python
521 lines
16 KiB
Python
import json
|
|
import os
|
|
import subprocess
|
|
import threading
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
from urllib.parse import urlencode
|
|
|
|
from flask import (Flask, redirect, url_for, session, request,
|
|
render_template, abort, send_from_directory, jsonify)
|
|
from authlib.integrations.flask_client import OAuth
|
|
from werkzeug.middleware.proxy_fix import ProxyFix
|
|
from werkzeug.utils import secure_filename
|
|
|
|
# Flask application object. SECRET_KEY is mandatory: a missing variable raises
# KeyError at import time instead of starting with an unsigned session cookie.
app = Flask(__name__)
app.config["SECRET_KEY"] = os.environ["SECRET_KEY"]
# Wrap the WSGI app so the request scheme is taken from the X-Forwarded-Proto
# header set by one upstream proxy hop (x_proto=1).
app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1)
|
|
|
|
# OAuth registry bound to the Flask app, holding a single OpenID Connect
# client named "alpid". Endpoints are read from the discovery document; the
# three ALPID_* variables are mandatory (KeyError at import time if missing).
oauth = OAuth(app)
oauth.register(
    "alpid",
    server_metadata_url=os.environ["ALPID_DISCOVERY_URL"],
    client_id=os.environ["ALPID_CLIENT_ID"],
    client_secret=os.environ["ALPID_CLIENT_SECRET"],
    client_kwargs=dict(scope="openid profile email"),
)
|
|
|
|
# Group names that grant admin rights (comma-separated, default "admins").
# Entries are stripped and empty ones dropped, exactly like ADMIN_EMAILS below,
# so a value such as "admins, staff" matches the "staff" group.
ADMIN_GROUPS = {
    g.strip()
    for g in os.environ.get("ADMIN_GROUPS", "admins").split(",")
    if g.strip()
}
# E-mail addresses that grant admin rights when the token carries no groups.
ADMIN_EMAILS = {
    e.strip()
    for e in os.environ.get("ADMIN_EMAILS", "").split(",")
    if e.strip()
}
# Absolute, symlink-resolved root of the managed asset tree.
ASSETS_ROOT = Path(os.environ.get("ASSETS_ROOT", ".")).resolve()

# Logout endpoint derived from the discovery URL: everything before
# "/.well-known/" plus the OpenID Connect logout path.
_alpid_base = os.environ["ALPID_DISCOVERY_URL"].split("/.well-known/")[0]
ALPID_LOGOUT_URL = _alpid_base + "/protocol/openid-connect/logout"

# GoAccess report locations (HTML and JSON), the access log to analyse, and an
# optional shell command that replaces the built-in goaccess invocation.
STATS_FILE = Path(os.environ.get("STATS_FILE", "/opt/static-cdn/goaccess.html"))
STATS_JSON = Path(os.environ.get("STATS_JSON", "/opt/static-cdn/goaccess.json"))
STATS_LOG_FILE = os.environ.get("STATS_LOG_FILE", "")
STATS_GENERATE_CMD = os.environ.get("STATS_GENERATE_CMD", "")

# Guards _gen_state, which records whether a report generation is running.
_gen_lock = threading.Lock()
_gen_state = {"generating": False}
|
|
|
|
|
|
# File and directory names excluded from listings, statistics and search.
_HIDDEN = frozenset((
    ".git",
    "scripts",
    "app",
    ".env",
    ".env.example",
    ".gitignore",
    ".htaccess",
    ".htpasswd_stats",
    "standard_index.html",
))

# Lower-case file suffixes used to pick the preview type of a file.
IMAGE_EXT = frozenset((".png", ".jpg", ".jpeg", ".gif", ".ico", ".svg", ".webp"))
TEXT_EXT = frozenset((
    ".txt", ".html", ".htm", ".md", ".css", ".js", ".json",
    ".xml", ".conf", ".sh", ".robots", ".php",
))
PDF_EXT = frozenset((".pdf",))
|
|
|
|
|
|
# ── Helpers ───────────────────────────────────────────────────────────
|
|
|
|
def _user():
    """Return the "user" entry stored in the session, or None when absent."""
    current = session.get("user")
    return current
|
|
|
|
|
|
def _require_admin():
    """Gate a view behind an authenticated admin session.

    Returns:
        A redirect to the login route when nobody is signed in (the requested
        URL is remembered in the session as "next_url"), otherwise None.

    Raises:
        A 403 HTTP error (via abort) when the signed-in user is not an admin.
    """
    account = _user()
    if not account:
        # Remember where the visitor wanted to go so the callback can return there.
        session["next_url"] = request.url
        return redirect(url_for("login"))

    is_admin = account.get("is_admin")
    if not is_admin:
        abort(403)
    return None
|
|
|
|
|
|
def _safe_path(subpath: str) -> Path:
    """Resolve *subpath* below ASSETS_ROOT and refuse anything that escapes it.

    Args:
        subpath: Path relative to the asset root, as received from the client.

    Returns:
        The resolved absolute path.

    Raises:
        A 400 HTTP error (via abort) when the resolved path is not located
        under ASSETS_ROOT.
    """
    candidate = ASSETS_ROOT.joinpath(subpath).resolve()
    if candidate.is_relative_to(ASSETS_ROOT):
        return candidate
    abort(400)
|
|
|
|
|
|
def _humansize(n: int) -> str:
|
|
for unit in ("o", "Ko", "Mo", "Go"):
|
|
if n < 1024:
|
|
return f"{n:.0f} {unit}"
|
|
n /= 1024
|
|
return f"{n:.1f} To"
|
|
|
|
|
|
def _folder_stats(path: Path) -> dict:
    """Count the visible files below *path* and add up their sizes.

    A file is skipped when any component of its path relative to *path* is
    listed in _HIDDEN or starts with a dot.

    Returns:
        {"files": <number of files>, "size": <total size in bytes>}
    """
    sizes = [
        candidate.stat().st_size
        for candidate in path.rglob("*")
        if candidate.is_file()
        and not any(
            part in _HIDDEN or part.startswith(".")
            for part in candidate.relative_to(path).parts
        )
    ]
    return {"files": len(sizes), "size": sum(sizes)}
|
|
|
|
|
|
def _parse_date(s: str):
|
|
if not s:
|
|
return None
|
|
try:
|
|
return datetime.strptime(s, "%Y-%m-%d")
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def _load_hits() -> dict:
    """Parse the GoAccess JSON report into {relative_path: hit_count}.

    The report is read explicitly as UTF-8 (the JSON interchange encoding) so
    the result does not depend on the process locale. Loading is best-effort:
    a missing, unreadable, malformed or unexpectedly shaped report yields an
    empty mapping instead of an error.

    Returns:
        Mapping of request path (leading "/" removed) to its hit count; entries
        with an empty path or a zero count are left out.
    """
    if not STATS_JSON.is_file():
        return {}
    try:
        data = json.loads(STATS_JSON.read_text(encoding="utf-8"))
        hits = {}
        for item in data.get("requests", {}).get("data", []):
            url = item.get("data", "").lstrip("/")
            count = item.get("hits", {}).get("count", 0)
            if url and count:
                hits[url] = count
        return hits
    except (OSError, ValueError, AttributeError, TypeError):
        # OSError: read failure; ValueError: invalid JSON or bad encoding;
        # AttributeError/TypeError: report structure differs from what is expected.
        return {}
|
|
|
|
|
|
def _entry(item: Path) -> dict:
    """Describe a file or directory for the listing templates.

    Returns:
        A dict with the name, the path relative to ASSETS_ROOT, a directory
        flag, the size (None unless *item* is a regular file), the modification
        time, the lower-cased suffix and the image/text/pdf type flags.
    """
    info = item.stat()
    suffix = item.suffix.lower()
    return dict(
        name=item.name,
        path=str(item.relative_to(ASSETS_ROOT)),
        is_dir=item.is_dir(),
        size=info.st_size if item.is_file() else None,
        mtime=datetime.fromtimestamp(info.st_mtime),
        ext=suffix,
        is_image=suffix in IMAGE_EXT,
        is_text=suffix in TEXT_EXT,
        is_pdf=suffix in PDF_EXT,
    )
|
|
|
|
|
|
# ── Auth ──────────────────────────────────────────────────────────────
|
|
|
|
@app.route("/auth/login")
def login():
    """Start the OpenID Connect login by redirecting to the identity provider."""
    callback_url = url_for("callback", _external=True)
    return oauth.alpid.authorize_redirect(callback_url)
|
|
|
|
|
|
@app.route("/auth/callback")
def callback():
    """Finish the login: exchange the code, store the user and redirect.

    Admin status is decided in this order: by group membership when the
    userinfo carries groups, else by e-mail when ADMIN_EMAILS is configured,
    else granted unconditionally.
    """
    token = oauth.alpid.authorize_access_token()
    info = token.get("userinfo") or oauth.alpid.userinfo(token=token)

    email = info.get("email", "")
    groups = set(info.get("groups", []))
    if groups:
        is_admin = not groups.isdisjoint(ADMIN_GROUPS)
    elif ADMIN_EMAILS:
        is_admin = email in ADMIN_EMAILS
    else:
        # NOTE(review): fail-open — with no groups in the token and no
        # ADMIN_EMAILS configured, every authenticated user becomes admin.
        # Confirm this default is intended.
        is_admin = True

    display_name = info.get("name") or info.get("preferred_username", "")
    session["user"] = {
        "sub": info["sub"],
        "name": display_name,
        "email": email,
        "is_admin": is_admin,
    }
    # Kept so that logout can pass it back to the provider as id_token_hint.
    session["id_token"] = token.get("id_token", "")

    destination = session.pop("next_url", url_for("dashboard"))
    return redirect(destination)
|
|
|
|
|
|
@app.route("/auth/logout")
def logout():
    """Clear the local session and end the session at the identity provider.

    Clearing the Flask session alone would leave the provider session alive and
    the user would be signed in again immediately, so the browser is sent to
    the provider's logout endpoint with a post-logout redirect to the dashboard.

    The stored ID token is sent as ``id_token_hint``. When the session holds no
    ID token, ``client_id`` is sent instead: RP-initiated logout needs one of
    the two to accept ``post_logout_redirect_uri``.
    """
    id_token = session.get("id_token")
    session.clear()
    params = {"post_logout_redirect_uri": url_for("dashboard", _external=True)}
    if id_token:
        params["id_token_hint"] = id_token
    else:
        params["client_id"] = os.environ["ALPID_CLIENT_ID"]
    return redirect(ALPID_LOGOUT_URL + "?" + urlencode(params))
|
|
|
|
|
|
# ── Dashboard ─────────────────────────────────────────────────────────
|
|
|
|
@app.route("/")
def dashboard():
    """Render the dashboard: per-folder file counts and sizes plus totals.

    Only visible top-level directories of ASSETS_ROOT are reported (names in
    _HIDDEN or starting with a dot are skipped).
    """
    redir = _require_admin()
    if redir:
        return redir

    folders = {
        item.name: _folder_stats(item)
        for item in sorted(ASSETS_ROOT.iterdir())
        if not (item.name in _HIDDEN or item.name.startswith("."))
        and item.is_dir()
    }
    stats_per_folder = folders.values()

    return render_template(
        "dashboard.html",
        user=_user(),
        folders=folders,
        total_files=sum(entry["files"] for entry in stats_per_folder),
        total_size=sum(entry["size"] for entry in stats_per_folder),
        humansize=_humansize,
    )
|
|
|
|
|
|
# ── Navigateur de fichiers ────────────────────────────────────────────
|
|
|
|
@app.route("/browse/")
@app.route("/browse/<path:subpath>")
def browse(subpath=""):
    """List a directory of the asset tree, or preview a file.

    Args:
        subpath: Path relative to ASSETS_ROOT; empty for the root listing.

    Raises (via abort):
        400 when the path escapes ASSETS_ROOT, 404 when it does not exist,
        403 when any component of the requested or resolved path is hidden.
    """
    redir = _require_admin()
    if redir:
        return redir

    target = _safe_path(subpath)
    if not target.exists():
        abort(404)

    # Check every component of both the raw and the resolved path, not just the
    # first one: "logo/../.git/config" or "wiki/.env" must not slip past a
    # top-level-only test. Listings and search already hide such names at
    # every depth.
    checked_parts = (*Path(subpath).parts, *target.relative_to(ASSETS_ROOT).parts)
    if any(p in _HIDDEN or p.startswith(".") for p in checked_parts):
        abort(403)

    if target.is_file():
        return _file_preview(target, subpath)

    hits = _load_hits()

    entries = []
    for item in sorted(target.iterdir()):
        if item.name in _HIDDEN or item.name.startswith("."):
            continue
        e = _entry(item)
        # Hit counts only make sense for files.
        e["hits"] = hits.get(e["path"], 0) if not e["is_dir"] else None
        entries.append(e)
    # Directories first, then case-insensitive by name.
    entries.sort(key=lambda e: (0 if e["is_dir"] else 1, e["name"].lower()))

    parts = Path(subpath).parts if subpath else ()
    breadcrumb = [
        {"name": p, "path": str(Path(*parts[:i + 1]))}
        for i, p in enumerate(parts)
    ]

    return render_template(
        "browse.html",
        user=_user(),
        entries=entries,
        subpath=subpath,
        breadcrumb=breadcrumb,
        humansize=_humansize,
        has_hits=STATS_JSON.is_file(),
    )
|
|
|
|
|
|
def _file_preview(path: Path, subpath: str):
    """Render the preview page of a single file.

    Args:
        path: Resolved absolute path of the file.
        subpath: The same file as a path relative to ASSETS_ROOT.

    The page also gets previous/next links among the visible sibling files
    (sorted case-insensitively by name). Images and text files have dedicated
    templates; everything else uses the generic one.
    """
    suffix = path.suffix.lower()
    info = path.stat()
    parent_dir = Path(subpath).parent
    parent = "" if parent_dir == Path(".") else str(parent_dir)

    siblings = [
        f
        for f in path.parent.iterdir()
        if f.is_file() and f.name not in _HIDDEN and not f.name.startswith(".")
    ]
    siblings.sort(key=lambda f: f.name.lower())
    try:
        idx = siblings.index(path)
    except ValueError:
        idx = None

    prev_path = None
    next_path = None
    position = ""
    if idx is not None:
        if idx > 0:
            prev_path = str(siblings[idx - 1].relative_to(ASSETS_ROOT))
        if idx < len(siblings) - 1:
            next_path = str(siblings[idx + 1].relative_to(ASSETS_ROOT))
        position = f"{idx + 1}/{len(siblings)}"

    ctx = {
        "user": _user(),
        "subpath": subpath,
        "filename": path.name,
        "filesize": _humansize(info.st_size),
        "mtime": datetime.fromtimestamp(info.st_mtime),
        "raw_url": url_for("raw_file", subpath=subpath),
        "parent_path": parent,
        "prev_path": prev_path,
        "next_path": next_path,
        "sibling_pos": position,
    }

    if suffix in IMAGE_EXT:
        return render_template("preview_image.html", **ctx)
    if suffix in TEXT_EXT:
        content = path.read_text(errors="replace")
        return render_template(
            "preview_text.html", content=content, lang=suffix.lstrip("."), **ctx
        )
    return render_template("preview_other.html", **ctx)
|
|
|
|
|
|
# ── Recherche ─────────────────────────────────────────────────────────
|
|
|
|
@app.route("/search")
def search():
    """Search visible files by name, modification date and optionally content.

    Query parameters: ``q`` (substring, case-insensitive), ``after`` and
    ``before`` (inclusive "YYYY-MM-DD" bounds on the modification date) and
    ``content=1`` to also look inside text files whose name does not match.
    """
    redir = _require_admin()
    if redir:
        return redir

    args = request.args
    q = args.get("q", "").strip()
    after_s = args.get("after", "").strip()
    before_s = args.get("before", "").strip()
    in_content = args.get("content") == "1"

    dt_after = _parse_date(after_s)
    dt_before = _parse_date(before_s)

    searched = bool(q or dt_after or dt_before)
    results = []

    if searched:
        q_low = q.lower()

        def _excerpt(file_path):
            """Return a short excerpt around the first match, or None."""
            try:
                text = file_path.read_text(errors="replace")
                pos = text.lower().find(q_low)
                if pos == -1:
                    return None
                start = max(0, pos - 60)
                end = min(len(text), pos + len(q) + 60)
                body = text[start:end].replace("\n", " ")
                return ("…" if start else "") + body + ("…" if end < len(text) else "")
            except Exception:
                # Best effort: an unreadable file simply does not match.
                return None

        for path in sorted(ASSETS_ROOT.rglob("*")):
            if not path.is_file():
                continue
            rel_parts = path.relative_to(ASSETS_ROOT).parts
            if any(part in _HIDDEN or part.startswith(".") for part in rel_parts):
                continue

            modified = datetime.fromtimestamp(path.stat().st_mtime).date()
            if dt_after and modified < dt_after.date():
                continue
            if dt_before and modified > dt_before.date():
                continue

            content_match = None
            matched = (not q) or q_low in path.name.lower()
            # Only read the file when the name did not already match.
            if not matched and in_content and q and path.suffix.lower() in TEXT_EXT:
                content_match = _excerpt(path)
                matched = content_match is not None
            if not matched:
                continue

            found = _entry(path)
            found["content_match"] = content_match
            results.append(found)

    return render_template(
        "search.html",
        user=_user(),
        q=q,
        after=after_s,
        before=before_s,
        in_content=in_content,
        results=results,
        searched=searched,
        humansize=_humansize,
    )
|
|
|
|
|
|
# ── Statistiques GoAccess ─────────────────────────────────────────────
|
|
|
|
@app.route("/stats/")
def stats():
    """Render the statistics page, telling it whether a report file exists."""
    redir = _require_admin()
    if redir:
        return redir
    report_available = STATS_FILE.is_file()
    return render_template("stats.html", user=_user(), has_report=report_available)
|
|
|
|
|
|
@app.route("/stats/report")
def stats_report():
    """Serve the generated GoAccess HTML report, or 404 when there is none."""
    redir = _require_admin()
    if redir:
        return redir
    if STATS_FILE.is_file():
        return send_from_directory(STATS_FILE.parent, STATS_FILE.name)
    abort(404)
|
|
|
|
|
|
def _do_generate():
    """Generate the statistics report; meant to run in a background thread.

    Runs STATS_GENERATE_CMD through the shell when it is set, otherwise invokes
    goaccess on STATS_LOG_FILE. The "generating" flag is always reset at the
    end, whatever the outcome.
    """
    try:
        if not STATS_GENERATE_CMD:
            goaccess_cmd = [
                "goaccess",
                STATS_LOG_FILE,
                "--log-format=COMBINED",
                f"--output={STATS_FILE}",
            ]
            # NOTE(review): STATS_JSON is a Path, which is always truthy, so
            # the JSON output is always requested — confirm that is intended.
            if STATS_JSON:
                goaccess_cmd += [f"--output={STATS_JSON}"]
            subprocess.run(goaccess_cmd, timeout=600, check=False)
        else:
            # The command comes from server configuration, not from a request.
            subprocess.run(STATS_GENERATE_CMD, shell=True, timeout=600)
    finally:
        with _gen_lock:
            _gen_state["generating"] = False
|
|
|
|
|
|
@app.route("/stats/generate", methods=["POST"])
def stats_generate():
    """Start report generation in the background unless it is not needed.

    JSON responses: "exists" when a report file is already present,
    "generating" when a run is in progress, "error" (HTTP 500) when neither a
    log file nor a custom command is configured, "started" otherwise.
    """
    redir = _require_admin()
    if redir:
        return redir
    if STATS_FILE.is_file():
        return jsonify(status="exists")

    with _gen_lock:
        if _gen_state["generating"]:
            return jsonify(status="generating")
        if not (STATS_LOG_FILE or STATS_GENERATE_CMD):
            return jsonify(status="error", message="STATS_LOG_FILE non configuré"), 500
        # Mark the run as started while still holding the lock so a concurrent
        # request cannot start a second one.
        _gen_state["generating"] = True

    worker = threading.Thread(target=_do_generate, daemon=True)
    worker.start()
    return jsonify(status="started")
|
|
|
|
|
|
@app.route("/stats/status")
def stats_status():
    """Report as JSON whether the report exists and whether a run is active."""
    redir = _require_admin()
    if redir:
        return redir
    return jsonify(
        ready=STATS_FILE.is_file(),
        generating=_gen_state["generating"],
    )
|
|
|
|
|
|
# ── Fichiers bruts (aperçu / téléchargement) ──────────────────────────
|
|
|
|
@app.route("/raw/<path:subpath>")
def raw_file(subpath):
    """Send the raw bytes of a file from the asset tree (preview / download).

    Raises (via abort):
        400 when the path escapes ASSETS_ROOT, 404 when it is not a file,
        403 when any component of the requested or resolved path is hidden.
    """
    redir = _require_admin()
    if redir:
        return redir
    target = _safe_path(subpath)
    if not target.is_file():
        abort(404)
    # Inspect every component of the raw and the resolved path: checking only
    # the first one would let "logo/../.env" or "wiki/.git/config" through.
    checked_parts = (*Path(subpath).parts, *target.relative_to(ASSETS_ROOT).parts)
    if any(p in _HIDDEN or p.startswith(".") for p in checked_parts):
        abort(403)
    return send_from_directory(ASSETS_ROOT, subpath)
|
|
|
|
|
|
# ── Suppression de fichiers ───────────────────────────────────────────
|
|
|
|
@app.route("/delete", methods=["POST"])
def delete_file():
    """Delete one file of the asset tree and go back to its parent listing.

    The file is named by the ``path`` form field, relative to ASSETS_ROOT.

    Raises (via abort):
        400 when the field is empty, the path escapes ASSETS_ROOT or names a
        directory; 404 when it does not exist; 403 when any component of the
        requested or resolved path is hidden.
    """
    redir = _require_admin()
    if redir:
        return redir
    subpath = request.form.get("path", "").strip()
    if not subpath:
        abort(400)
    target = _safe_path(subpath)
    if not target.exists():
        abort(404)
    if target.is_dir():
        abort(400)
    # The path is a free-form field: check every component of the raw and the
    # resolved path so that "logo/../.env" cannot be used to delete a hidden file.
    checked_parts = (*Path(subpath).parts, *target.relative_to(ASSETS_ROOT).parts)
    if any(p in _HIDDEN or p.startswith(".") for p in checked_parts):
        abort(403)
    parent = str(Path(subpath).parent)
    target.unlink()
    return redirect(url_for("browse", subpath=parent) if parent != "." else url_for("browse"))
|
|
|
|
|
|
# ── Upload de fichiers ────────────────────────────────────────────────
|
|
|
|
@app.route("/upload", methods=["POST"])
def upload_file():
    """Store uploaded files in a directory of the asset tree.

    Form fields: ``path`` (destination directory relative to ASSETS_ROOT, empty
    for the root) and ``files`` (one or more uploads). File names are sanitised
    with secure_filename; uploads whose name becomes empty are skipped.

    Raises (via abort):
        400 when no file was sent, the path escapes ASSETS_ROOT or is not a
        directory; 403 when any component of the requested or resolved
        destination is hidden.
    """
    redir = _require_admin()
    if redir:
        return redir

    subpath = request.form.get("path", "").strip()
    files = request.files.getlist("files")

    if not files or all(f.filename == "" for f in files):
        abort(400)

    def _has_hidden(parts):
        """Tell whether any path component is hidden."""
        return any(p in _HIDDEN or p.startswith(".") for p in parts)

    # Check all raw components first (not only the top one), then the resolved
    # destination, so "logo/../.git" cannot be used to write into a hidden folder.
    if subpath and _has_hidden(Path(subpath).parts):
        abort(403)

    dest = _safe_path(subpath) if subpath else ASSETS_ROOT
    if _has_hidden(dest.relative_to(ASSETS_ROOT).parts):
        abort(403)
    if not dest.is_dir():
        abort(400)

    for f in files:
        name = secure_filename(f.filename or "")
        if not name:
            continue
        f.save(dest / name)

    return redirect(url_for("browse", subpath=subpath) if subpath else url_for("browse"))
|
|
|
|
|
|
# ── Fichiers CDN publics (dev local uniquement) ───────────────────────
|
|
|
|
_PUBLIC_TOP = frozenset({"logo", "wiki", "error"})
|
|
|
|
|
|
@app.route("/favicon.ico")
def favicon():
    """Serve favicon.ico from the asset root (no authentication)."""
    icon_name = "favicon.ico"
    return send_from_directory(ASSETS_ROOT, icon_name)
|
|
|
|
|
|
@app.route("/robots.txt")
def robots():
    """Serve robots.txt from the asset root (no authentication)."""
    robots_name = "robots.txt"
    return send_from_directory(ASSETS_ROOT, robots_name)
|
|
|
|
|
|
@app.route("/<path:subpath>")
def cdn_file(subpath):
    """Serve a public CDN file without authentication.

    Only files located under one of the _PUBLIC_TOP folders are served. The
    folder test is applied to the requested path and again to the resolved
    path, so a request such as "logo/../.env" (or a symlink leaving the public
    folders) cannot expose the rest of the asset tree.

    Raises (via abort):
        404 when the path is outside the public folders or is not a file,
        400 when it escapes ASSETS_ROOT.
    """
    raw_parts = Path(subpath).parts
    # A path such as "." has no components at all.
    if not raw_parts or raw_parts[0] not in _PUBLIC_TOP:
        abort(404)
    target = _safe_path(subpath)
    resolved_parts = target.relative_to(ASSETS_ROOT).parts
    if not resolved_parts or resolved_parts[0] not in _PUBLIC_TOP:
        abort(404)
    if not target.is_file():
        abort(404)
    return send_from_directory(ASSETS_ROOT, subpath)
|
|
|
|
|
|
if __name__ == "__main__":
    # Local development entry point. Debug mode and port keep their previous
    # defaults (debug on, port 5003) but can be overridden through FLASK_DEBUG
    # and FLASK_RUN_PORT; the debugger must never be exposed on a public host.
    _debug = os.environ.get("FLASK_DEBUG", "1").strip().lower() not in {"", "0", "false", "no", "off"}
    _port = int(os.environ.get("FLASK_RUN_PORT", "5003"))
    app.run(debug=_debug, port=_port)
|