import json
import logging
import os
import re
import tempfile

import openai as _openai
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions, TesseractCliOcrOptions
from docling.document_converter import DocumentConverter, ImageFormatOption, PdfFormatOption
from fastapi import HTTPException, UploadFile
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.ConvertModel import Conversion

logger = logging.getLogger(__name__)

# LLM endpoint configuration, read from the environment once at import time.
# An empty/unset OLLAMA_BASE_URL is normalised to None, which disables the LLM.
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL") or None
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "llava")
CLEANUP_MODEL = os.getenv("CLEANUP_MODEL", "")


def _build_converter() -> DocumentConverter:
    """Build the Docling converter used for every conversion in this module.

    OCR is requested through the Tesseract CLI options; if building those
    options raises, the converter is created with OCR disabled instead.

    Returns:
        A ``DocumentConverter`` whose PDF and image formats share the same
        pipeline options.
    """
    try:
        # NOTE(review): constructing the options object may not verify that the
        # tesseract binary is actually installed — confirm this fallback can fire.
        ocr_opts = TesseractCliOcrOptions()
        pdf_opts = PdfPipelineOptions(do_ocr=True, ocr_options=ocr_opts)
        logger.info("Docling: OCR enabled via Tesseract CLI")
    except Exception as e:
        logger.warning("Docling: Tesseract unavailable (%s) — OCR disabled", e)
        pdf_opts = PdfPipelineOptions(do_ocr=False)

    # ImageFormatOption also uses StandardPdfPipeline — pass same pdf_opts
    # to prevent docling from falling back to RapidOCR / PP-OCRv6
    return DocumentConverter(
        format_options={
            InputFormat.PDF: PdfFormatOption(pipeline_options=pdf_opts),
            InputFormat.IMAGE: ImageFormatOption(pipeline_options=pdf_opts),
        }
    )


# Shared converter instance, built once when the module is imported.
converter = _build_converter()

# LLM state; mutated only by _init_llm().
_llm_client = None
LLM_ACTIVE = False


def _init_llm(base_url: str | None, model: str) -> bool:
    """(Re)configure the module-level LLM client.

    Args:
        base_url: Base URL of the OpenAI-compatible endpoint. A falsy value
            disables the LLM and clears the client.
        model: Model name stored in ``OLLAMA_MODEL``.

    Returns:
        ``True`` when a client was created and the LLM is active, ``False``
        when the LLM was disabled or client creation failed.
    """
    global OLLAMA_BASE_URL, OLLAMA_MODEL, LLM_ACTIVE, _llm_client

    if not base_url:
        OLLAMA_BASE_URL, OLLAMA_MODEL, LLM_ACTIVE, _llm_client = None, model, False, None
        return False

    try:
        client = _openai.OpenAI(base_url=base_url, api_key="ollama")
    except Exception as e:
        logger.warning("Docling: LLM init failed (%s)", e)
        # Drop any previously configured client as well: callers gate on
        # `_llm_client`, so leaving a stale one behind would keep the LLM in
        # use while LLM_ACTIVE reports it as disabled.
        _llm_client = None
        LLM_ACTIVE = False
        return False

    OLLAMA_BASE_URL = base_url
    OLLAMA_MODEL = model
    _llm_client = client
    LLM_ACTIVE = True
    logger.info("Docling: LLM enabled via %s (model=%s)", base_url, model)
    return True
_init_llm(OLLAMA_BASE_URL, OLLAMA_MODEL)

DEFAULT_ENRICH_PROMPT = (
    "You are a document cleaning assistant. "
    "Fix OCR errors, normalise whitespace, and improve the Markdown structure. "
    "Return ONLY the raw Markdown text — no code fences, no commentary, no explanation."
)

# Subtitle languages and track formats tried by convert_url(), in priority order.
_SUBTITLE_LANGS = ("vi", "en")
_SUBTITLE_EXTS = ("json3", "srv3", "ttml", "vtt")


def _llm_enrich(markdown: str, system_prompt: str | None = None) -> str:
    """Send extracted markdown to the LLM for cleanup.

    Args:
        markdown: Text to clean up.
        system_prompt: Optional override for ``DEFAULT_ENRICH_PROMPT``.

    Returns:
        The cleaned text with surrounding code fences stripped, or the
        original ``markdown`` when no client is configured, the input is
        blank, the model returns nothing, or the request fails.
    """
    if not _llm_client or not markdown.strip():
        return markdown
    try:
        resp = _llm_client.chat.completions.create(
            model=OLLAMA_MODEL,
            messages=[
                {"role": "system", "content": system_prompt or DEFAULT_ENRICH_PROMPT},
                {"role": "user", "content": markdown},
            ],
            temperature=0,
        )
        result = resp.choices[0].message.content or markdown
        # llava tends to wrap output in code fences regardless of instructions — strip them
        result = re.sub(r"^```(?:markdown)?\s*\n?", "", result.strip())
        result = re.sub(r"\n?```\s*$", "", result.strip())
        return result.strip() or markdown
    except Exception as e:
        # Best-effort step: enrichment must never fail the conversion.
        logger.warning("Docling: LLM enrichment failed (%s) — returning raw output", e)
        return markdown


def _remove_temp_file(path: str | None) -> None:
    """Delete a temp file, logging (not raising) if the removal fails."""
    if not path:
        return
    try:
        os.unlink(path)
    except OSError as e:
        # Raising here would mask the exception that is already propagating.
        logger.warning("Docling: could not remove temp file %s (%s)", path, e)


def _subtitle_to_text(raw: str, ext: str) -> str:
    """Reduce a downloaded subtitle payload to plain text.

    Args:
        raw: Decoded body of the subtitle track.
        ext: Track format: ``vtt``, ``json3``, or an XML format
            (``srv3`` / ``ttml``).

    Returns:
        The caption text, one cue per line where the format allows it.
        May be empty when the payload holds no text.
    """
    if ext == "vtt":
        # Drop the "WEBVTT" header block (everything up to the first blank line).
        cleaned = re.sub(r"\A\ufeff?WEBVTT.*?(?:\n\s*\n|\Z)", "", raw, flags=re.DOTALL)
        cleaned = re.sub(r"\d{2}:\d{2}:\d{2}\.\d+ --> .*", "", cleaned)
        cleaned = re.sub(r"^\d+$", "", cleaned, flags=re.MULTILINE)
        cleaned = re.sub(r"<[^>]+>", "", cleaned)
        return re.sub(r"\n{3,}", "\n\n", cleaned).strip()

    if ext == "json3":
        # Assumes the YouTube json3 layout: {"events": [{"segs": [{"utf8": "..."}]}]}
        payload = json.loads(raw)
        events = payload.get("events") if isinstance(payload, dict) else None
        cues = []
        for event in events or []:
            text = "".join(seg.get("utf8", "") for seg in event.get("segs") or []).strip()
            if text:
                cues.append(text)
        return "\n".join(cues)

    # XML-based formats (srv3 / ttml): turn cue boundaries into newlines,
    # drop the remaining markup, then decode character entities.
    import html  # noqa: PLC0415

    text = re.sub(r"<br\s*/?>|</(?:p|div)>", "\n", raw, flags=re.IGNORECASE)
    text = html.unescape(re.sub(r"<[^>]+>", "", text))
    return "\n".join(line.strip() for line in text.splitlines() if line.strip())


def _fetch_transcript(info: dict) -> str | None:
    """Download the first usable subtitle track and return it as plain text.

    Manual subtitles are preferred over automatic captions for each language
    in ``_SUBTITLE_LANGS``. This function does blocking network I/O, so
    async callers should run it in a worker thread.

    Returns:
        The transcript text, or ``None`` when no track could be fetched.
    """
    import urllib.request  # noqa: PLC0415

    subtitles = info.get("subtitles") or {}
    auto_subtitles = info.get("automatic_captions") or {}

    for lang in _SUBTITLE_LANGS:
        tracks = subtitles.get(lang) or auto_subtitles.get(lang) or []
        for track in tracks:
            ext = track.get("ext")
            track_url = track.get("url")
            if ext not in _SUBTITLE_EXTS or not track_url:
                continue
            try:
                with urllib.request.urlopen(track_url, timeout=15) as resp:
                    raw = resp.read().decode("utf-8", errors="ignore")
                text = _subtitle_to_text(raw, ext)
            except Exception as e:
                # Best-effort: a broken track must not fail the whole conversion.
                logger.warning("Docling: subtitle track %s/%s unusable (%s)", lang, ext, e)
                continue
            if text:
                return text
    return None


def _build_video_markdown(info: dict, url: str, transcript_text: str | None) -> str:
    """Render video metadata, transcript/description and chapters as Markdown."""
    title = info.get("title") or "YouTube Video"
    description = info.get("description", "") or ""
    channel = info.get("channel", info.get("uploader", ""))
    duration = info.get("duration_string", "")
    upload_date = info.get("upload_date", "")
    view_count = info.get("view_count")
    chapters = info.get("chapters") or []

    lines = [f"# {title}", ""]
    if channel:
        lines.append(f"**Kênh:** {channel}")
    if duration:
        lines.append(f"**Thời lượng:** {duration}")
    # upload_date is only reformatted when it is exactly 8 characters (YYYYMMDD).
    if upload_date and len(upload_date) == 8:
        lines.append(f"**Ngày đăng:** {upload_date[:4]}-{upload_date[4:6]}-{upload_date[6:]}")
    if view_count is not None:
        lines.append(f"**Lượt xem:** {view_count:,}")
    lines.append(f"**URL:** {url}")
    lines.append("")

    if transcript_text:
        lines += ["## Transcript / Phụ đề", "", transcript_text, ""]
    elif description:
        lines += ["## Mô tả", "", description[:3000], ""]
    else:
        lines += ["## Ghi chú", "", "_Không có transcript hoặc mô tả._", ""]

    if chapters:
        lines += ["## Chapters", ""]
        for ch in chapters:
            minutes, seconds = divmod(int(ch.get("start_time") or 0), 60)
            lines.append(f"- **{minutes:02d}:{seconds:02d}** — {ch.get('title', '')}")
        lines.append("")

    return "\n".join(lines)


async def convert_url(
    url: str,
    db: AsyncSession,
    output_format: str = "markdown",
    use_llm: bool = True,
    llm_prompt: str | None = None,
) -> "Conversion":
    """Fetch video metadata and a transcript via yt-dlp, then convert with Docling.

    Args:
        url: Video URL handed to yt-dlp.
        db: Session used to persist the resulting ``Conversion``.
        output_format: One of ``OUTPUT_FORMATS``. ``"text"`` returns the
            assembled Markdown as-is.
        use_llm: Run LLM cleanup on markdown/text output when a client is configured.
        llm_prompt: Optional system prompt override for the cleanup step.

    Returns:
        The stored ``Conversion`` record.

    Raises:
        HTTPException: 500 if yt-dlp is not installed or conversion/persistence
            fails; 422 for an unsupported ``output_format`` or a yt-dlp error.
    """
    import asyncio  # noqa: PLC0415

    try:
        import yt_dlp  # noqa: PLC0415
    except ImportError as e:
        raise HTTPException(status_code=500, detail="yt-dlp not installed") from e

    if output_format not in OUTPUT_FORMATS:
        raise HTTPException(
            status_code=422,
            detail=f"Output format not supported. Supported: {', '.join(sorted(OUTPUT_FORMATS))}",
        )

    ydl_opts = {
        "quiet": True,
        "skip_download": True,
        "writesubtitles": True,
        "writeautomaticsub": True,
        "subtitleslangs": list(_SUBTITLE_LANGS),
        "outtmpl": "%(id)s.%(ext)s",
    }

    def _extract_info():
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            return ydl.extract_info(url, download=False)

    # yt-dlp and the subtitle download are blocking network calls that use
    # only per-call objects, so they run in worker threads to keep the event
    # loop responsive.
    try:
        info = await asyncio.to_thread(_extract_info)
    except Exception as e:
        raise HTTPException(status_code=422, detail=f"yt-dlp error: {e}") from e
    if not info:
        raise HTTPException(status_code=422, detail="yt-dlp error: no metadata returned")

    transcript_text = await asyncio.to_thread(_fetch_transcript, info)
    markdown_text = _build_video_markdown(info, url, transcript_text)
    video_id = info.get("id", "youtube")

    tmp_path = None
    try:
        # Created inside the try so the file is removed even if the write fails.
        with tempfile.NamedTemporaryFile(
            delete=False, suffix=".md", mode="w", encoding="utf-8"
        ) as tmp:
            tmp_path = tmp.name
            tmp.write(markdown_text)

        # NOTE(review): converter.convert() and _llm_enrich() are synchronous and
        # still block the event loop; they use shared module-level objects, so
        # confirm thread-safety before offloading them.
        result = converter.convert(tmp_path)
        doc = result.document

        if output_format == "markdown":
            content = doc.export_to_markdown()
        elif output_format == "json":
            content = json.dumps(doc.export_to_dict(), ensure_ascii=False, indent=2)
        elif output_format == "html":
            content = doc.export_to_html()
        else:
            content = markdown_text

        llm_used = False
        if _llm_client and use_llm and output_format in ("markdown", "text"):
            content = _llm_enrich(content, system_prompt=llm_prompt or None)
            llm_used = True

        record = Conversion(
            filename=f"{video_id}.md",
            file_type="youtube",
            output_format=output_format,
            content=content,
            page_count=None,
            llm_enabled=llm_used,
        )
        db.add(record)
        await db.commit()
        await db.refresh(record)
        return record
    except Exception as e:
        await db.rollback()
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        _remove_temp_file(tmp_path)


# -----------------------------------------------------------------

ALLOWED_EXTENSIONS = {
    "pdf", "docx", "xlsx", "pptx",
    "html", "htm",
    "jpg", "jpeg", "png", "tiff", "tif", "bmp",
    "md", "txt", "asciidoc", "adoc",
}

OUTPUT_FORMATS = {"markdown", "json", "html", "text"}


def _allowed_file(filename: str | None) -> bool:
    """Return True when ``filename`` has an extension listed in ``ALLOWED_EXTENSIONS``.

    A missing or empty filename is treated as not allowed.
    """
    if not filename or "." not in filename:
        return False
    return filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS


async def convert_file(
    file: UploadFile,
    db: AsyncSession,
    output_format: str = "markdown",
    use_llm: bool = True,
    llm_prompt: str | None = None,
) -> Conversion:
    """Convert an uploaded file with Docling and store the result.

    Args:
        file: The upload; its filename extension must be in ``ALLOWED_EXTENSIONS``.
        db: Session used to persist the resulting ``Conversion``.
        output_format: One of ``OUTPUT_FORMATS``.
        use_llm: Run LLM cleanup on markdown/text output when a client is configured.
        llm_prompt: Optional system prompt override for the cleanup step.

    Returns:
        The stored ``Conversion`` record.

    Raises:
        HTTPException: 422 for a missing/disallowed filename or unsupported
            output format; 500 if conversion or persistence fails.
    """
    if not _allowed_file(file.filename):
        raise HTTPException(
            status_code=422,
            detail=f"File type not allowed. Allowed: {', '.join(sorted(ALLOWED_EXTENSIONS))}",
        )
    if output_format not in OUTPUT_FORMATS:
        raise HTTPException(
            status_code=422,
            detail=f"Output format not supported. Supported: {', '.join(sorted(OUTPUT_FORMATS))}",
        )

    suffix = os.path.splitext(file.filename)[1]
    file_type = suffix.lstrip(".").lower()

    # Read the upload before creating the temp file so a failed read leaves
    # nothing behind on disk.
    payload = await file.read()

    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            tmp_path = tmp.name
            tmp.write(payload)

        result = converter.convert(tmp_path)
        doc = result.document
        pages = getattr(doc, "pages", None)
        page_count = len(pages) if pages else None

        if output_format == "markdown":
            content = doc.export_to_markdown()
        elif output_format == "json":
            content = json.dumps(doc.export_to_dict(), ensure_ascii=False, indent=2)
        elif output_format == "html":
            content = doc.export_to_html()
        else:
            # "text": markdown export with heading markers and emphasis removed.
            content = doc.export_to_markdown()
            content = re.sub(r"#{1,6}\s?", "", content)
            content = re.sub(r"\*\*(.+?)\*\*", r"\1", content)
            content = re.sub(r"\*(.+?)\*", r"\1", content)

        # LLM enrichment — only for markdown / text output, and only if requested
        llm_used = False
        if _llm_client and use_llm and output_format in ("markdown", "text"):
            content = _llm_enrich(content, system_prompt=llm_prompt or None)
            llm_used = True

        record = Conversion(
            filename=file.filename,
            file_type=file_type,
            output_format=output_format,
            content=content,
            page_count=page_count,
            llm_enabled=llm_used,
        )
        db.add(record)
        await db.commit()
        await db.refresh(record)
        return record
    except Exception as e:
        await db.rollback()
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        _remove_temp_file(tmp_path)


async def get_conversion(conversion_id: int, db: AsyncSession) -> Conversion:
    """Return the conversion with the given id.

    Raises:
        HTTPException: 404 when no such record exists.
    """
    result = await db.execute(select(Conversion).where(Conversion.id == conversion_id))
    record = result.scalar_one_or_none()
    if not record:
        raise HTTPException(status_code=404, detail="Conversion not found")
    return record


async def get_history(db: AsyncSession, limit: int = 20) -> list[Conversion]:
    """Return up to ``limit`` conversions, newest first (by ``created_at``)."""
    result = await db.execute(
        select(Conversion).order_by(Conversion.created_at.desc()).limit(limit)
    )
    # Materialise as a real list to match the declared return type.
    return list(result.scalars().all())


async def delete_conversion(conversion_id: int, db: AsyncSession) -> dict:
    """Delete the conversion with the given id.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when no such record exists.
    """
    record = await get_conversion(conversion_id, db)
    await db.delete(record)
    await db.commit()
    return {"message": f"Conversion {conversion_id} deleted"}