412 lines
15 KiB
Python
412 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Export emails from the Maildir input folder (WORKING_DIR/WORKING_INPUT, default emails/input; e.g. emailfiles/cur) to Markdown.
|
|
|
|
Output: emailfiles/output/<email_id>/
|
|
markitdown/body.md markitdown/<stem>.md
|
|
docling/body.md docling/<stem>.md
|
|
|
|
Image OCR: vision LLM (llama-server via SSH tunnel) → PaddleOCR server → Tesseract fallback (inside the Docling container)
|
|
Tunnel: ssh -fNL 11436:127.0.0.1:11436 -J kai@192.168.171.2 root@172.25.57.233
|
|
"""
|
|
|
|
import email, email.policy, http.client, json, logging, os, re, shutil
|
|
import subprocess, sys, tempfile, uuid
|
|
from pathlib import Path
|
|
from tqdm import tqdm
|
|
|
|
# .env loader: KEY=VALUE lines from a ".env" file next to this script.
# Blank lines, "#" comments and lines without "=" are skipped; variables that
# are already set in the process environment are left untouched.
_env = Path(__file__).parent / ".env"
if _env.exists():
    for _l in map(str.strip, _env.read_text().splitlines()):
        if not _l or _l.startswith("#"):
            continue
        # Split on the first "=" only, so values may themselves contain "=".
        _k, _sep, _v = _l.partition("=")
        if _sep:
            os.environ.setdefault(_k.strip(), _v.strip())
|
|
|
|
logging.basicConfig(format="%(levelname)s %(message)s", level=logging.INFO)
log = logging.getLogger(__name__)

# Input/output folders live under <script dir>/<WORKING_DIR>/.
_ROOT = Path(__file__).parent
_WORKING = os.getenv("WORKING_DIR", "emails")
MAILDIR = _ROOT / _WORKING / os.getenv("WORKING_INPUT", "input")
OUTPUT = _ROOT / _WORKING / os.getenv("WORKING_OUTPUT", "output")

# Conversion services (fixed endpoints) and the request timeout in seconds.
MARKITDOWN_HOST = "localhost"
MARKITDOWN_PORT = 8282
DOCLING_HOST = "localhost"
DOCLING_PORT = 8383
DOCLING_CONTAINER = "ai-markdown-api-docling-1"
API_TIMEOUT = 300

# Vision LLM endpoint, overridable through the environment / .env file.
VISION_HOST = os.getenv("VISION_HOST", "localhost")
VISION_PORT = int(os.getenv("VISION_PORT", "11436"))

# PaddleOCR endpoint, overridable through the environment / .env file.
PADDLE_HOST = os.getenv("PADDLE_HOST", "localhost")
PADDLE_PORT = int(os.getenv("PADDLE_PORT", "9999"))

# File extensions treated as images, and everything convert_docling accepts.
IMAGE_EXTS = {
    ".png", ".jpg", ".jpeg", ".gif",
    ".bmp", ".tiff", ".tif", ".webp",
}
DOCLING_SUPPORTED = IMAGE_EXTS | {".pdf", ".docx", ".xlsx", ".pptx", ".html", ".htm"}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Multipart POST helper
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _post(host, port, path, filepath, params="") -> str:
    """Upload a file as multipart/form-data and return the converted Markdown.

    Args:
        host: Server host name.
        port: Server TCP port.
        path: Request path, e.g. "/convert".
        filepath: ``Path`` of the file to upload (sent as form field "file").
        params: Optional raw query string appended after "?".

    Returns:
        The "markdown" value of the JSON reply, else its "content" value.
        An empty string when one of those keys is present but empty, the
        stringified reply when neither key exists (or the reply is not a JSON
        object), and ``<!-- API <status> -->`` for any non-200 response.

    Raises:
        OSError / http.client.HTTPException on transport failures and
        ValueError when a 200 reply is not valid JSON.
    """
    boundary = uuid.uuid4().hex
    data = filepath.read_bytes()
    head = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="file"; filename="{filepath.name}"\r\n'
        f"Content-Type: application/octet-stream\r\n\r\n"
    ).encode()
    tail = f"\r\n--{boundary}--\r\n".encode()
    body = head + data + tail
    url = path + (f"?{params}" if params else "")

    conn = http.client.HTTPConnection(host, port, timeout=API_TIMEOUT)
    try:
        conn.request("POST", url, body=body,
                     headers={"Content-Type": f"multipart/form-data; boundary={boundary}",
                              "Content-Length": str(len(body))})
        resp = conn.getresponse()
        raw = resp.read()
        if resp.status != 200:
            return f"<!-- API {resp.status} -->"
        d = json.loads(raw)
    finally:
        conn.close()

    # A JSON list/string/number has no .get(); fall back to its text form
    # instead of raising AttributeError.
    if not isinstance(d, dict):
        return str(d)
    for key in ("markdown", "content"):
        if d.get(key):
            return d[key]
    # The server answered with a known key but an empty document: report
    # "nothing converted" rather than dumping the dict repr into the output.
    if "markdown" in d or "content" in d:
        return ""
    return str(d)
|
|
|
|
|
|
def convert_markitdown(src: Path) -> str:
    """Convert ``src`` through the MarkItDown service (LLM mode enabled).

    Any failure is turned into a ``<!-- MarkItDown: ... -->`` comment string
    instead of being raised.
    """
    try:
        markdown = _post(MARKITDOWN_HOST, MARKITDOWN_PORT, "/convert", src, "use_llm=true")
    except Exception as exc:
        markdown = f"<!-- MarkItDown: {exc} -->"
    return markdown
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Vision LLM — llama-server via SSH tunnel
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _vision_available() -> bool:
    """Return True when the vision server answers ``GET /health`` within 3 s.

    Any HTTP response counts as "available"; only a transport error (refused
    connection, timeout, ...) yields False.
    """
    conn = None
    try:
        conn = http.client.HTTPConnection(VISION_HOST, VISION_PORT, timeout=3)
        conn.request("GET", "/health")
        conn.getresponse().read()
        return True
    except Exception:
        return False
    finally:
        # Close on the failure path too, so a failed probe does not leak a socket.
        if conn is not None:
            conn.close()
|
|
|
|
|
|
def _vision_ocr(src: Path) -> str:
    """Send an image to the vision server and return the Markdown table it reads.

    The image is converted to RGB, upscaled by an integer factor when it is
    narrower than 1200 px, re-encoded as JPEG and posted to
    ``/v1/chat/completions``. Duplicate table rows in the answer are removed.

    Returns:
        The Markdown text, or '' on any failure (unreadable image, transport
        error, non-200 status, malformed reply). Failures are logged.
    """
    import base64
    import io

    from PIL import Image

    # Image preparation is part of the "returns '' on failure" contract: an
    # exception here would otherwise abort the caller's OCR fallback chain.
    try:
        with Image.open(src) as opened:
            img = opened.convert("RGB")
        if img.width < 1200:
            scale = max(2, 1200 // img.width)
            img = img.resize((img.width * scale, img.height * scale), Image.LANCZOS)
        buf = io.BytesIO()
        img.save(buf, "JPEG", quality=92)
    except Exception as e:
        log.warning("vision image error: %s", e)
        return ""
    b64 = base64.standard_b64encode(buf.getvalue()).decode()

    payload = json.dumps({
        "messages": [{"role": "user", "content": [
            {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64}"}},
            {"type": "text", "text": (
                "Read every cell in this table carefully. "
                "Output a Markdown table with ALL rows. "
                "Do not skip or duplicate any row. "
                "Return only the Markdown table."
            )},
        ]}],
        "max_tokens": 700,
        "temperature": 0,
    }).encode()

    conn = http.client.HTTPConnection(VISION_HOST, VISION_PORT, timeout=120)
    try:
        conn.request("POST", "/v1/chat/completions", body=payload,
                     headers={"Content-Type": "application/json",
                              "Content-Length": str(len(payload))})
        resp = conn.getresponse()
        d = json.loads(resp.read())
        if resp.status != 200:
            log.warning("vision %s: %s", resp.status, str(d)[:80])
            return ""
        content = d.get("choices", [{}])[0].get("message", {}).get("content", "").strip()
        return _dedup_rows(content)
    except Exception as e:
        log.warning("vision error: %s", e)
        return ""
    finally:
        conn.close()
|
|
|
|
|
|
def _dedup_rows(md: str) -> str:
    """Drop repeated Markdown table rows while keeping every table well-formed.

    A table row is any line whose stripped text starts with "|". The first
    occurrence of each row is kept and later identical rows are removed.
    Delimiter rows (only "|", ":", "-" and blanks, e.g. ``|---|:--:|``) are
    not compared against earlier lines: they share the fate of the row right
    before them, so the delimiter of a second, distinct table survives while
    the delimiter of a fully repeated table is removed with its header.
    Non-table lines are always kept.

    Args:
        md: Markdown text, typically one table.

    Returns:
        The filtered lines joined with "\\n".
    """
    seen: set[str] = set()
    out: list[str] = []
    prev_dropped = False
    for line in md.splitlines():
        key = line.strip()
        is_row = key.startswith("|")
        if is_row and "-" in key and not key.strip("|:- \t"):
            # Delimiter row: keep it iff its header row was kept.
            drop = prev_dropped
        else:
            drop = is_row and key in seen
        prev_dropped = drop
        if drop:
            continue
        seen.add(key)
        out.append(line)
    return "\n".join(out)
|
|
|
|
|
|
def _is_data_image(src: Path) -> bool:
    """True for wide landscape images likely to contain table/text data.

    The image must be at least 300 px wide, at least 60 px high and have a
    width/height ratio of 1.5 or more. Files that cannot be opened as an
    image yield False.
    """
    from PIL import Image
    try:
        # Context manager so the underlying file is closed after reading the size.
        with Image.open(src) as img:
            w, h = img.size
        return w >= 300 and h >= 60 and w / h >= 1.5
    except Exception:
        return False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PaddleOCR fallback — lightweight HTTP call to paddleocr_server on AI server
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _paddle_available() -> bool:
    """Return True when the PaddleOCR server answers ``GET /`` within 2 s.

    Any HTTP response counts as "available"; only a transport error (refused
    connection, timeout, ...) yields False.
    """
    conn = None
    try:
        conn = http.client.HTTPConnection(PADDLE_HOST, PADDLE_PORT, timeout=2)
        conn.request("GET", "/")
        conn.getresponse().read()
        return True
    except Exception:
        return False
    finally:
        # Close on the failure path too, so a failed probe does not leak a socket.
        if conn is not None:
            conn.close()
|
|
|
|
|
|
def _paddle_ocr(src: Path) -> str:
    """OCR an image through the PaddleOCR HTTP server.

    The file is sent base64-encoded as JSON (``{"image_base64": ...}``) to
    ``POST /ocr``; the "markdown" field of the JSON reply is returned.

    Returns:
        The Markdown text, or '' on any failure (unreadable file, transport
        error, non-200 status, malformed or empty reply). Failures are logged.
    """
    import base64

    conn = http.client.HTTPConnection(PADDLE_HOST, PADDLE_PORT, timeout=60)
    try:
        # Read inside the try so an unreadable file returns '' and lets the
        # caller fall through to the next OCR backend.
        data = base64.standard_b64encode(src.read_bytes()).decode()
        payload = json.dumps({"image_base64": data}).encode()
        conn.request("POST", "/ocr", body=payload,
                     headers={"Content-Type": "application/json",
                              "Content-Length": str(len(payload))})
        resp = conn.getresponse()
        d = json.loads(resp.read())
        if resp.status != 200:
            log.warning("paddle %s: %s", resp.status, str(d)[:80])
            return ""
        # "markdown": null must not leak out of a function declared -> str.
        return d.get("markdown", "") or ""
    except Exception as e:
        log.warning("paddle error: %s", e)
        return ""
    finally:
        conn.close()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tesseract OCR fallback via Docling docker container
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _tesseract_ocr(src: Path) -> str:
    """OCR an image with Tesseract by running Docling inside its container.

    Steps: write a 3x upscaled PNG next to ``src``, ``docker cp`` it into
    ``DOCLING_CONTAINER`` under /tmp, run a short Docling script there with
    ``docker exec`` and return its Markdown output. Lines starting with
    "Loading weights" are removed from the output.

    Returns:
        The Markdown text, or '' when the copy or the conversion fails.

    Raises:
        subprocess.TimeoutExpired if the conversion exceeds 120 s, and
        whatever PIL raises for an unreadable image.
    """
    from PIL import Image

    scaled = src.with_suffix(".ocr.png")
    container_path = f"/tmp/{scaled.name}"
    try:
        with Image.open(src) as img:
            img.resize((img.width * 3, img.height * 3), Image.LANCZOS).save(scaled)
        cp = subprocess.run(["docker", "cp", str(scaled), f"{DOCLING_CONTAINER}:{container_path}"],
                            capture_output=True)
    finally:
        # The upscaled copy is only needed for the docker cp above.
        scaled.unlink(missing_ok=True)
    if cp.returncode != 0:
        return ""

    # The image path is passed as argv[1] rather than pasted into the source,
    # so no file name can alter the script.
    script = """
import sys
from docling.document_converter import DocumentConverter, ImageFormatOption, PdfFormatOption
from docling.datamodel.pipeline_options import PdfPipelineOptions, TesseractCliOcrOptions
from docling.datamodel.base_models import InputFormat
opts = PdfPipelineOptions(do_ocr=True, ocr_options=TesseractCliOcrOptions())
conv = DocumentConverter(format_options={
    InputFormat.IMAGE: ImageFormatOption(pipeline_options=opts),
    InputFormat.PDF: PdfFormatOption(pipeline_options=opts),
})
print(conv.convert(sys.argv[1]).document.export_to_markdown())
"""
    try:
        run = subprocess.run(["docker", "exec", DOCLING_CONTAINER, "python3", "-c", script,
                              container_path],
                             capture_output=True, text=True, timeout=120)
    finally:
        # Best-effort removal of the copied image so /tmp in the container
        # does not grow with every processed attachment.
        try:
            subprocess.run(["docker", "exec", DOCLING_CONTAINER, "rm", "-f", container_path],
                           capture_output=True, timeout=30)
        except (subprocess.SubprocessError, OSError):
            pass
    if run.returncode != 0:
        log.warning("tesseract exit %s: %s", run.returncode, run.stderr.strip()[-200:])
        return ""
    lines = [l for l in run.stdout.splitlines() if not l.startswith("Loading weights")]
    return "\n".join(lines).strip()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Docling convert (images go through the OCR chain, other files through the Docling API)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def convert_docling(src: Path) -> str:
    """Convert ``src`` to Markdown on the Docling side of the pipeline.

    Unsupported extensions yield an ``<!-- unsupported: ... -->`` comment.
    Images are tried against the vision LLM (data-like images only), then
    PaddleOCR, then Tesseract; the first non-empty result wins. Every other
    supported file is posted to the Docling service. Any exception is turned
    into a ``<!-- Docling: ... -->`` comment string.
    """
    suffix = src.suffix.lower()
    if suffix not in DOCLING_SUPPORTED:
        return f"<!-- unsupported: {src.suffix} -->"

    try:
        if suffix not in IMAGE_EXTS:
            return _post(DOCLING_HOST, DOCLING_PORT, "/convert", src, "use_llm=false")

        # 1. Vision LLM for data images (tables/screenshots)
        if _is_data_image(src) and _vision_available():
            text = _vision_ocr(src)
            if text:
                return text

        # 2. PaddleOCR fallback (better than Tesseract for dense text)
        if _paddle_available():
            text = _paddle_ocr(src)
            if text:
                return text

        # 3. Tesseract last resort
        return _tesseract_ocr(src)
    except Exception as exc:
        return f"<!-- Docling: {exc} -->"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Email helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _email_id(name: str) -> str:
    """Return the part of a mail file name that precedes its first comma."""
    head, _sep, _rest = name.partition(",")
    return head
|
|
|
|
|
|
def _html_body(msg) -> str | None:
|
|
for part in msg.walk():
|
|
if part.get_content_type() == "text/html":
|
|
p = part.get_payload(decode=True)
|
|
if p:
|
|
return p.decode(part.get_content_charset() or "utf-8", errors="replace")
|
|
for part in msg.walk():
|
|
if part.get_content_type() == "text/plain":
|
|
p = part.get_payload(decode=True)
|
|
if p:
|
|
return p.decode(part.get_content_charset() or "utf-8", errors="replace")
|
|
return None
|
|
|
|
|
|
def _write_md(path: Path, content: str):
    """Write ``content`` to ``path`` as UTF-8, creating parent folders first.

    The written path is logged relative to the parent of ``OUTPUT``.
    """
    folder = path.parent
    folder.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as fh:
        fh.write(content)
    shown = path.relative_to(OUTPUT.parent)
    log.info(" wrote %s", shown)
|
|
|
|
|
|
def _write_jpg(dest_dir: Path, src: Path):
    """Store an image in ``dest_dir`` as ``<stem>.jpg``.

    JPEG sources are copied as-is. Other formats are converted to RGB JPEG
    with PIL; if that conversion fails, the original file is copied under its
    own name instead. The resulting path is logged relative to the parent of
    ``OUTPUT``.
    """
    dest_dir.mkdir(parents=True, exist_ok=True)
    dest = dest_dir / (src.stem + ".jpg")
    if src.suffix.lower() in {".jpg", ".jpeg"}:
        shutil.copy2(src, dest)
    else:
        from PIL import Image
        try:
            # Context manager so the source file handle is released promptly.
            with Image.open(src) as img:
                img.convert("RGB").save(dest, "JPEG")
        except Exception as e:
            # Keep the attachment in its original format rather than lose it,
            # but leave a trace of why no .jpg was produced.
            log.warning(" jpg conversion failed for %s: %s", src.name, e)
            shutil.copy2(src, dest_dir / src.name)
            dest = dest_dir / src.name
    log.info(" saved %s", dest.relative_to(OUTPUT.parent))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Process one email
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _safe_name(fn: str) -> str:
    """Replace every character that is not alphanumeric or one of "._-" with "_"."""
    return "".join(c if c.isalnum() or c in "._-" else "_" for c in fn)


def _extract_parts(msg, dest: Path):
    """Write every named, non-empty MIME part into ``dest``.

    Returns ``(files, cid_map)``: the written paths in message order without
    repeats, and a mapping from Content-ID to the file name the part will
    have in the output folder (images are exported as ``<stem>.jpg``).
    """
    files: dict[Path, None] = {}  # dict used as an ordered set
    cid_map: dict[str, str] = {}
    for part in msg.walk():
        fn = part.get_filename() or part.get_param("name")
        if not fn:
            continue
        payload = part.get_payload(decode=True)
        if not payload:
            continue
        safe = _safe_name(fn)
        att = dest / safe
        att.write_bytes(payload)
        files[att] = None
        out_name = att.stem + ".jpg" if att.suffix.lower() in IMAGE_EXTS else safe
        cid = part.get("Content-ID", "").strip().strip("<>")
        if cid:
            cid_map[cid] = out_name
    return list(files), cid_map


def _fix_cid(html: str, cid_map: dict) -> str:
    """Rewrite ``cid:<id>`` references to exported file names; unknown ids stay as-is."""
    return re.sub(r'cid:([^\s"\'>\)]+)',
                  lambda m: cid_map.get(m.group(1), f"cid:{m.group(1)}"), html)


def _fix_img_comments(md: str, html: str) -> str:
    """Replace ``<!-- image -->`` placeholders with links to the HTML's <img> sources.

    Placeholders are matched to ``src`` values in document order; once the
    sources run out, remaining placeholders are left untouched.
    """
    srcs = re.findall(r'<img[^>]+src=["\']?([^\s"\'>\)]+)', html, re.IGNORECASE)
    if not srcs:
        return md
    it = iter(srcs)

    def _swap(match):
        src = next(it, None)
        # NOTE(review): the alt text "image" is reconstructed; the original
        # literal was lost from the source text, only its "{...})" tail remained.
        return f"![image]({src})" if src else match.group(0)

    return re.sub(r'<!-- image -->', _swap, md)


def process_email(filepath: Path, msg=None):
    """Export one email (body and attachments) to the two output trees.

    Output goes to ``OUTPUT/<email id>/markitdown`` and ``.../docling``:
    ``body.md`` from the HTML (or plain-text) body with ``cid:`` links
    rewritten, a JPEG copy of every image attachment (plus an OCR ``.md`` on
    the docling side when OCR produced text), and a ``<stem>.md`` per other
    attachment from each converter.

    Args:
        filepath: Path of the mail file; its name provides the email id.
        msg: Already parsed message, or None to parse ``filepath`` here.
    """
    if msg is None:
        with filepath.open("rb") as f:
            msg = email.message_from_binary_file(f, policy=email.policy.compat32)

    eid = _email_id(filepath.name)
    # Under compat32 a header holding raw 8-bit bytes comes back as a Header
    # object, which cannot be sliced; normalise to str first.
    subject = str(msg.get("subject", "") or "")
    log.info("Processing [%s] %s", eid[:30], subject[:60])

    md_dir = OUTPUT / eid / "markitdown"
    doc_dir = OUTPUT / eid / "docling"

    with tempfile.TemporaryDirectory() as _tmp:
        tmp = Path(_tmp)
        # Attachments get their own folder so one named "body.html" cannot
        # overwrite the body file written below.
        parts_dir = tmp / "parts"
        parts_dir.mkdir()
        attachments, cid_map = _extract_parts(msg, parts_dir)

        # Body
        body_html = _html_body(msg)
        if body_html:
            html_fixed = _fix_cid(body_html, cid_map)
            html_file = tmp / "body.html"
            html_file.write_text(html_fixed, encoding="utf-8")
            _write_md(md_dir / "body.md", convert_markitdown(html_file))
            docling_body = _fix_img_comments(convert_docling(html_file), html_fixed)
            _write_md(doc_dir / "body.md", docling_body)
        else:
            log.warning(" no body")

        # Attachments
        for att in attachments:
            stem = att.stem
            if att.suffix.lower() in IMAGE_EXTS:
                _write_jpg(md_dir, att)
                _write_jpg(doc_dir, att)
                ocr = convert_docling(att)
                # Error/unsupported results are HTML comments; skip those.
                if ocr and not ocr.startswith("<!--"):
                    _write_md(doc_dir / f"{stem}.md", ocr)
            else:
                _write_md(md_dir / f"{stem}.md", convert_markitdown(att))
                _write_md(doc_dir / f"{stem}.md", convert_docling(att))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main():
    """Export every file in ``MAILDIR``, showing a progress bar.

    Exits with an error message when ``MAILDIR`` does not exist. A failure
    while parsing or processing one email is logged and the run continues
    with the next file.
    """
    if not MAILDIR.exists():
        sys.exit(f"Maildir not found: {MAILDIR}")
    files = [f for f in sorted(MAILDIR.iterdir()) if f.is_file()]
    log.info("Found %d emails — vision: %s:%s", len(files), VISION_HOST, VISION_PORT)
    with tqdm(files, unit="email", desc="Exporting", ncols=80) as bar:
        for fp in bar:
            try:
                with fp.open("rb") as f:
                    msg = email.message_from_binary_file(f, policy=email.policy.compat32)
                # compat32 may return a Header object (not sliceable) for a
                # subject containing raw 8-bit bytes; convert before slicing.
                subject = str(msg.get("subject", "") or "")
                bar.set_postfix_str(subject[:40], refresh=False)
                process_email(fp, msg)
            except Exception as e:
                log.error("Failed %s: %s", fp.name[:40], e)
    log.info("Done → %s", OUTPUT)


if __name__ == "__main__":
    main()
|