Назад
Системный редактор
main.py
•
/srv/myhost/app/main.py
Tab
Поиск
Копировать
Вставить
0/0
import os import re import mimetypes import sqlite3 import shutil import secrets import zipfile import subprocess from datetime import datetime, timezone from pathlib import Path from urllib.parse import quote from fastapi import FastAPI, Request, UploadFile, File, Form, HTTPException, Cookie from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse, Response from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from passlib.context import CryptContext BASE_DIR = Path("/srv/myhost") FILES_DIR = BASE_DIR / "files" DATA_DIR = BASE_DIR / "data" DB_PATH = DATA_DIR / "app.db" BACKUP_FILES_DIR = BASE_DIR / "backups" / "files" BACKUP_SYSTEM_DIR = BASE_DIR / "backups" / "system" FILES_DIR.mkdir(parents=True, exist_ok=True) DATA_DIR.mkdir(parents=True, exist_ok=True) BACKUP_FILES_DIR.mkdir(parents=True, exist_ok=True) BACKUP_SYSTEM_DIR.mkdir(parents=True, exist_ok=True) app = FastAPI() app.mount("/static", StaticFiles(directory=str(BASE_DIR / "app" / "static")), name="static") templates = Jinja2Templates(directory=str(BASE_DIR / "app" / "templates")) SLUG_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,62}[a-z0-9]$|^[a-z0-9]$") SAFE_NAME_RE = re.compile(r"^[A-Za-z0-9._ -]{1,255}$") PANEL_DOMAIN = os.getenv("PANEL_DOMAIN", "panel.nothalk.fun") FILES_DOMAIN = os.getenv("FILES_DOMAIN", "files.nothalk.fun") IMAGE_EXTS = {".png", ".jpg", ".jpeg", ".webp", ".gif", ".bmp", ".svg"} VIDEO_EXTS = {".mp4", ".webm", ".mov", ".m4v"} TEXT_EXTS = { ".txt", ".md", ".json", ".js", ".css", ".html", ".htm", ".xml", ".yaml", ".yml", ".py", ".ini", ".conf", ".log", ".env", ".csv", ".toml", ".sh" } SORT_MAP = { "new": "id DESC", "old": "id ASC", "name_asc": "original_name COLLATE NOCASE ASC", "name_desc": "original_name COLLATE NOCASE DESC", "size_asc": "size ASC", "size_desc": "size DESC", "slug_asc": "slug COLLATE NOCASE ASC", "slug_desc": "slug COLLATE NOCASE DESC", } SYSTEM_FILES = { "main.py": "/srv/myhost/app/main.py", "index.html": "/srv/myhost/app/templates/index.html", "app.css": "/srv/myhost/app/static/app.css", "system.html": "/srv/myhost/app/templates/system.html", "system_edit.html": "/srv/myhost/app/templates/system_edit.html", "file_edit.html": "/srv/myhost/app/templates/file_edit.html", "myhost.service": "/etc/systemd/system/myhost.service", "nginx_myhost": "/etc/nginx/sites-available/myhost", "404": "/srv/myhost/errors/404.html", "ForgeLane": "/srv/myhost/app/templates/forge_lane.html", "ForgeRepo": "/srv/myhost/app/templates/forge_repo.html", "ForgeMarks": "/srv/myhost/app/templates/forge_marks.html", "ForgeMark": "/srv/myhost/app/templates/forge_mark.html", "ForgeDrops": "/srv/myhost/app/templates/forge_drops.html", "ForgeDrop": "/srv/myhost/app/templates/forge_drop.html", "ForgeLogin": "/srv/myhost/app/templates/forge_login.html", "ForgeRegister": "/srv/myhost/app/templates/forge_register.html", "ForgeProfile": "/srv/myhost/app/templates/forge_profile.html", "ForgeManageRepo": "/srv/myhost/app/templates/forge_manage_repo.html", } pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto") def db_conn(): conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row conn.execute("PRAGMA foreign_keys = ON") return conn def now_iso(): return datetime.now(timezone.utc).isoformat() def ts_slug(): return datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S") def slugify(value: str) -> str: value = (value or "").strip().lower() value = re.sub(r"[ _]+", "-", value) value = re.sub(r"[^a-z0-9-]", "", value) value = re.sub(r"-{2,}", "-", value).strip("-") return (value or "item")[:64].strip("-") or "item" def valid_slug(slug: str) -> bool: return bool(SLUG_RE.fullmatch(slug)) def safe_label(value: str) -> str: value = (value or "").strip() value = re.sub(r"\s+", "-", value) value = re.sub(r"[^A-Za-z0-9._-]", "", value) return value[:80] or "manual" def split_filename(filename: str): p = Path(filename) return p.stem, p.suffix def ensure_unique_slug(conn, table: str, base_slug: str, exclude_id=None) -> str: slug = slugify(base_slug) if not valid_slug(slug): slug = "item" attempt = slug counter = 1 while True: row = conn.execute(f"SELECT id FROM {table} WHERE slug = ?", (attempt,)).fetchone() if row is None or (exclude_id is not None and row["id"] == exclude_id): return attempt counter += 1 suffix = f"-{counter}" max_base = 64 - len(suffix) attempt = f"{slug[:max_base].rstrip('-')}{suffix}" def folder_chain_public(conn, folder_id): while folder_id: row = conn.execute("SELECT id, parent_id, is_public FROM folders WHERE id = ?", (folder_id,)).fetchone() if not row: return True if int(row["is_public"]) != 1: return False folder_id = row["parent_id"] return True def file_is_effectively_public(conn, file_row): return int(file_row["is_public"]) == 1 and folder_chain_public(conn, file_row["folder_id"]) def get_folder(conn, folder_id): if not folder_id: return None row = conn.execute("SELECT * FROM folders WHERE id = ?", (folder_id,)).fetchone() return dict(row) if row else None def get_breadcrumbs(conn, folder_id): chain = [] while folder_id: row = conn.execute("SELECT id, name, parent_id FROM folders WHERE id = ?", (folder_id,)).fetchone() if not row: break chain.append(dict(row)) folder_id = row["parent_id"] return list(reversed(chain)) def format_bytes(num: int) -> str: value = float(num) for unit in ["B", "KB", "MB", "GB", "TB"]: if value < 1024 or unit == "TB": return f"{value:.2f} {unit}" value /= 1024 return f"{num} B" def get_forge_repo_by_owner_and_slug(conn, username: str, repo_slug: str): return conn.execute(""" SELECT r.*, u.id AS user_id, u.username, u.display_name, u.bio FROM forge_repos r JOIN forge_users u ON u.id = r.owner_id WHERE u.username = ? AND r.slug = ? LIMIT 1 """, (username, repo_slug)).fetchone() def get_forge_lane(conn, repo_id: int, lane_slug: str): return conn.execute(""" SELECT * FROM forge_lanes WHERE repo_id = ? AND slug = ? LIMIT 1 """, (repo_id, lane_slug)).fetchone() def get_forge_mark(conn, repo_id: int, mark_slug: str): return conn.execute(""" SELECT * FROM forge_marks WHERE repo_id = ? AND slug = ? LIMIT 1 """, (repo_id, mark_slug)).fetchone() def hash_password(password: str) -> str: password = (password or "").strip() return pwd_context.hash(password) def verify_password(password: str, password_hash: str | None) -> bool: if not password_hash: return False password = (password or "").strip() return pwd_context.verify(password, password_hash) def make_session_token(user_id: int, username: str) -> str: raw = f"{user_id}:{username}:{secrets.token_urlsafe(24)}" return raw def get_forge_user_by_session(conn, session_token: str | None): if not session_token: return None parts = session_token.split(":") if len(parts) < 3: return None try: user_id = int(parts[0]) except ValueError: return None row = conn.execute(""" SELECT * FROM forge_users WHERE id = ? AND is_active = 1 LIMIT 1 """, (user_id,)).fetchone() return dict(row) if row else None def get_owned_repo_or_403(conn, user_id: int, repo_id: int): row = conn.execute(""" SELECT * FROM forge_repos WHERE id = ? AND owner_id = ? LIMIT 1 """, (repo_id, user_id)).fetchone() if not row: raise HTTPException(status_code=403, detail="Access denied") return dict(row) def ensure_unique_user_repo_slug(conn, owner_id: int, base_slug: str) -> str: slug = slugify(base_slug) if not valid_slug(slug): slug = "repo" attempt = slug counter = 1 while True: row = conn.execute(""" SELECT id FROM forge_repos WHERE owner_id = ? AND slug = ? LIMIT 1 """, (owner_id, attempt)).fetchone() if row is None: return attempt counter += 1 suffix = f"-{counter}" max_base = 64 - len(suffix) attempt = f"{slug[:max_base].rstrip('-')}{suffix}" def get_full_tree_and_files(conn): folders = conn.execute("SELECT * FROM folders ORDER BY name COLLATE NOCASE").fetchall() files = conn.execute("SELECT * FROM files ORDER BY original_name COLLATE NOCASE").fetchall() folders_dict = {f["id"]: dict(f) for f in folders} for fid in folders_dict: folders_dict[fid]["files"] = [] folders_dict[fid]["children"] = [] folders_dict[fid]["size"] = 0 root_folders = [] root_files = [] for f in files: fd = dict(f) fd["is_public_bool"] = bool(fd["is_public"]) fd["size_h"] = format_bytes(fd["size"]) if fd["folder_id"] and fd["folder_id"] in folders_dict: folders_dict[fd["folder_id"]]["files"].append(fd) folders_dict[fd["folder_id"]]["size"] += fd["size"] else: root_files.append(fd) for fid, f in folders_dict.items(): parent_id = f["parent_id"] if parent_id and parent_id in folders_dict: folders_dict[parent_id]["children"].append(f) else: root_folders.append(f) def calc_size_and_depth(node, depth=0): node["depth"] = depth total_size = sum(file["size"] for file in node["files"]) for child in node["children"]: total_size += calc_size_and_depth(child, depth + 1) node["size"] = total_size node["size_h"] = format_bytes(total_size) return total_size for f in root_folders: calc_size_and_depth(f) def flatten(nodes, out=None): if out is None: out = [] for node in nodes: out.append(node) flatten(node["children"], out) return out return flatten(root_folders), root_files def file_kind(filename: str): ext = Path(filename).suffix.lower() if ext in IMAGE_EXTS: return "image" if ext in VIDEO_EXTS: return "video" if ext in TEXT_EXTS: return "text" return "other" def is_text_filename(filename: str) -> bool: return Path(filename).suffix.lower() in TEXT_EXTS def is_image_filename(filename: str) -> bool: return Path(filename).suffix.lower() in IMAGE_EXTS def get_file_by_slug(slug: str): conn = db_conn() try: row = conn.execute("SELECT * FROM files WHERE slug = ?", (slug,)).fetchone() if row: return dict(row), None hist = conn.execute(""" SELECT sh.old_slug, f.slug FROM slug_history sh JOIN files f ON f.id = sh.file_id WHERE sh.old_slug = ? """, (slug,)).fetchone() if hist: return None, hist["slug"] return None, None finally: conn.close() def save_uploaded_file(upload: UploadFile, file_path: Path): with file_path.open("wb") as buffer: shutil.copyfileobj(upload.file, buffer) def parse_ids(csv_ids: str) -> list[int]: return [int(x) for x in csv_ids.split(",") if x.strip().isdigit()] def manual_backup_file(src: Path, backup_dir: Path, prefix: str, label: str): backup_dir.mkdir(parents=True, exist_ok=True) backup_name = f"{prefix}-{ts_slug()}-{safe_label(label)}-{src.name}" dst = backup_dir / backup_name shutil.copy2(src, dst) return dst def get_system_backups(key: str): prefix = f"{key}-" items = [] for p in sorted(BACKUP_SYSTEM_DIR.glob(f"{prefix}*"), reverse=True): items.append({ "name": p.name, "size_h": format_bytes(p.stat().st_size), "mtime": datetime.fromtimestamp(p.stat().st_mtime).isoformat(timespec="seconds"), }) return items def get_file_backups(file_id: int): prefix = f"file-{file_id}-" items = [] for p in sorted(BACKUP_FILES_DIR.glob(f"{prefix}*"), reverse=True): items.append({ "name": p.name, "size_h": format_bytes(p.stat().st_size), "mtime": datetime.fromtimestamp(p.stat().st_mtime).isoformat(timespec="seconds"), }) return items def delete_file_backups(file_id: int): prefix = f"file-{file_id}-" for p in BACKUP_FILES_DIR.glob(f"{prefix}*"): try: p.unlink() except FileNotFoundError: pass def backups_total_size(): total = 0 for base in (BACKUP_FILES_DIR, BACKUP_SYSTEM_DIR): for p in base.rglob("*"): if p.is_file(): total += p.stat().st_size return total def system_reload(): subprocess.run(["systemctl", "daemon-reload"], check=False) subprocess.run(["nginx", "-t"], check=False) subprocess.run(["systemctl", "reload", "nginx"], check=False) subprocess.run(["systemctl", "restart", "myhost"], check=False) @app.get("/", response_class=HTMLResponse) def index(request: Request, folder_id: int | None = None, q: str = "", sort: str = "new"): conn = db_conn() try: current_folder = get_folder(conn, folder_id) breadcrumbs = get_breadcrumbs(conn, folder_id) q_clean = q.strip() sort_key = sort if sort in SORT_MAP else "new" order_sql = SORT_MAP[sort_key] params = [] search_sql = "" if q_clean: search_sql = " AND (original_name LIKE ? OR slug LIKE ?)" like = f"%{q_clean}%" params.extend([like, like]) subfolders = conn.execute(f""" SELECT * FROM folders WHERE ((? IS NULL AND parent_id IS NULL) OR parent_id = ?) ORDER BY name COLLATE NOCASE """, (folder_id, folder_id)).fetchall() files = conn.execute(f""" SELECT * FROM files WHERE ((? IS NULL AND folder_id IS NULL) OR folder_id = ?) {search_sql} ORDER BY {order_sql} """, (folder_id, folder_id, *params)).fetchall() all_folders_flat, root_files = get_full_tree_and_files(conn) folders_map = {f["id"]: f for f in all_folders_flat} if current_folder and current_folder["id"] in folders_map: current_folder["size"] = folders_map[current_folder["id"]]["size"] current_folder["size_h"] = folders_map[current_folder["id"]]["size_h"] folders_out = [] for row in subfolders: row = dict(row) row["is_public_bool"] = bool(row["is_public"]) if row["id"] in folders_map: row["size"] = folders_map[row["id"]]["size"] row["size_h"] = folders_map[row["id"]]["size_h"] else: row["size"] = 0 row["size_h"] = "0 B" folders_out.append(row) files_out = [] for row in files: row = dict(row) slug = row["slug"] row["view_url"] = f"https://{FILES_DOMAIN}/f/{slug}" row["download_url"] = f"https://{FILES_DOMAIN}/d/{slug}" row["is_public_bool"] = bool(row["is_public"]) row["kind"] = file_kind(row["original_name"]) row["is_text"] = row["kind"] == "text" row["is_image"] = row["kind"] == "image" row["size_h"] = format_bytes(row["size"]) files_out.append(row) total_db_size = conn.execute("SELECT COALESCE(SUM(size), 0) AS total FROM files").fetchone()["total"] backup_size = backups_total_size() usage = shutil.disk_usage(FILES_DIR) return templates.TemplateResponse("index.html", { "request": request, "folders": folders_out, "files": files_out, "all_folders": all_folders_flat, "root_files": root_files, "current_folder": current_folder, "folder_id": folder_id, "breadcrumbs": breadcrumbs, "panel_domain": PANEL_DOMAIN, "files_domain": FILES_DOMAIN, "query": q_clean, "sort": sort_key, "hosted_used_h": format_bytes(total_db_size), "backups_used_h": format_bytes(backup_size), "disk_total_h": format_bytes(usage.total), "disk_used_h": format_bytes(usage.used), "disk_free_h": format_bytes(usage.free), }) finally: conn.close() @app.get("/system", response_class=HTMLResponse) def system_page(request: Request): system_files = [] for key, path_str in SYSTEM_FILES.items(): p = Path(path_str) exists = p.exists() system_files.append({ "key": key, "path": path_str, "exists": exists, "size_h": format_bytes(p.stat().st_size) if exists else "-", "backups": get_system_backups(key), }) return templates.TemplateResponse("system.html", { "request": request, "system_files": system_files, }) @app.get("/system/edit/{key}", response_class=HTMLResponse) def system_edit(request: Request, key: str): if key not in SYSTEM_FILES: raise HTTPException(status_code=404, detail="System file not allowed") path = Path(SYSTEM_FILES[key]) if not path.exists(): raise HTTPException(status_code=404, detail="System file missing") content = path.read_text(encoding="utf-8", errors="replace") return templates.TemplateResponse("system_edit.html", { "request": request, "key": key, "path": str(path), "content": content, "backups": get_system_backups(key), }) @app.post("/system/save/{key}") async def system_save(key: str, content: str = Form(...)): if key not in SYSTEM_FILES: raise HTTPException(status_code=404, detail="System file not allowed") path = Path(SYSTEM_FILES[key]) if not path.exists(): raise HTTPException(status_code=404, detail="System file missing") path.write_text(content, encoding="utf-8") return RedirectResponse(url=f"/system/edit/{key}", status_code=303) @app.post("/system/create-backup/{key}") async def system_create_backup(key: str, label: str = Form(...)): if key not in SYSTEM_FILES: raise HTTPException(status_code=404, detail="System file not allowed") path = Path(SYSTEM_FILES[key]) if not path.exists(): raise HTTPException(status_code=404, detail="System file missing") manual_backup_file(path, BACKUP_SYSTEM_DIR, key, label) return RedirectResponse(url=f"/system/edit/{key}", status_code=303) @app.post("/system/delete-backup/{key}") async def system_delete_backup(key: str, backup_name: str = Form(...)): if key not in SYSTEM_FILES: raise HTTPException(status_code=404, detail="System file not allowed") backup_path = BACKUP_SYSTEM_DIR / Path(backup_name).name if not backup_path.exists(): raise HTTPException(status_code=404, detail="Backup not found") expected_prefix = f"{key}-" if not backup_path.name.startswith(expected_prefix): raise HTTPException(status_code=400, detail="Backup does not belong to this file") backup_path.unlink() return RedirectResponse(url=f"/system/edit/{key}", status_code=303) @app.post("/system/apply-restart/{key}") async def system_apply_restart(key: str, content: str = Form(...)): if key not in SYSTEM_FILES: raise HTTPException(status_code=404, detail="System file not allowed") path = Path(SYSTEM_FILES[key]) if not path.exists(): raise HTTPException(status_code=404, detail="System file missing") path.write_text(content, encoding="utf-8") system_reload() return RedirectResponse(url=f"/system/edit/{key}", status_code=303) @app.post("/system/restore/{key}") async def system_restore(key: str, backup_name: str = Form(...)): if key not in SYSTEM_FILES: raise HTTPException(status_code=404, detail="System file not allowed") path = Path(SYSTEM_FILES[key]) backup_path = BACKUP_SYSTEM_DIR / Path(backup_name).name if not backup_path.exists(): raise HTTPException(status_code=404, detail="Backup not found") shutil.copy2(backup_path, path) system_reload() return RedirectResponse(url=f"/system/edit/{key}", status_code=303) @app.get("/files/edit/{file_id}", response_class=HTMLResponse) def edit_text_file(request: Request, file_id: int): conn = db_conn() try: row = conn.execute("SELECT * FROM files WHERE id = ?", (file_id,)).fetchone() if not row: raise HTTPException(status_code=404, detail="File not found") row = dict(row) if not is_text_filename(row["original_name"]): raise HTTPException(status_code=400, detail="Not a text file") path = FILES_DIR / row["stored_name"] if not path.exists(): raise HTTPException(status_code=404, detail="Physical file missing") content = path.read_text(encoding="utf-8", errors="replace") return templates.TemplateResponse("file_edit.html", { "request": request, "file": row, "content": content, "backups": get_file_backups(file_id), }) finally: conn.close() @app.post("/files/edit/{file_id}") async def save_text_file(file_id: int, content: str = Form(...)): conn = db_conn() try: row = conn.execute("SELECT * FROM files WHERE id = ?", (file_id,)).fetchone() if not row: raise HTTPException(status_code=404, detail="File not found") row = dict(row) if not is_text_filename(row["original_name"]): raise HTTPException(status_code=400, detail="Not a text file") path = FILES_DIR / row["stored_name"] if not path.exists(): raise HTTPException(status_code=404, detail="Physical file missing") path.write_text(content, encoding="utf-8") new_size = path.stat().st_size conn.execute("UPDATE files SET size = ?, updated_at = ? WHERE id = ?", (new_size, now_iso(), file_id)) conn.commit() return RedirectResponse(url=f"/files/edit/{file_id}", status_code=303) finally: conn.close() @app.post("/files/create-backup/{file_id}") async def create_file_backup(file_id: int, label: str = Form(...)): conn = db_conn() try: row = conn.execute("SELECT * FROM files WHERE id = ?", (file_id,)).fetchone() if not row: raise HTTPException(status_code=404, detail="File not found") row = dict(row) path = FILES_DIR / row["stored_name"] if not path.exists(): raise HTTPException(status_code=404, detail="Physical file missing") manual_backup_file(path, BACKUP_FILES_DIR, f"file-{file_id}", label) return RedirectResponse(url=f"/files/edit/{file_id}", status_code=303) finally: conn.close() @app.post("/files/delete-backup/{file_id}") async def delete_file_backup(file_id: int, backup_name: str = Form(...)): conn = db_conn() try: row = conn.execute("SELECT * FROM files WHERE id = ?", (file_id,)).fetchone() if not row: raise HTTPException(status_code=404, detail="File not found") backup_path = BACKUP_FILES_DIR / Path(backup_name).name if not backup_path.exists(): raise HTTPException(status_code=404, detail="Backup not found") expected_prefix = f"file-{file_id}-" if not backup_path.name.startswith(expected_prefix): raise HTTPException(status_code=400, detail="Backup does not belong to this file") backup_path.unlink() return RedirectResponse(url=f"/files/edit/{file_id}", status_code=303) finally: conn.close() @app.post("/files/restore/{file_id}") async def restore_text_file(file_id: int, backup_name: str = Form(...)): conn = db_conn() try: row = conn.execute("SELECT * FROM files WHERE id = ?", (file_id,)).fetchone() if not row: raise HTTPException(status_code=404, detail="File not found") row = dict(row) path = FILES_DIR / row["stored_name"] backup_path = BACKUP_FILES_DIR / Path(backup_name).name if not backup_path.exists(): raise HTTPException(status_code=404, detail="Backup not found") shutil.copy2(backup_path, path) new_size = path.stat().st_size conn.execute("UPDATE files SET size = ?, updated_at = ? WHERE id = ?", (new_size, now_iso(), file_id)) conn.commit() return RedirectResponse(url=f"/files/edit/{file_id}", status_code=303) finally: conn.close() @app.post("/files/replace-image/{file_id}") async def replace_image(file_id: int, image: UploadFile = File(...)): conn = db_conn() try: row = conn.execute("SELECT * FROM files WHERE id = ?", (file_id,)).fetchone() if not row: raise HTTPException(status_code=404, detail="File not found") row = dict(row) if not is_image_filename(row["original_name"]): raise HTTPException(status_code=400, detail="Not an image file") ext_old = Path(row["original_name"]).suffix.lower() ext_new = Path(image.filename or "").suffix.lower() if ext_new and ext_new != ext_old: raise HTTPException(status_code=400, detail="Image extension must match current file type") path = FILES_DIR / row["stored_name"] if not path.exists(): raise HTTPException(status_code=404, detail="Physical file missing") save_uploaded_file(image, path) mime_type = image.content_type or mimetypes.guess_type(row["original_name"])[0] or "application/octet-stream" size = path.stat().st_size conn.execute(""" UPDATE files SET mime_type = ?, size = ?, updated_at = ? WHERE id = ? """, (mime_type, size, now_iso(), file_id)) conn.commit() folder_id = row["folder_id"] target = f"/?folder_id={folder_id}" if folder_id else "/" return RedirectResponse(url=target, status_code=303) finally: conn.close() @app.post("/files/create") async def create_new_file( name: str = Form(...), content: str = Form(""), slug: str = Form(""), folder_id: str = Form("") ): clean_name = Path(name.strip()).name if not clean_name or not SAFE_NAME_RE.fullmatch(clean_name): raise HTTPException(status_code=400, detail="Invalid file name") ext = Path(clean_name).suffix.lower() if ext not in TEXT_EXTS: raise HTTPException(status_code=400, detail="Only text file creation is supported") folder = int(folder_id) if folder_id.strip() else None conn = db_conn() try: stem = Path(clean_name).stem desired_slug = slugify(slug) if slug.strip() else slugify(stem) final_slug = ensure_unique_slug(conn, "files", desired_slug) stored_name = f"{secrets.token_hex(8)}{ext}" file_path = FILES_DIR / stored_name file_path.write_text(content, encoding="utf-8") size = file_path.stat().st_size mime_type = mimetypes.guess_type(clean_name)[0] or "text/plain" ts = now_iso() conn.execute(""" INSERT INTO files (original_name, stored_name, slug, mime_type, size, created_at, updated_at, folder_id, is_public) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1) """, (clean_name, stored_name, final_slug, mime_type, size, ts, ts, folder)) conn.commit() target = f"/?folder_id={folder}" if folder else "/" return RedirectResponse(url=target, status_code=303) finally: conn.close() @app.post("/api/upload") async def api_upload(files: list[UploadFile] = File(...), slug: str = Form(""), folder_id: str = Form("")): folder = int(folder_id) if folder_id.strip() else None conn = db_conn() created = [] try: for idx, upload in enumerate(files): original_name = Path(upload.filename or "file").name stem, suffix = split_filename(original_name) desired_slug = slugify(slug) if slug.strip() and len(files) == 1 and idx == 0 else slugify(stem) final_slug = ensure_unique_slug(conn, "files", desired_slug) stored_name = f"{secrets.token_hex(8)}{suffix.lower()}" file_path = FILES_DIR / stored_name save_uploaded_file(upload, file_path) size = file_path.stat().st_size mime_type = upload.content_type or mimetypes.guess_type(original_name)[0] or "application/octet-stream" ts = now_iso() conn.execute(""" INSERT INTO files (original_name, stored_name, slug, mime_type, size, created_at, updated_at, folder_id, is_public) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1) """, (original_name, stored_name, final_slug, mime_type, size, ts, ts, folder)) created.append({"name": original_name, "slug": final_slug}) conn.commit() return JSONResponse({"ok": True, "created": created}) finally: conn.close() @app.post("/folders/create") async def create_folder(name: str = Form(...), parent_id: str = Form("")): conn = db_conn() try: parent = int(parent_id) if parent_id.strip() else None clean_name = name.strip() if not clean_name: raise HTTPException(status_code=400, detail="Folder name required") slug = ensure_unique_slug(conn, "folders", clean_name) ts = now_iso() conn.execute(""" INSERT INTO folders (name, slug, parent_id, is_public, created_at, updated_at) VALUES (?, ?, ?, 1, ?, ?) """, (clean_name, slug, parent, ts, ts)) conn.commit() target = f"/?folder_id={parent}" if parent else "/" return RedirectResponse(url=target, status_code=303) finally: conn.close() @app.post("/folders/rename/{folder_id}") async def rename_folder(folder_id: int, name: str = Form(...)): conn = db_conn() try: row = conn.execute("SELECT * FROM folders WHERE id = ?", (folder_id,)).fetchone() if not row: raise HTTPException(status_code=404, detail="Folder not found") clean_name = name.strip() if not clean_name: raise HTTPException(status_code=400, detail="Folder name required") conn.execute("UPDATE folders SET name = ?, updated_at = ? WHERE id = ?", (clean_name, now_iso(), folder_id)) conn.commit() parent_id = row["parent_id"] target = f"/?folder_id={parent_id}" if parent_id else "/" return RedirectResponse(url=target, status_code=303) finally: conn.close() @app.post("/folders/delete/{folder_id}") async def delete_folder(folder_id: int): conn = db_conn() try: row = conn.execute("SELECT * FROM folders WHERE id = ?", (folder_id,)).fetchone() if not row: raise HTTPException(status_code=404, detail="Folder not found") child_folder = conn.execute("SELECT 1 FROM folders WHERE parent_id = ? LIMIT 1", (folder_id,)).fetchone() child_file = conn.execute("SELECT 1 FROM files WHERE folder_id = ? LIMIT 1", (folder_id,)).fetchone() if child_folder or child_file: raise HTTPException(status_code=400, detail="Folder is not empty") parent_id = row["parent_id"] conn.execute("DELETE FROM folders WHERE id = ?", (folder_id,)) conn.commit() target = f"/?folder_id={parent_id}" if parent_id else "/" return RedirectResponse(url=target, status_code=303) finally: conn.close() @app.post("/folders/toggle/{folder_id}") async def toggle_folder_public(folder_id: int): conn = db_conn() try: row = conn.execute("SELECT * FROM folders WHERE id = ?", (folder_id,)).fetchone() if not row: raise HTTPException(status_code=404, detail="Folder not found") new_value = 0 if int(row["is_public"]) == 1 else 1 conn.execute("UPDATE folders SET is_public = ?, updated_at = ? WHERE id = ?", (new_value, now_iso(), folder_id)) conn.commit() parent_id = row["parent_id"] target = f"/?folder_id={parent_id}" if parent_id else "/" return RedirectResponse(url=target, status_code=303) finally: conn.close() @app.post("/files/rename/{file_id}") async def rename_file(file_id: int, name: str = Form(...)): conn = db_conn() try: row = conn.execute("SELECT * FROM files WHERE id = ?", (file_id,)).fetchone() if not row: raise HTTPException(status_code=404, detail="File not found") clean_name = Path(name.strip()).name if not clean_name: raise HTTPException(status_code=400, detail="File name required") conn.execute("UPDATE files SET original_name = ?, updated_at = ? WHERE id = ?", (clean_name, now_iso(), file_id)) conn.commit() folder_id = row["folder_id"] target = f"/?folder_id={folder_id}" if folder_id else "/" return RedirectResponse(url=target, status_code=303) finally: conn.close() @app.post("/files/move/{file_id}") async def move_file(file_id: int, folder_id: str = Form("")): new_folder = int(folder_id) if folder_id.strip() else None conn = db_conn() try: row = conn.execute("SELECT * FROM files WHERE id = ?", (file_id,)).fetchone() if not row: raise HTTPException(status_code=404, detail="File not found") old_folder = row["folder_id"] conn.execute("UPDATE files SET folder_id = ?, updated_at = ? WHERE id = ?", (new_folder, now_iso(), file_id)) conn.commit() target_folder = new_folder if new_folder is not None else old_folder target = f"/?folder_id={target_folder}" if target_folder else "/" return RedirectResponse(url=target, status_code=303) finally: conn.close() @app.post("/files/bulk-move") async def bulk_move(file_ids: str = Form(...), folder_id: str = Form(""), current_folder_id: str = Form(""), q: str = Form(""), sort: str = Form("new")): ids = parse_ids(file_ids) if not ids: raise HTTPException(status_code=400, detail="No files selected") new_folder = int(folder_id) if folder_id.strip() else None conn = db_conn() try: placeholders = ",".join(["?"] * len(ids)) conn.execute(f"UPDATE files SET folder_id = ?, updated_at = ? WHERE id IN ({placeholders})", (new_folder, now_iso(), *ids)) conn.commit() finally: conn.close() params = [] if current_folder_id.strip(): params.append(f"folder_id={current_folder_id.strip()}") if q.strip(): params.append(f"q={quote(q.strip())}") if sort.strip(): params.append(f"sort={quote(sort.strip())}") return RedirectResponse(url="/" + ("?" + "&".join(params) if params else ""), status_code=303) @app.post("/files/bulk-delete") async def bulk_delete(file_ids: str = Form(...), current_folder_id: str = Form(""), q: str = Form(""), sort: str = Form("new")): ids = parse_ids(file_ids) if not ids: raise HTTPException(status_code=400, detail="No files selected") conn = db_conn() try: placeholders = ",".join(["?"] * len(ids)) rows = conn.execute(f"SELECT * FROM files WHERE id IN ({placeholders})", ids).fetchall() for row in rows: row = dict(row) file_path = FILES_DIR / row["stored_name"] conn.execute("DELETE FROM slug_history WHERE file_id = ?", (row["id"],)) conn.execute("DELETE FROM files WHERE id = ?", (row["id"],)) delete_file_backups(row["id"]) if file_path.exists(): file_path.unlink() conn.commit() finally: conn.close() params = [] if current_folder_id.strip(): params.append(f"folder_id={current_folder_id.strip()}") if q.strip(): params.append(f"q={quote(q.strip())}") if sort.strip(): params.append(f"sort={quote(sort.strip())}") return RedirectResponse(url="/" + ("?" + "&".join(params) if params else ""), status_code=303) @app.post("/files/bulk-zip") async def bulk_zip(file_ids: str = Form(...), folder_id: str = Form("")): ids = parse_ids(file_ids) if not ids: raise HTTPException(status_code=400, detail="No files selected") folder = int(folder_id) if folder_id.strip() else None conn = db_conn() try: placeholders = ",".join(["?"] * len(ids)) rows = conn.execute(f"SELECT * FROM files WHERE id IN ({placeholders})", ids).fetchall() if not rows: raise HTTPException(status_code=404, detail="Files not found") stored_name = f"{secrets.token_hex(8)}.zip" archive_name = f"selection-{secrets.token_hex(4)}.zip" archive_path = FILES_DIR / stored_name with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as zf: for row in rows: row = dict(row) src = FILES_DIR / row["stored_name"] if src.exists(): zf.write(src, arcname=row["original_name"]) size = archive_path.stat().st_size ts = now_iso() slug = ensure_unique_slug(conn, "files", Path(archive_name).stem) conn.execute(""" INSERT INTO files (original_name, stored_name, slug, mime_type, size, created_at, updated_at, folder_id, is_public) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1) """, (archive_name, stored_name, slug, "application/zip", size, ts, ts, folder)) conn.commit() target = f"/?folder_id={folder}" if folder else "/" return RedirectResponse(url=target, status_code=303) finally: conn.close() @app.post("/files/toggle/{file_id}") async def toggle_file_public(file_id: int): conn = db_conn() try: row = conn.execute("SELECT * FROM files WHERE id = ?", (file_id,)).fetchone() if not row: raise HTTPException(status_code=404, detail="File not found") new_value = 0 if int(row["is_public"]) == 1 else 1 conn.execute("UPDATE files SET is_public = ?, updated_at = ? WHERE id = ?", (new_value, now_iso(), file_id)) conn.commit() folder_id = row["folder_id"] target = f"/?folder_id={folder_id}" if folder_id else "/" return RedirectResponse(url=target, status_code=303) finally: conn.close() @app.post("/slug/{file_id}") async def update_slug(file_id: int, slug: str = Form(...)): slug = slugify(slug) if not valid_slug(slug): raise HTTPException(status_code=400, detail="Invalid slug") conn = db_conn() try: row = conn.execute("SELECT * FROM files WHERE id = ?", (file_id,)).fetchone() if not row: raise HTTPException(status_code=404, detail="File not found") row = dict(row) old_slug = row["slug"] new_slug = ensure_unique_slug(conn, "files", slug, exclude_id=file_id) if new_slug != old_slug: conn.execute(""" INSERT OR IGNORE INTO slug_history (file_id, old_slug, created_at) VALUES (?, ?, ?) """, (file_id, old_slug, now_iso())) conn.execute("UPDATE files SET slug = ?, updated_at = ? WHERE id = ?", (new_slug, now_iso(), file_id)) conn.commit() folder_id = row["folder_id"] target = f"/?folder_id={folder_id}" if folder_id else "/" return RedirectResponse(url=target, status_code=303) finally: conn.close() @app.post("/delete/{file_id}") async def delete_file(file_id: int): conn = db_conn() try: row = conn.execute("SELECT * FROM files WHERE id = ?", (file_id,)).fetchone() if not row: raise HTTPException(status_code=404, detail="File not found") row = dict(row) file_path = FILES_DIR / row["stored_name"] conn.execute("DELETE FROM slug_history WHERE file_id = ?", (file_id,)) conn.execute("DELETE FROM files WHERE id = ?", (file_id,)) conn.commit() delete_file_backups(file_id) if file_path.exists(): file_path.unlink() folder_id = row["folder_id"] target = f"/?folder_id={folder_id}" if folder_id else "/" return RedirectResponse(url=target, status_code=303) finally: conn.close() @app.post("/files/unzip/{file_id}") async def unzip_file(file_id: int): conn = db_conn() try: row = conn.execute("SELECT * FROM files WHERE id = ?", (file_id,)).fetchone() if not row: raise HTTPException(status_code=404, detail="File not found") row = dict(row) file_path = FILES_DIR / row["stored_name"] if not zipfile.is_zipfile(file_path): raise HTTPException(status_code=400, detail="Not a zip archive") target_folder_name = Path(row["original_name"]).stem folder_slug = ensure_unique_slug(conn, "folders", target_folder_name) ts = now_iso() conn.execute(""" INSERT INTO folders (name, slug, parent_id, is_public, created_at, updated_at) VALUES (?, ?, ?, 1, ?, ?) """, (target_folder_name, folder_slug, row["folder_id"], ts, ts)) new_folder_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0] with zipfile.ZipFile(file_path, "r") as zf: for member in zf.infolist(): if member.is_dir(): continue member_path = Path(member.filename) if member_path.is_absolute() or ".." in member_path.parts: continue safe_name = member_path.name if not safe_name: continue extracted = zf.read(member) suffix = Path(safe_name).suffix.lower() stored_name = f"{secrets.token_hex(8)}{suffix}" out_path = FILES_DIR / stored_name out_path.write_bytes(extracted) mime_type = mimetypes.guess_type(safe_name)[0] or "application/octet-stream" size = out_path.stat().st_size file_slug = ensure_unique_slug(conn, "files", Path(safe_name).stem) conn.execute(""" INSERT INTO files (original_name, stored_name, slug, mime_type, size, created_at, updated_at, folder_id, is_public) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1) """, (safe_name, stored_name, file_slug, mime_type, size, ts, ts, new_folder_id)) conn.commit() parent_folder = row["folder_id"] target = f"/?folder_id={parent_folder}" if parent_folder else "/" return RedirectResponse(url=target, status_code=303) finally: conn.close() @app.post("/folders/zip/{folder_id}") async def zip_folder(folder_id: int): conn = db_conn() try: folder = conn.execute("SELECT * FROM folders WHERE id = ?", (folder_id,)).fetchone() if not folder: raise HTTPException(status_code=404, detail="Folder not found") folder = dict(folder) files = conn.execute("SELECT * FROM files WHERE folder_id = ?", (folder_id,)).fetchall() archive_name = f"{folder['name']}.zip" stored_name = f"{secrets.token_hex(8)}.zip" archive_path = FILES_DIR / stored_name with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as zf: for f in files: f = dict(f) src = FILES_DIR / f["stored_name"] if src.exists(): zf.write(src, arcname=f["original_name"]) size = archive_path.stat().st_size slug = ensure_unique_slug(conn, "files", folder["slug"]) ts = now_iso() conn.execute(""" INSERT INTO files (original_name, stored_name, slug, mime_type, size, created_at, updated_at, folder_id, is_public) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1) """, (archive_name, stored_name, slug, "application/zip", size, ts, ts, folder["parent_id"])) conn.commit() target = f"/?folder_id={folder_id}" return RedirectResponse(url=target, status_code=303) finally: conn.close() @app.get("/login", response_class=HTMLResponse) def forge_login_page(request: Request, error: str = ""): return templates.TemplateResponse("forge_login.html", { "request": request, "error": error, }) @app.post("/login") async def forge_login(username: str = Form(...), password: str = Form(...)): conn = db_conn() try: row = conn.execute(""" SELECT * FROM forge_users WHERE username = ? AND is_active = 1 LIMIT 1 """, (username.strip(),)).fetchone() if not row: return RedirectResponse(url="/login?error=Неверный+логин+или+пароль", status_code=303) user = dict(row) if not verify_password(password, user.get("password_hash")): return RedirectResponse(url="/login?error=Неверный+логин+или+пароль", status_code=303) token = make_session_token(user["id"], user["username"]) response = RedirectResponse(url="/profile", status_code=303) response.set_cookie("forge_session", token, httponly=True, samesite="lax", secure=True, path="/") return response finally: conn.close() @app.get("/register", response_class=HTMLResponse) def forge_register_page(request: Request, error: str = ""): return templates.TemplateResponse("forge_register.html", { "request": request, "error": error, }) @app.post("/register") async def forge_register( code: str = Form(...), username: str = Form(...), display_name: str = Form(...), password: str = Form(...), password2: str = Form(...) ): username = username.strip().lower() display_name = display_name.strip() code = code.strip() if not username or not re.fullmatch(r"[a-z0-9][a-z0-9_-]{1,31}", username): return RedirectResponse(url="/register?error=Некорректный+username", status_code=303) if not display_name: return RedirectResponse(url="/register?error=Укажи+display+name", status_code=303) if password != password2: return RedirectResponse(url="/register?error=Пароли+не+совпадают", status_code=303) if len(password) < 6: return RedirectResponse(url="/register?error=Пароль+слишком+короткий", status_code=303) conn = db_conn() try: invite = conn.execute(""" SELECT * FROM forge_invites WHERE code = ? AND is_used = 0 LIMIT 1 """, (code,)).fetchone() if not invite: return RedirectResponse(url="/register?error=Неверный+или+использованный+код", status_code=303) existing = conn.execute(""" SELECT id FROM forge_users WHERE username = ? LIMIT 1 """, (username,)).fetchone() if existing: return RedirectResponse(url="/register?error=Username+уже+занят", status_code=303) hashed = hash_password(password) ts = now_iso() conn.execute(""" INSERT INTO forge_users (username, display_name, bio, created_at, password_hash, is_active, created_from_invite_id) VALUES (?, ?, '', ?, ?, 1, ?) """, (username, display_name, ts, hashed, invite["id"])) user_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0] conn.execute(""" UPDATE forge_invites SET is_used = 1, used_by_user_id = ? WHERE id = ? """, (user_id, invite["id"])) conn.commit() token = make_session_token(user_id, username) response = RedirectResponse(url="/profile", status_code=303) response.set_cookie("forge_session", token, httponly=True, samesite="lax", secure=True, path="/") return response finally: conn.close() @app.get("/logout") def forge_logout(): response = RedirectResponse(url="/login", status_code=303) response.delete_cookie("forge_session", path="/") return response @app.get("/profile", response_class=HTMLResponse) def forge_profile( request: Request, forge_session: str | None = Cookie(default=None), create_error: str = "" ): conn = db_conn() try: user = get_forge_user_by_session(conn, forge_session) if not user: return RedirectResponse(url="/login", status_code=303) repos = [ dict(x) for x in conn.execute(""" SELECT * FROM forge_repos WHERE owner_id = ? ORDER BY updated_at DESC, name COLLATE NOCASE """, (user["id"],)).fetchall() ] return templates.TemplateResponse("forge_profile.html", { "request": request, "user": user, "repos": repos, "create_error": create_error, }) finally: conn.close() @app.post("/profile/repo/create") async def forge_create_repo( forge_session: str | None = Cookie(default=None), name: str = Form(...), slug: str = Form(""), description: str = Form(""), is_public: str = Form("") ): conn = db_conn() try: user = get_forge_user_by_session(conn, forge_session) if not user: return RedirectResponse(url="/login", status_code=303) clean_name = name.strip() clean_description = description.strip() if not clean_name: return RedirectResponse(url="/profile?create_error=Укажи+название+репозитория", status_code=303) desired_slug = slug.strip() or clean_name final_slug = ensure_unique_user_repo_slug(conn, user["id"], desired_slug) ts = now_iso() public_value = 1 if is_public == "1" else 0 conn.execute(""" INSERT INTO forge_repos (owner_id, name, slug, description, is_public, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?) """, (user["id"], clean_name, final_slug, clean_description, public_value, ts, ts)) repo_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0] conn.execute(""" INSERT INTO forge_lanes (repo_id, name, slug, description, is_default, created_at, updated_at) VALUES (?, ?, ?, ?, 1, ?, ?) """, (repo_id, "main", "main", "Default main lane", ts, ts)) lane_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0] conn.execute(""" UPDATE forge_repos SET default_lane_id = ?, updated_at = ? WHERE id = ? """, (lane_id, ts, repo_id)) conn.commit() return RedirectResponse(url="/profile", status_code=303) finally: conn.close() @app.get("/manage/repo/{repo_id}", response_class=HTMLResponse) def forge_manage_repo( request: Request, repo_id: int, forge_session: str | None = Cookie(default=None) ): conn = db_conn() try: user = get_forge_user_by_session(conn, forge_session) if not user: return RedirectResponse(url="/login", status_code=303) repo = get_owned_repo_or_403(conn, user["id"], repo_id) lanes = [ dict(x) for x in conn.execute(""" SELECT * FROM forge_lanes WHERE repo_id = ? ORDER BY is_default DESC, name COLLATE NOCASE """, (repo_id,)).fetchall() ] owner = { "id": user["id"], "username": user["username"], "display_name": user["display_name"], "bio": user["bio"], } return templates.TemplateResponse("forge_manage_repo.html", { "request": request, "repo": repo, "lanes": lanes, "owner": owner, }) finally: conn.close() @app.post("/manage/repo/{repo_id}/update") async def forge_update_repo( repo_id: int, forge_session: str | None = Cookie(default=None), name: str = Form(...), slug: str = Form(...), description: str = Form(""), is_public: str = Form("") ): conn = db_conn() try: user = get_forge_user_by_session(conn, forge_session) if not user: return RedirectResponse(url="/login", status_code=303) repo = get_owned_repo_or_403(conn, user["id"], repo_id) clean_name = name.strip() clean_slug = ensure_unique_user_repo_slug(conn, user["id"], slug.strip() or clean_name) clean_description = description.strip() public_value = 1 if is_public == "1" else 0 conn.execute(""" UPDATE forge_repos SET name = ?, slug = ?, description = ?, is_public = ?, updated_at = ? WHERE id = ? """, (clean_name, clean_slug, clean_description, public_value, now_iso(), repo_id)) conn.commit() return RedirectResponse(url=f"/manage/repo/{repo_id}", status_code=303) finally: conn.close() @app.post("/manage/repo/{repo_id}/lane/create") async def forge_create_lane( repo_id: int, forge_session: str | None = Cookie(default=None), name: str = Form(...), slug: str = Form(""), description: str = Form("") ): conn = db_conn() try: user = get_forge_user_by_session(conn, forge_session) if not user: return RedirectResponse(url="/login", status_code=303) repo = get_owned_repo_or_403(conn, user["id"], repo_id) clean_name = name.strip() clean_slug = slugify(slug.strip() or clean_name) if not valid_slug(clean_slug): clean_slug = "lane" counter = 1 base_slug = clean_slug while True: exists = conn.execute(""" SELECT id FROM forge_lanes WHERE repo_id = ? AND slug = ? LIMIT 1 """, (repo_id, clean_slug)).fetchone() if not exists: break counter += 1 suffix = f"-{counter}" clean_slug = f"{base_slug[:64-len(suffix)].rstrip('-')}{suffix}" conn.execute(""" INSERT INTO forge_lanes (repo_id, name, slug, description, is_default, created_at, updated_at) VALUES (?, ?, ?, ?, 0, ?, ?) """, (repo_id, clean_name, clean_slug, description.strip(), now_iso(), now_iso())) conn.commit() return RedirectResponse(url=f"/manage/repo/{repo_id}", status_code=303) finally: conn.close() @app.post("/manage/repo/{repo_id}/delete") async def forge_delete_repo( repo_id: int, forge_session: str | None = Cookie(default=None) ): conn = db_conn() try: user = get_forge_user_by_session(conn, forge_session) if not user: return RedirectResponse(url="/login", status_code=303) get_owned_repo_or_403(conn, user["id"], repo_id) conn.execute("DELETE FROM forge_repos WHERE id = ?", (repo_id,)) conn.commit() return RedirectResponse(url="/profile", status_code=303) finally: conn.close() @app.get("/repo/{username}/{repo_slug}", response_class=HTMLResponse) def forge_repo_page(request: Request, username: str, repo_slug: str): conn = db_conn() try: row = get_forge_repo_by_owner_and_slug(conn, username, repo_slug) if not row: raise HTTPException(status_code=404, detail="Repo not found") repo = dict(row) owner = { "id": repo["user_id"], "username": repo["username"], "display_name": repo["display_name"], "bio": repo["bio"], } default_lane = None if repo["default_lane_id"]: lane_row = conn.execute("SELECT * FROM forge_lanes WHERE id = ?", (repo["default_lane_id"],)).fetchone() if lane_row: default_lane = dict(lane_row) lanes = [ dict(x) for x in conn.execute(""" SELECT * FROM forge_lanes WHERE repo_id = ? ORDER BY is_default DESC, name COLLATE NOCASE """, (repo["id"],)).fetchall() ] return templates.TemplateResponse("forge_repo.html", { "request": request, "repo": repo, "owner": owner, "default_lane": default_lane, "lanes": lanes, }) finally: conn.close() @app.get("/lane/{username}/{repo_slug}/{lane_slug}", response_class=HTMLResponse) def forge_lane_page(request: Request, username: str, repo_slug: str, lane_slug: str): conn = db_conn() try: row = get_forge_repo_by_owner_and_slug(conn, username, repo_slug) if not row: raise HTTPException(status_code=404, detail="Repo not found") repo = dict(row) owner = { "id": repo["user_id"], "username": repo["username"], "display_name": repo["display_name"], "bio": repo["bio"], } lane_row = get_forge_lane(conn, repo["id"], lane_slug) if not lane_row: raise HTTPException(status_code=404, detail="Lane not found") lane = dict(lane_row) files = [] rows = conn.execute(""" SELECT rf.path, o.original_name, o.stored_name, o.mime_type, o.size FROM forge_repo_files rf JOIN forge_objects o ON o.id = rf.object_id WHERE rf.repo_id = ? AND rf.lane_id = ? ORDER BY rf.path COLLATE NOCASE """, (repo["id"], lane["id"])).fetchall() for r in rows: item = dict(r) item["size_h"] = format_bytes(item["size"]) files.append(item) return templates.TemplateResponse("forge_lane.html", { "request": request, "repo": repo, "owner": owner, "lane": lane, "files": files, }) finally: conn.close() @app.get("/lane/{username}/{repo_slug}/{lane_slug}/f/{file_path:path}") def forge_lane_file_view(username: str, repo_slug: str, lane_slug: str, file_path: str): conn = db_conn() try: row = get_forge_repo_by_owner_and_slug(conn, username, repo_slug) if not row: raise HTTPException(status_code=404, detail="Repo not found") repo = dict(row) lane_row = get_forge_lane(conn, repo["id"], lane_slug) if not lane_row: raise HTTPException(status_code=404, detail="Lane not found") lane = dict(lane_row) file_row = conn.execute(""" SELECT rf.path, o.original_name, o.stored_name, o.mime_type, o.size FROM forge_repo_files rf JOIN forge_objects o ON o.id = rf.object_id WHERE rf.repo_id = ? AND rf.lane_id = ? AND rf.path = ? LIMIT 1 """, (repo["id"], lane["id"], file_path)).fetchone() if not file_row: raise HTTPException(status_code=404, detail="File not found") file_row = dict(file_row) physical = FILES_DIR / file_row["stored_name"] if not physical.exists(): raise HTTPException(status_code=404, detail="Physical file missing") response = Response() response.headers["X-Accel-Redirect"] = f"/_protected/{file_row['stored_name']}" response.headers["Content-Type"] = file_row["mime_type"] or "application/octet-stream" response.headers["Content-Disposition"] = f'inline; filename="{quote(file_row["original_name"])}"' return response finally: conn.close() @app.get("/lane/{username}/{repo_slug}/{lane_slug}/d/{file_path:path}") def forge_lane_file_download(username: str, repo_slug: str, lane_slug: str, file_path: str): conn = db_conn() try: row = get_forge_repo_by_owner_and_slug(conn, username, repo_slug) if not row: raise HTTPException(status_code=404, detail="Repo not found") repo = dict(row) lane_row = get_forge_lane(conn, repo["id"], lane_slug) if not lane_row: raise HTTPException(status_code=404, detail="Lane not found") lane = dict(lane_row) file_row = conn.execute(""" SELECT rf.path, o.original_name, o.stored_name, o.mime_type, o.size FROM forge_repo_files rf JOIN forge_objects o ON o.id = rf.object_id WHERE rf.repo_id = ? AND rf.lane_id = ? AND rf.path = ? LIMIT 1 """, (repo["id"], lane["id"], file_path)).fetchone() if not file_row: raise HTTPException(status_code=404, detail="File not found") file_row = dict(file_row) physical = FILES_DIR / file_row["stored_name"] if not physical.exists(): raise HTTPException(status_code=404, detail="Physical file missing") response = Response() response.headers["X-Accel-Redirect"] = f"/_protected/{file_row['stored_name']}" response.headers["Content-Type"] = "application/octet-stream" response.headers["Content-Disposition"] = f'attachment; filename="{quote(file_row["original_name"])}"' return response finally: conn.close() @app.get("/mark/{username}/{repo_slug}", response_class=HTMLResponse) def forge_marks_page(request: Request, username: str, repo_slug: str): conn = db_conn() try: row = get_forge_repo_by_owner_and_slug(conn, username, repo_slug) if not row: raise HTTPException(status_code=404, detail="Repo not found") repo = dict(row) owner = { "id": repo["user_id"], "username": repo["username"], "display_name": repo["display_name"], "bio": repo["bio"], } marks = [ dict(x) for x in conn.execute(""" SELECT * FROM forge_marks WHERE repo_id = ? ORDER BY created_at DESC """, (repo["id"],)).fetchall() ] return templates.TemplateResponse("forge_marks.html", { "request": request, "repo": repo, "owner": owner, "marks": marks, }) finally: conn.close() @app.get("/mark/{username}/{repo_slug}/{mark_slug}", response_class=HTMLResponse) def forge_mark_page(request: Request, username: str, repo_slug: str, mark_slug: str): conn = db_conn() try: row = get_forge_repo_by_owner_and_slug(conn, username, repo_slug) if not row: raise HTTPException(status_code=404, detail="Repo not found") repo = dict(row) owner = { "id": repo["user_id"], "username": repo["username"], "display_name": repo["display_name"], "bio": repo["bio"], } mark_row = get_forge_mark(conn, repo["id"], mark_slug) if not mark_row: raise HTTPException(status_code=404, detail="Mark not found") mark = dict(mark_row) files = [] rows = conn.execute(""" SELECT rf.path, o.original_name, o.stored_name, o.mime_type, o.size FROM forge_repo_files rf JOIN forge_objects o ON o.id = rf.object_id WHERE rf.repo_id = ? AND rf.mark_id = ? ORDER BY rf.path COLLATE NOCASE """, (repo["id"], mark["id"])).fetchall() for r in rows: item = dict(r) item["size_h"] = format_bytes(item["size"]) files.append(item) return templates.TemplateResponse("forge_mark.html", { "request": request, "repo": repo, "owner": owner, "mark": mark, "files": files, }) finally: conn.close() @app.get("/mark/{username}/{repo_slug}/{mark_slug}/f/{file_path:path}") def forge_mark_file_view(username: str, repo_slug: str, mark_slug: str, file_path: str): conn = db_conn() try: row = get_forge_repo_by_owner_and_slug(conn, username, repo_slug) if not row: raise HTTPException(status_code=404, detail="Repo not found") repo = dict(row) mark_row = get_forge_mark(conn, repo["id"], mark_slug) if not mark_row: raise HTTPException(status_code=404, detail="Mark not found") mark = dict(mark_row) file_row = conn.execute(""" SELECT rf.path, o.original_name, o.stored_name, o.mime_type, o.size FROM forge_repo_files rf JOIN forge_objects o ON o.id = rf.object_id WHERE rf.repo_id = ? AND rf.mark_id = ? AND rf.path = ? LIMIT 1 """, (repo["id"], mark["id"], file_path)).fetchone() if not file_row: raise HTTPException(status_code=404, detail="File not found") file_row = dict(file_row) physical = FILES_DIR / file_row["stored_name"] if not physical.exists(): raise HTTPException(status_code=404, detail="Physical file missing") response = Response() response.headers["X-Accel-Redirect"] = f"/_protected/{file_row['stored_name']}" response.headers["Content-Type"] = file_row["mime_type"] or "application/octet-stream" response.headers["Content-Disposition"] = f'inline; filename="{quote(file_row["original_name"])}"' return response finally: conn.close() @app.get("/mark/{username}/{repo_slug}/{mark_slug}/d/{file_path:path}") def forge_mark_file_download(username: str, repo_slug: str, mark_slug: str, file_path: str): conn = db_conn() try: row = get_forge_repo_by_owner_and_slug(conn, username, repo_slug) if not row: raise HTTPException(status_code=404, detail="Repo not found") repo = dict(row) mark_row = get_forge_mark(conn, repo["id"], mark_slug) if not mark_row: raise HTTPException(status_code=404, detail="Mark not found") mark = dict(mark_row) file_row = conn.execute(""" SELECT rf.path, o.original_name, o.stored_name, o.mime_type, o.size FROM forge_repo_files rf JOIN forge_objects o ON o.id = rf.object_id WHERE rf.repo_id = ? AND rf.mark_id = ? AND rf.path = ? LIMIT 1 """, (repo["id"], mark["id"], file_path)).fetchone() if not file_row: raise HTTPException(status_code=404, detail="File not found") file_row = dict(file_row) physical = FILES_DIR / file_row["stored_name"] if not physical.exists(): raise HTTPException(status_code=404, detail="Physical file missing") response = Response() response.headers["X-Accel-Redirect"] = f"/_protected/{file_row['stored_name']}" response.headers["Content-Type"] = "application/octet-stream" response.headers["Content-Disposition"] = f'attachment; filename="{quote(file_row["original_name"])}"' return response finally: conn.close() @app.get("/drop/{username}/{repo_slug}", response_class=HTMLResponse) def forge_drops_page(request: Request, username: str, repo_slug: str): conn = db_conn() try: row = get_forge_repo_by_owner_and_slug(conn, username, repo_slug) if not row: raise HTTPException(status_code=404, detail="Repo not found") repo = dict(row) owner = { "id": repo["user_id"], "username": repo["username"], "display_name": repo["display_name"], "bio": repo["bio"], } drops = [ dict(x) for x in conn.execute(""" SELECT * FROM forge_drops WHERE repo_id = ? AND is_published = 1 ORDER BY created_at DESC """, (repo["id"],)).fetchall() ] return templates.TemplateResponse("forge_drops.html", { "request": request, "repo": repo, "owner": owner, "drops": drops, }) finally: conn.close() @app.get("/drop/{username}/{repo_slug}/{drop_slug}", response_class=HTMLResponse) def forge_drop_page(request: Request, username: str, repo_slug: str, drop_slug: str): conn = db_conn() try: row = get_forge_repo_by_owner_and_slug(conn, username, repo_slug) if not row: raise HTTPException(status_code=404, detail="Repo not found") repo = dict(row) owner = { "id": repo["user_id"], "username": repo["username"], "display_name": repo["display_name"], "bio": repo["bio"], } drop_row = conn.execute(""" SELECT * FROM forge_drops WHERE repo_id = ? AND slug = ? AND is_published = 1 LIMIT 1 """, (repo["id"], drop_slug)).fetchone() if not drop_row: raise HTTPException(status_code=404, detail="Drop not found") drop = dict(drop_row) mark = None if drop["mark_id"]: mark_row = conn.execute("SELECT * FROM forge_marks WHERE id = ?", (drop["mark_id"],)).fetchone() if mark_row: mark = dict(mark_row) files = [] rows = conn.execute(""" SELECT df.object_id, df.label, df.sort_order, o.original_name, o.stored_name, o.mime_type, o.size FROM forge_drop_files df JOIN forge_objects o ON o.id = df.object_id WHERE df.drop_id = ? ORDER BY df.sort_order ASC, o.original_name COLLATE NOCASE """, (drop["id"],)).fetchall() for r in rows: item = dict(r) item["size_h"] = format_bytes(item["size"]) files.append(item) return templates.TemplateResponse("forge_drop.html", { "request": request, "repo": repo, "owner": owner, "drop": drop, "mark": mark, "files": files, }) finally: conn.close() @app.get("/drop/{username}/{repo_slug}/{drop_slug}/f/{object_id}") def forge_drop_file_view(username: str, repo_slug: str, drop_slug: str, object_id: int): conn = db_conn() try: row = get_forge_repo_by_owner_and_slug(conn, username, repo_slug) if not row: raise HTTPException(status_code=404, detail="Repo not found") repo = dict(row) drop_row = conn.execute(""" SELECT * FROM forge_drops WHERE repo_id = ? AND slug = ? AND is_published = 1 LIMIT 1 """, (repo["id"], drop_slug)).fetchone() if not drop_row: raise HTTPException(status_code=404, detail="Drop not found") drop = dict(drop_row) file_row = conn.execute(""" SELECT df.object_id, o.original_name, o.stored_name, o.mime_type, o.size FROM forge_drop_files df JOIN forge_objects o ON o.id = df.object_id WHERE df.drop_id = ? AND df.object_id = ? LIMIT 1 """, (drop["id"], object_id)).fetchone() if not file_row: raise HTTPException(status_code=404, detail="File not found") file_row = dict(file_row) physical = FILES_DIR / file_row["stored_name"] if not physical.exists(): raise HTTPException(status_code=404, detail="Physical file missing") response = Response() response.headers["X-Accel-Redirect"] = f"/_protected/{file_row['stored_name']}" response.headers["Content-Type"] = file_row["mime_type"] or "application/octet-stream" response.headers["Content-Disposition"] = f'inline; filename="{quote(file_row["original_name"])}"' return response finally: conn.close() @app.get("/drop/{username}/{repo_slug}/{drop_slug}/d/{object_id}") def forge_drop_file_download(username: str, repo_slug: str, drop_slug: str, object_id: int): conn = db_conn() try: row = get_forge_repo_by_owner_and_slug(conn, username, repo_slug) if not row: raise HTTPException(status_code=404, detail="Repo not found") repo = dict(row) drop_row = conn.execute(""" SELECT * FROM forge_drops WHERE repo_id = ? AND slug = ? AND is_published = 1 LIMIT 1 """, (repo["id"], drop_slug)).fetchone() if not drop_row: raise HTTPException(status_code=404, detail="Drop not found") drop = dict(drop_row) file_row = conn.execute(""" SELECT df.object_id, o.original_name, o.stored_name, o.mime_type, o.size FROM forge_drop_files df JOIN forge_objects o ON o.id = df.object_id WHERE df.drop_id = ? AND df.object_id = ? LIMIT 1 """, (drop["id"], object_id)).fetchone() if not file_row: raise HTTPException(status_code=404, detail="File not found") file_row = dict(file_row) physical = FILES_DIR / file_row["stored_name"] if not physical.exists(): raise HTTPException(status_code=404, detail="Physical file missing") response = Response() response.headers["X-Accel-Redirect"] = f"/_protected/{file_row['stored_name']}" response.headers["Content-Type"] = "application/octet-stream" response.headers["Content-Disposition"] = f'attachment; filename="{quote(file_row["original_name"])}"' return response finally: conn.close() @app.get("/f/{slug}") def view_file(slug: str): row, redirect_slug = get_file_by_slug(slug) if redirect_slug: return RedirectResponse(url=f"/f/{redirect_slug}", status_code=301) if not row: raise HTTPException(status_code=404, detail="File not found") conn = db_conn() try: if not file_is_effectively_public(conn, row): raise HTTPException(status_code=403, detail="File sharing disabled") finally: conn.close() file_path = FILES_DIR / row["stored_name"] if not file_path.exists(): raise HTTPException(status_code=404, detail="Physical file missing") response = Response() response.headers["X-Accel-Redirect"] = f"/_protected/{row['stored_name']}" response.headers["Content-Type"] = row["mime_type"] or "application/octet-stream" response.headers["Content-Disposition"] = f'inline; filename="{quote(row["original_name"])}"' return response @app.get("/d/{slug}") def download_file(slug: str): row, redirect_slug = get_file_by_slug(slug) if redirect_slug: return RedirectResponse(url=f"/d/{redirect_slug}", status_code=301) if not row: raise HTTPException(status_code=404, detail="File not found") conn = db_conn() try: if not file_is_effectively_public(conn, row): raise HTTPException(status_code=403, detail="File sharing disabled") finally: conn.close() file_path = FILES_DIR / row["stored_name"] if not file_path.exists(): raise HTTPException(status_code=404, detail="Physical file missing") response = Response() response.headers["X-Accel-Redirect"] = f"/_protected/{row['stored_name']}" response.headers["Content-Type"] = "application/octet-stream" response.headers["Content-Disposition"] = f'attachment; filename="{quote(row["original_name"])}"' return response
Сохранить
Применить и перезапустить
Ручной бэкап
Сохранить версию
Прошлые версии
Бэкапов пока нет
Создайте версию перед внесением изменений