#!/usr/bin/env python3 """ Export emails from emailfiles/cur to Markdown. Output: emailfiles/output// markitdown/body.md markitdown/.md docling/body.md docling/.md Image OCR: vision LLM (llama-server via SSH tunnel) → Tesseract fallback Tunnel: ssh -fNL 11436:127.0.0.1:11436 -J kai@192.168.171.2 root@172.25.57.233 """ import email, email.policy, http.client, json, logging, os, re, shutil import subprocess, sys, tempfile, uuid from pathlib import Path from tqdm import tqdm # .env loader _env = Path(__file__).parent / ".env" if _env.exists(): for _l in _env.read_text().splitlines(): _l = _l.strip() if _l and not _l.startswith("#") and "=" in _l: _k, _v = _l.split("=", 1) os.environ.setdefault(_k.strip(), _v.strip()) logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s") log = logging.getLogger(__name__) _WORKING = os.environ.get("WORKING_DIR", "emails") MAILDIR = Path(__file__).parent / _WORKING / os.environ.get("WORKING_INPUT", "input") OUTPUT = Path(__file__).parent / _WORKING / os.environ.get("WORKING_OUTPUT", "output") MARKITDOWN_HOST = "localhost" MARKITDOWN_PORT = 8282 DOCLING_HOST = "localhost" DOCLING_PORT = 8383 DOCLING_CONTAINER = "ai-markdown-api-docling-1" API_TIMEOUT = 300 VISION_HOST = os.environ.get("VISION_HOST", "localhost") VISION_PORT = int(os.environ.get("VISION_PORT", "11436")) PADDLE_HOST = os.environ.get("PADDLE_HOST", "localhost") PADDLE_PORT = int(os.environ.get("PADDLE_PORT", "9999")) IMAGE_EXTS = {".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff", ".tif", ".webp"} DOCLING_SUPPORTED = {".pdf", ".docx", ".xlsx", ".pptx", ".html", ".htm"} | IMAGE_EXTS # --------------------------------------------------------------------------- # Multipart POST helper # --------------------------------------------------------------------------- def _post(host, port, path, filepath, params="") -> str: boundary = uuid.uuid4().hex data = filepath.read_bytes() body = ( f"--{boundary}\r\n" f'Content-Disposition: form-data; name="file"; filename="{filepath.name}"\r\n' f"Content-Type: application/octet-stream\r\n\r\n" ).encode() + data + f"\r\n--{boundary}--\r\n".encode() url = path + (f"?{params}" if params else "") conn = http.client.HTTPConnection(host, port, timeout=API_TIMEOUT) try: conn.request("POST", url, body=body, headers={"Content-Type": f"multipart/form-data; boundary={boundary}", "Content-Length": str(len(body))}) resp = conn.getresponse() raw = resp.read() if resp.status != 200: return f"" d = json.loads(raw) return d.get("markdown") or d.get("content") or str(d) finally: conn.close() def convert_markitdown(src: Path) -> str: try: return _post(MARKITDOWN_HOST, MARKITDOWN_PORT, "/convert", src, "use_llm=true") except Exception as e: return f"" # --------------------------------------------------------------------------- # Vision LLM — llama-server via SSH tunnel # --------------------------------------------------------------------------- def _vision_available() -> bool: try: conn = http.client.HTTPConnection(VISION_HOST, VISION_PORT, timeout=3) conn.request("GET", "/health") conn.getresponse().read() conn.close() return True except Exception: return False def _vision_ocr(src: Path) -> str: """Send image to llama-server, return markdown. Returns '' on failure.""" import base64 from PIL import Image import io img = Image.open(src).convert("RGB") if img.width < 1200: scale = max(2, 1200 // img.width) img = img.resize((img.width * scale, img.height * scale), Image.LANCZOS) buf = io.BytesIO() img.save(buf, "JPEG", quality=92) b64 = base64.standard_b64encode(buf.getvalue()).decode() payload = json.dumps({ "messages": [{"role": "user", "content": [ {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64}"}}, {"type": "text", "text": ( "Read every cell in this table carefully. " "Output a Markdown table with ALL rows. " "Do not skip or duplicate any row. " "Return only the Markdown table." )}, ]}], "max_tokens": 700, "temperature": 0, }).encode() conn = http.client.HTTPConnection(VISION_HOST, VISION_PORT, timeout=120) try: conn.request("POST", "/v1/chat/completions", body=payload, headers={"Content-Type": "application/json", "Content-Length": str(len(payload))}) resp = conn.getresponse() d = json.loads(resp.read()) if resp.status != 200: log.warning("vision %s: %s", resp.status, str(d)[:80]) return "" content = d.get("choices", [{}])[0].get("message", {}).get("content", "").strip() return _dedup_rows(content) except Exception as e: log.warning("vision error: %s", e) return "" finally: conn.close() def _dedup_rows(md: str) -> str: seen, out = set(), [] for line in md.splitlines(): key = line.strip() if key.startswith("|") and key in seen: continue seen.add(key) out.append(line) return "\n".join(out) def _is_data_image(src: Path) -> bool: """True for wide landscape images likely to contain table/text data.""" from PIL import Image try: w, h = Image.open(src).size return w >= 300 and h >= 60 and w / h >= 1.5 except Exception: return False # --------------------------------------------------------------------------- # PaddleOCR fallback — lightweight HTTP call to paddleocr_server on AI server # --------------------------------------------------------------------------- def _paddle_available() -> bool: try: conn = http.client.HTTPConnection(PADDLE_HOST, PADDLE_PORT, timeout=2) conn.request("GET", "/") conn.getresponse().read() conn.close() return True except Exception: return False def _paddle_ocr(src: Path) -> str: import base64 data = base64.standard_b64encode(src.read_bytes()).decode() payload = json.dumps({"image_base64": data}).encode() conn = http.client.HTTPConnection(PADDLE_HOST, PADDLE_PORT, timeout=60) try: conn.request("POST", "/ocr", body=payload, headers={"Content-Type": "application/json", "Content-Length": str(len(payload))}) resp = conn.getresponse() d = json.loads(resp.read()) return d.get("markdown", "") except Exception as e: log.warning("paddle error: %s", e) return "" finally: conn.close() # --------------------------------------------------------------------------- # Tesseract OCR fallback via Docling docker container # --------------------------------------------------------------------------- def _tesseract_ocr(src: Path) -> str: from PIL import Image img = Image.open(src) scaled = src.with_suffix(".ocr.png") img.resize((img.width * 3, img.height * 3), Image.LANCZOS).save(scaled) container_path = f"/tmp/{scaled.name}" cp = subprocess.run(["docker", "cp", str(scaled), f"{DOCLING_CONTAINER}:{container_path}"], capture_output=True) if cp.returncode != 0: return "" script = f""" from docling.document_converter import DocumentConverter, ImageFormatOption, PdfFormatOption from docling.datamodel.pipeline_options import PdfPipelineOptions, TesseractCliOcrOptions from docling.datamodel.base_models import InputFormat opts = PdfPipelineOptions(do_ocr=True, ocr_options=TesseractCliOcrOptions()) conv = DocumentConverter(format_options={{ InputFormat.IMAGE: ImageFormatOption(pipeline_options=opts), InputFormat.PDF: PdfFormatOption(pipeline_options=opts), }}) print(conv.convert('{container_path}').document.export_to_markdown()) """ run = subprocess.run(["docker", "exec", DOCLING_CONTAINER, "python3", "-c", script], capture_output=True, text=True, timeout=120) lines = [l for l in run.stdout.splitlines() if not l.startswith("Loading weights")] return "\n".join(lines).strip() # --------------------------------------------------------------------------- # Docling convert (for non-image files) # --------------------------------------------------------------------------- def convert_docling(src: Path) -> str: if src.suffix.lower() not in DOCLING_SUPPORTED: return f"" try: if src.suffix.lower() in IMAGE_EXTS: # 1. Vision LLM for data images (tables/screenshots) if _is_data_image(src) and _vision_available(): result = _vision_ocr(src) if result: return result # 2. PaddleOCR fallback (better than Tesseract for dense text) if _paddle_available(): result = _paddle_ocr(src) if result: return result # 3. Tesseract last resort return _tesseract_ocr(src) return _post(DOCLING_HOST, DOCLING_PORT, "/convert", src, "use_llm=false") except Exception as e: return f"" # --------------------------------------------------------------------------- # Email helpers # --------------------------------------------------------------------------- def _email_id(name: str) -> str: return name.split(",")[0] def _html_body(msg) -> str | None: for part in msg.walk(): if part.get_content_type() == "text/html": p = part.get_payload(decode=True) if p: return p.decode(part.get_content_charset() or "utf-8", errors="replace") for part in msg.walk(): if part.get_content_type() == "text/plain": p = part.get_payload(decode=True) if p: return p.decode(part.get_content_charset() or "utf-8", errors="replace") return None def _write_md(path: Path, content: str): path.parent.mkdir(parents=True, exist_ok=True) path.write_text(content, encoding="utf-8") log.info(" wrote %s", path.relative_to(OUTPUT.parent)) def _write_jpg(dest_dir: Path, src: Path): dest_dir.mkdir(parents=True, exist_ok=True) dest = dest_dir / (src.stem + ".jpg") if src.suffix.lower() in {".jpg", ".jpeg"}: shutil.copy2(src, dest) else: from PIL import Image try: Image.open(src).convert("RGB").save(dest, "JPEG") except Exception: shutil.copy2(src, dest_dir / src.name) dest = dest_dir / src.name log.info(" saved %s", dest.relative_to(OUTPUT.parent)) # --------------------------------------------------------------------------- # Process one email # --------------------------------------------------------------------------- def process_email(filepath: Path, msg=None): if msg is None: with filepath.open("rb") as f: msg = email.message_from_binary_file(f, policy=email.policy.compat32) eid = _email_id(filepath.name) log.info("Processing [%s] %s", eid[:30], msg.get("subject", "")[:60]) md_dir = OUTPUT / eid / "markitdown" doc_dir = OUTPUT / eid / "docling" with tempfile.TemporaryDirectory() as _tmp: tmp = Path(_tmp) # Build CID map and extract all parts to tmp cid_map: dict[str, str] = {} for part in msg.walk(): fn = part.get_filename() or part.get_param("name") if not fn: continue payload = part.get_payload(decode=True) if not payload: continue safe = "".join(c if c.isalnum() or c in "._-" else "_" for c in fn) (tmp / safe).write_bytes(payload) ext = Path(safe).suffix.lower() out_name = Path(safe).stem + ".jpg" if ext in IMAGE_EXTS else safe cid = part.get("Content-ID", "").strip("<>") if cid: cid_map[cid] = out_name def fix_cid(html: str) -> str: return re.sub(r'cid:([^\s"\'>\)]+)', lambda m: cid_map.get(m.group(1), f"cid:{m.group(1)}"), html) def fix_img_comments(md: str, html: str) -> str: srcs = re.findall(r']+src=["\']?([^\s"\'>\)]+)', html, re.IGNORECASE) it = iter(srcs) return re.sub(r'', lambda _: f"![]({next(it)})" if (s := next(it, None)) else "", md) if srcs else md # Body body_html = _html_body(msg) if body_html: html_fixed = fix_cid(body_html) html_file = tmp / "body.html" html_file.write_text(html_fixed, encoding="utf-8") _write_md(md_dir / "body.md", convert_markitdown(html_file)) docling_body = convert_docling(html_file) srcs = re.findall(r']+src=["\']?([^\s"\'>\)]+)', html_fixed, re.IGNORECASE) it = iter(srcs) docling_body = re.sub(r'', lambda _: f"![]({next(it, 'image')})", docling_body) _write_md(doc_dir / "body.md", docling_body) else: log.warning(" no body") # Attachments for part in msg.walk(): fn = part.get_filename() or part.get_param("name") if not fn: continue payload = part.get_payload(decode=True) if not payload: continue safe = "".join(c if c.isalnum() or c in "._-" else "_" for c in fn) att = tmp / safe att.write_bytes(payload) ext = Path(safe).suffix.lower() stem = Path(safe).stem if ext in IMAGE_EXTS: _write_jpg(md_dir, att) _write_jpg(doc_dir, att) ocr = convert_docling(att) if ocr and not ocr.startswith("