"""Flask admin dashboard for a directory of static assets.

Features visible in this module: OIDC login through the ``alpid`` provider,
a file browser with previews, filename/content search, GoAccess statistics
generation, upload and deletion of files, and a few public routes that serve
assets directly (intended for local development).
"""
import json
import os
import subprocess
import threading
from datetime import datetime
from pathlib import Path

from flask import (Flask, abort, jsonify, redirect, render_template, request,
                   send_from_directory, session, url_for)
from authlib.integrations.flask_client import OAuth
from werkzeug.middleware.proxy_fix import ProxyFix
from werkzeug.utils import secure_filename

app = Flask(__name__)
app.secret_key = os.environ["SECRET_KEY"]
app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1)

oauth = OAuth(app)
oauth.register(
    name="alpid",
    server_metadata_url=os.environ["ALPID_DISCOVERY_URL"],
    client_id=os.environ["ALPID_CLIENT_ID"],
    client_secret=os.environ["ALPID_CLIENT_SECRET"],
    client_kwargs={"scope": "openid profile email"},
)

# Comma-separated lists; entries are stripped so "a, b" behaves like "a,b".
ADMIN_GROUPS = {
    g.strip()
    for g in os.environ.get("ADMIN_GROUPS", "admins").split(",")
    if g.strip()
}
ADMIN_EMAILS = {
    e.strip()
    for e in os.environ.get("ADMIN_EMAILS", "").split(",")
    if e.strip()
}
ASSETS_ROOT = Path(os.environ.get("ASSETS_ROOT", ".")).resolve()
STATS_FILE = Path(os.environ.get("STATS_FILE", "/opt/static-cdn/goaccess.html"))
STATS_JSON = Path(os.environ.get("STATS_JSON", "/opt/static-cdn/goaccess.json"))
STATS_LOG_FILE = os.environ.get("STATS_LOG_FILE", "")
STATS_GENERATE_CMD = os.environ.get("STATS_GENERATE_CMD", "")

# Guards _gen_state so only one report generation runs at a time.
_gen_lock = threading.Lock()
_gen_state = {"generating": False}

# Names that are never listed, previewed, served, deleted or uploaded into.
_HIDDEN = frozenset({
    ".git", "scripts", "app", ".env", ".env.example",
    ".gitignore", ".htaccess", ".htpasswd_stats", "standard_index.html",
})

IMAGE_EXT = frozenset({".png", ".jpg", ".jpeg", ".gif", ".ico", ".svg", ".webp"})
TEXT_EXT = frozenset({".txt", ".html", ".htm", ".md", ".css", ".js",
                      ".json", ".xml", ".conf", ".sh", ".robots", ".php"})
PDF_EXT = frozenset({".pdf"})


# ── Helpers ───────────────────────────────────────────────────────────

def _user():
    """Return the user dict stored in the session, or None when logged out."""
    return session.get("user")


def _require_admin():
    """Gate a view behind an admin session.

    Returns a redirect to the login route when nobody is logged in (the
    requested URL is remembered in the session), aborts with 403 when the
    user is not an admin, and returns None when access is granted.
    """
    u = _user()
    if not u:
        session["next_url"] = request.url
        return redirect(url_for("login"))
    if not u.get("is_admin"):
        abort(403)
    return None


def _safe_path(subpath: str) -> Path:
    """Resolve *subpath* under ASSETS_ROOT; abort with 400 if it escapes it."""
    target = (ASSETS_ROOT / subpath).resolve()
    if not target.is_relative_to(ASSETS_ROOT):
        abort(400)
    return target


def _hidden_name(name: str) -> bool:
    """Return True for a single path component that must stay hidden."""
    return name in _HIDDEN or name.startswith(".")


def _is_hidden(subpath: str, target: Path) -> bool:
    """Return True when the request touches a hidden path component.

    Both the requested components and the components of the resolved
    *target* (which must lie under ASSETS_ROOT, e.g. a ``_safe_path`` result)
    are checked. Checking only the first requested component would let
    ``dir/.htaccess`` or ``dir/../.env`` through, because ``_safe_path``
    resolves ``..`` back inside the root.
    """
    requested = Path(subpath).parts
    resolved = target.relative_to(ASSETS_ROOT).parts
    return any(_hidden_name(p) for p in (*requested, *resolved))


def _humansize(n: int) -> str:
    """Format a byte count with the units o / Ko / Mo / Go / To."""
    for unit in ("o", "Ko", "Mo", "Go"):
        if n < 1024:
            return f"{n:.0f} {unit}"
        n /= 1024
    return f"{n:.1f} To"


def _folder_stats(path: Path) -> dict:
    """Count visible files under *path* and sum their sizes in bytes."""
    files, size = 0, 0
    for f in path.rglob("*"):
        if not f.is_file():
            continue
        if any(_hidden_name(p) for p in f.relative_to(path).parts):
            continue
        files += 1
        size += f.stat().st_size
    return {"files": files, "size": size}


def _parse_date(s: str):
    """Parse a ``YYYY-MM-DD`` string; return None when empty or invalid."""
    if not s:
        return None
    try:
        return datetime.strptime(s, "%Y-%m-%d")
    except ValueError:
        return None


def _load_hits() -> dict:
    """Parse GoAccess JSON report → {relative_path: hit_count}.

    Best-effort: a missing or unreadable report yields an empty dict so the
    file browser keeps working; the failure is logged instead of swallowed.
    """
    if not STATS_JSON.is_file():
        return {}
    try:
        data = json.loads(STATS_JSON.read_text(encoding="utf-8"))
        hits = {}
        for item in data.get("requests", {}).get("data", []):
            url = item.get("data", "").lstrip("/")
            count = item.get("hits", {}).get("count", 0)
            if url and count:
                hits[url] = count
        return hits
    except Exception:
        app.logger.warning("Cannot read hit counts from %s", STATS_JSON,
                           exc_info=True)
        return {}


def _entry(item: Path) -> dict:
    """Build the template dict describing one file or directory."""
    stat = item.stat()
    ext = item.suffix.lower()
    return {
        "name": item.name,
        "path": str(item.relative_to(ASSETS_ROOT)),
        "is_dir": item.is_dir(),
        "size": stat.st_size if item.is_file() else None,
        "mtime": datetime.fromtimestamp(stat.st_mtime),
        "ext": ext,
        "is_image": ext in IMAGE_EXT,
        "is_text": ext in TEXT_EXT,
        "is_pdf": ext in PDF_EXT,
    }


def _snippet(text: str, needle: str, context: int = 60):
    """Return an excerpt of *text* around the first case-insensitive match.

    Up to *context* characters are kept on each side, newlines become
    spaces, and an ellipsis marks a truncated side. Returns None when
    *needle* does not occur.
    """
    idx = text.lower().find(needle.lower())
    if idx == -1:
        return None
    start = max(0, idx - context)
    end = min(len(text), idx + len(needle) + context)
    excerpt = text[start:end].replace("\n", " ")
    return ("…" if start else "") + excerpt + ("…" if end < len(text) else "")


# ── Auth ──────────────────────────────────────────────────────────────

@app.route("/auth/login")
def login():
    """Start the OIDC flow by redirecting to the provider."""
    return oauth.alpid.authorize_redirect(url_for("callback", _external=True))


@app.route("/auth/callback")
def callback():
    """Finish the OIDC flow and store the user in the session."""
    token = oauth.alpid.authorize_access_token()
    info = token.get("userinfo") or oauth.alpid.userinfo(token=token)

    email = info.get("email", "")
    groups = set(info.get("groups", []))

    if groups:
        is_admin = bool(groups & ADMIN_GROUPS)
    elif ADMIN_EMAILS:
        is_admin = email in ADMIN_EMAILS
    else:
        # NOTE(review): fail-open — with no groups claim and no ADMIN_EMAILS
        # configured, every authenticated user becomes admin. Confirm this
        # default is intended before exposing the app.
        is_admin = True

    session["user"] = {
        "sub": info["sub"],
        "name": info.get("name") or info.get("preferred_username", ""),
        "email": email,
        "is_admin": is_admin,
    }
    return redirect(session.pop("next_url", url_for("dashboard")))


@app.route("/auth/logout")
def logout():
    """Clear the session and go back to the dashboard."""
    session.clear()
    return redirect(url_for("dashboard"))


# ── Dashboard ─────────────────────────────────────────────────────────

@app.route("/")
def dashboard():
    """Show per-folder file counts and sizes for the top-level folders."""
    redir = _require_admin()
    if redir:
        return redir

    folders = {}
    for item in sorted(ASSETS_ROOT.iterdir()):
        if _hidden_name(item.name):
            continue
        if item.is_dir():
            folders[item.name] = _folder_stats(item)

    return render_template(
        "dashboard.html",
        user=_user(),
        folders=folders,
        total_files=sum(v["files"] for v in folders.values()),
        total_size=sum(v["size"] for v in folders.values()),
        humansize=_humansize,
    )


# ── File browser ──────────────────────────────────────────────────────

@app.route("/browse/")
@app.route("/browse/<path:subpath>")
def browse(subpath=""):
    """List a directory, or render a preview when *subpath* is a file."""
    redir = _require_admin()
    if redir:
        return redir

    target = _safe_path(subpath)
    if not target.exists():
        abort(404)
    if _is_hidden(subpath, target):
        abort(403)

    if target.is_file():
        return _file_preview(target, subpath)

    hits = _load_hits()
    entries = []
    for item in sorted(target.iterdir()):
        if _hidden_name(item.name):
            continue
        e = _entry(item)
        e["hits"] = hits.get(e["path"], 0) if not e["is_dir"] else None
        entries.append(e)
    # Directories first, then case-insensitive name order.
    entries.sort(key=lambda e: (0 if e["is_dir"] else 1, e["name"].lower()))

    parts = Path(subpath).parts if subpath else ()
    breadcrumb = [
        {"name": p, "path": str(Path(*parts[:i + 1]))}
        for i, p in enumerate(parts)
    ]

    return render_template(
        "browse.html",
        user=_user(),
        entries=entries,
        subpath=subpath,
        breadcrumb=breadcrumb,
        humansize=_humansize,
        has_hits=STATS_JSON.is_file(),
    )


def _file_preview(path: Path, subpath: str):
    """Render the preview page for one file, with prev/next sibling links."""
    ext = path.suffix.lower()
    stat = path.stat()
    parent_dir = Path(subpath).parent
    parent = str(parent_dir) if parent_dir != Path(".") else ""

    siblings = sorted(
        [f for f in path.parent.iterdir()
         if f.is_file() and not _hidden_name(f.name)],
        key=lambda f: f.name.lower(),
    )
    idx = next((i for i, f in enumerate(siblings) if f == path), None)
    has_prev = idx is not None and idx > 0
    has_next = idx is not None and idx < len(siblings) - 1
    prev_path = str(siblings[idx - 1].relative_to(ASSETS_ROOT)) if has_prev else None
    next_path = str(siblings[idx + 1].relative_to(ASSETS_ROOT)) if has_next else None

    ctx = dict(
        user=_user(),
        subpath=subpath,
        filename=path.name,
        filesize=_humansize(stat.st_size),
        mtime=datetime.fromtimestamp(stat.st_mtime),
        raw_url=url_for("raw_file", subpath=subpath),
        parent_path=parent,
        prev_path=prev_path,
        next_path=next_path,
        sibling_pos=f"{idx + 1}/{len(siblings)}" if idx is not None else "",
    )

    if ext in IMAGE_EXT:
        return render_template("preview_image.html", **ctx)

    if ext in TEXT_EXT:
        content = path.read_text(errors="replace")
        return render_template("preview_text.html", content=content,
                               lang=ext.lstrip("."), **ctx)

    return render_template("preview_other.html", **ctx)


# ── Search ────────────────────────────────────────────────────────────

@app.route("/search")
def search():
    """Search visible files by name, modification date and optionally content.

    Query parameters: ``q`` (substring), ``after`` / ``before``
    (``YYYY-MM-DD`` bounds on the modification date, inclusive) and
    ``content=1`` to also look inside files with a TEXT_EXT extension.
    """
    redir = _require_admin()
    if redir:
        return redir

    q = request.args.get("q", "").strip()
    after_s = request.args.get("after", "").strip()
    before_s = request.args.get("before", "").strip()
    in_content = request.args.get("content") == "1"

    dt_after = _parse_date(after_s)
    dt_before = _parse_date(before_s)

    results = []
    searched = bool(q or dt_after or dt_before)

    if searched:
        q_low = q.lower()
        for path in sorted(ASSETS_ROOT.rglob("*")):
            if not path.is_file():
                continue
            parts = path.relative_to(ASSETS_ROOT).parts
            if any(_hidden_name(p) for p in parts):
                continue

            mtime = datetime.fromtimestamp(path.stat().st_mtime)
            if dt_after and mtime.date() < dt_after.date():
                continue
            if dt_before and mtime.date() > dt_before.date():
                continue

            name_match = (not q) or q_low in path.name.lower()
            content_match = None

            # Content is only read when the name did not already match.
            if (in_content and q and not name_match
                    and path.suffix.lower() in TEXT_EXT):
                try:
                    content_match = _snippet(path.read_text(errors="replace"), q)
                except Exception:
                    # Best-effort: an unreadable file is skipped, not fatal.
                    app.logger.warning("search: cannot read %s", path,
                                       exc_info=True)
                if content_match is not None:
                    name_match = True

            if not name_match:
                continue

            e = _entry(path)
            e["content_match"] = content_match
            results.append(e)

    return render_template(
        "search.html",
        user=_user(),
        q=q,
        after=after_s,
        before=before_s,
        in_content=in_content,
        results=results,
        searched=searched,
        humansize=_humansize,
    )


# ── GoAccess statistics ───────────────────────────────────────────────

@app.route("/stats/")
def stats():
    """Render the statistics page."""
    redir = _require_admin()
    if redir:
        return redir
    return render_template(
        "stats.html",
        user=_user(),
        has_report=STATS_FILE.is_file(),
    )


@app.route("/stats/report")
def stats_report():
    """Serve the generated GoAccess HTML report, or 404 when absent."""
    redir = _require_admin()
    if redir:
        return redir
    if not STATS_FILE.is_file():
        abort(404)
    return send_from_directory(STATS_FILE.parent, STATS_FILE.name)


def _do_generate():
    """Run the report generation command; always clear the generating flag."""
    try:
        if STATS_GENERATE_CMD:
            # The command comes from the environment (operator-controlled),
            # not from request data, hence shell=True is kept as-is.
            subprocess.run(STATS_GENERATE_CMD, shell=True, timeout=600)
        else:
            cmd = ["goaccess", STATS_LOG_FILE,
                   "--log-format=COMBINED",
                   f"--output={STATS_FILE}"]
            # A Path object is always truthy, so the JSON output was always
            # requested; the append is now unconditional (same behaviour).
            cmd.append(f"--output={STATS_JSON}")
            subprocess.run(cmd, timeout=600, check=False)
    finally:
        with _gen_lock:
            _gen_state["generating"] = False


@app.route("/stats/generate", methods=["POST"])
def stats_generate():
    """Start report generation in a background thread if none is running."""
    redir = _require_admin()
    if redir:
        return redir
    if STATS_FILE.is_file():
        return jsonify({"status": "exists"})
    with _gen_lock:
        if _gen_state["generating"]:
            return jsonify({"status": "generating"})
        if not STATS_LOG_FILE and not STATS_GENERATE_CMD:
            return jsonify({"status": "error",
                            "message": "STATS_LOG_FILE non configuré"}), 500
        _gen_state["generating"] = True
        threading.Thread(target=_do_generate, daemon=True).start()
    return jsonify({"status": "started"})


@app.route("/stats/status")
def stats_status():
    """Report whether the HTML report exists and whether a run is in progress."""
    redir = _require_admin()
    if redir:
        return redir
    return jsonify({"ready": STATS_FILE.is_file(),
                    "generating": _gen_state["generating"]})


# ── Raw files (preview / download) ────────────────────────────────────

@app.route("/raw/<path:subpath>")
def raw_file(subpath):
    """Serve a visible file from ASSETS_ROOT to an admin."""
    redir = _require_admin()
    if redir:
        return redir
    target = _safe_path(subpath)
    if not target.is_file():
        abort(404)
    if _is_hidden(subpath, target):
        abort(403)
    return send_from_directory(ASSETS_ROOT, subpath)


# ── File deletion ─────────────────────────────────────────────────────

@app.route("/delete", methods=["POST"])
def delete_file():
    """Delete the file named by the ``path`` form field, then go to its folder."""
    redir = _require_admin()
    if redir:
        return redir

    subpath = request.form.get("path", "").strip()
    if not subpath:
        abort(400)

    target = _safe_path(subpath)
    if not target.exists():
        abort(404)
    if target.is_dir():
        abort(400)
    if _is_hidden(subpath, target):
        abort(403)

    parent = str(Path(subpath).parent)
    target.unlink()
    if parent != ".":
        return redirect(url_for("browse", subpath=parent))
    return redirect(url_for("browse"))


# ── File upload ───────────────────────────────────────────────────────

@app.route("/upload", methods=["POST"])
def upload_file():
    """Save the uploaded ``files`` into the directory named by ``path``."""
    redir = _require_admin()
    if redir:
        return redir

    subpath = request.form.get("path", "").strip()
    files = request.files.getlist("files")

    if not files or all(f.filename == "" for f in files):
        abort(400)

    dest = _safe_path(subpath) if subpath else ASSETS_ROOT
    if _is_hidden(subpath, dest):
        abort(403)
    if not dest.is_dir():
        abort(400)

    for f in files:
        name = secure_filename(f.filename or "")
        if not name:
            continue
        f.save(dest / name)

    if subpath:
        return redirect(url_for("browse", subpath=subpath))
    return redirect(url_for("browse"))


# ── Public CDN files (local development only) ─────────────────────────

_PUBLIC_TOP = frozenset({"logo", "wiki", "error"})


@app.route("/favicon.ico")
def favicon():
    """Serve the favicon from ASSETS_ROOT."""
    return send_from_directory(ASSETS_ROOT, "favicon.ico")


@app.route("/robots.txt")
def robots():
    """Serve robots.txt from ASSETS_ROOT."""
    return send_from_directory(ASSETS_ROOT, "robots.txt")


@app.route("/<path:subpath>")
def cdn_file(subpath):
    """Serve a file without authentication from a public top-level folder."""
    requested = Path(subpath).parts
    if not requested or requested[0] not in _PUBLIC_TOP:
        abort(404)
    target = _safe_path(subpath)
    # Re-check on the resolved path: "logo/../.env" starts with a public
    # folder but resolves outside of it.
    resolved = target.relative_to(ASSETS_ROOT).parts
    if not resolved or resolved[0] not in _PUBLIC_TOP:
        abort(404)
    if not target.is_file():
        abort(404)
    return send_from_directory(ASSETS_ROOT, subpath)


if __name__ == "__main__":
    app.run(debug=True, port=5003)