AI-markdown/export_mails.py

412 lines
15 KiB
Python

#!/usr/bin/env python3
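"""
Export emails (one file per message, e.g. a Maildir ``cur`` folder) to Markdown.
Input:  <script dir>/$WORKING_DIR/$WORKING_INPUT   (default: emails/input)
Output: <script dir>/$WORKING_DIR/$WORKING_OUTPUT/<email_id>/   (default: emails/output)
    markitdown/body.md  markitdown/<stem>.md
    docling/body.md     docling/<stem>.md
Settings may come from the environment or from a .env file next to this script.
Image OCR: vision LLM (llama-server via SSH tunnel, wide table-like images only)
    → PaddleOCR server → Tesseract (run inside the Docling container)
Tunnel: ssh -fNL 11436:127.0.0.1:11436 -J kai@192.168.171.2 root@172.25.57.233
"""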
import email, email.policy, http.client, json, logging, os, re, shutil
import subprocess, sys, tempfile, uuid
from pathlib import Path
from tqdm import tqdm
# .env loader
_env = Path(__file__).parent / ".env"
if _env.exists():
for _l in _env.read_text().splitlines():
_l = _l.strip()
if _l and not _l.startswith("#") and "=" in _l:
_k, _v = _l.split("=", 1)
os.environ.setdefault(_k.strip(), _v.strip())
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
log = logging.getLogger(__name__)
_WORKING = os.environ.get("WORKING_DIR", "emails")
MAILDIR = Path(__file__).parent / _WORKING / os.environ.get("WORKING_INPUT", "input")
OUTPUT = Path(__file__).parent / _WORKING / os.environ.get("WORKING_OUTPUT", "output")
MARKITDOWN_HOST = "localhost"
MARKITDOWN_PORT = 8282
DOCLING_HOST = "localhost"
DOCLING_PORT = 8383
DOCLING_CONTAINER = "ai-markdown-api-docling-1"
API_TIMEOUT = 300
VISION_HOST = os.environ.get("VISION_HOST", "localhost")
VISION_PORT = int(os.environ.get("VISION_PORT", "11436"))
PADDLE_HOST = os.environ.get("PADDLE_HOST", "localhost")
PADDLE_PORT = int(os.environ.get("PADDLE_PORT", "9999"))
IMAGE_EXTS = {".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff", ".tif", ".webp"}
DOCLING_SUPPORTED = {".pdf", ".docx", ".xlsx", ".pptx", ".html", ".htm"} | IMAGE_EXTS
# ---------------------------------------------------------------------------
# Multipart POST helper
# ---------------------------------------------------------------------------
def _post(host, port, path, filepath, params="") -> str:
    """POST *filepath* as multipart field ``file`` to ``http://host:port<path>``.

    Args:
        host, port: address of the conversion service.
        path: URL path, e.g. ``"/convert"``.
        filepath: Path of the file to upload (read fully into memory).
        params: already-encoded query string appended after ``?`` (optional).

    Returns:
        The ``markdown`` (else ``content``) field of the JSON reply.  It is ''
        when the service answered with one of those keys but no text.
        ``str(reply)`` when the reply has neither key.
        ``<!-- API <status> -->`` for a non-200 reply (also logged).

    Raises:
        Transport errors (``OSError`` / ``http.client.HTTPException``) and
        ``json.JSONDecodeError`` for a non-JSON 200 body; callers catch these.
    """
    boundary = uuid.uuid4().hex
    preamble = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="file"; filename="{filepath.name}"\r\n'
        f"Content-Type: application/octet-stream\r\n\r\n"
    ).encode()
    body = preamble + filepath.read_bytes() + f"\r\n--{boundary}--\r\n".encode()
    url = path + (f"?{params}" if params else "")
    conn = http.client.HTTPConnection(host, port, timeout=API_TIMEOUT)
    try:
        conn.request("POST", url, body=body,
                     headers={"Content-Type": f"multipart/form-data; boundary={boundary}",
                              "Content-Length": str(len(body))})
        resp = conn.getresponse()
        raw = resp.read()
    finally:
        conn.close()
    if resp.status != 200:
        log.warning("%s:%s%s -> HTTP %s: %s", host, port, path, resp.status,
                    raw[:200].decode("utf-8", "replace"))
        return f"<!-- API {resp.status} -->"
    d = json.loads(raw)
    if isinstance(d, dict):
        text = d.get("markdown") or d.get("content")
        if text:
            return text
        if "markdown" in d or "content" in d:
            # Converted fine but produced no text: don't dump the JSON instead.
            return ""
    return str(d)
def convert_markitdown(src: Path) -> str:
    """Convert *src* to Markdown with the MarkItDown service (``use_llm=true``).

    Never raises.  A failure is returned as ``<!-- MarkItDown: <error> -->``
    so that it ends up visibly in the output file.
    """
    query = "use_llm=true"
    try:
        markdown = _post(MARKITDOWN_HOST, MARKITDOWN_PORT, "/convert", src, query)
    except Exception as err:
        markdown = f"<!-- MarkItDown: {err} -->"
    return markdown
# ---------------------------------------------------------------------------
# Vision LLM — llama-server via SSH tunnel
# ---------------------------------------------------------------------------
def _vision_available() -> bool:
    """Return True if the llama-server at VISION_HOST:VISION_PORT is ready.

    Probes ``GET /health`` with a 3 s timeout.  Only HTTP 200 counts as
    ready: llama-server's /health answers 503 while the model is still
    loading (per its README), and sending images then would only fail.
    Any error yields False.  The connection is always closed.
    """
    conn = None
    try:
        conn = http.client.HTTPConnection(VISION_HOST, VISION_PORT, timeout=3)
        conn.request("GET", "/health")
        resp = conn.getresponse()
        resp.read()
        return resp.status == 200
    except Exception:
        return False
    finally:
        if conn is not None:
            conn.close()
def _vision_ocr(src: Path) -> str:
    """OCR a table image with the vision LLM; return Markdown, or '' on failure.

    Steps:
    1. Convert the image to RGB.
    2. Upscale it by an integer factor (at least 2x) when it is narrower than
       1200 px.
    3. Re-encode it as JPEG (quality 92).
    4. Send it, base64-embedded, to llama-server's ``/v1/chat/completions``
       with a prompt asking for one Markdown table.

    Repeated table rows in the answer are removed with ``_dedup_rows``.
    Every failure is logged and returns '', so the caller can fall back to
    another OCR backend.  Failures include an unreadable image, a connection
    error, a non-200 status and a malformed reply.
    """
    import base64
    import io
    from PIL import Image

    try:
        with Image.open(src) as opened:
            img = opened.convert("RGB")
        if img.width < 1200:
            scale = max(2, 1200 // img.width)
            img = img.resize((img.width * scale, img.height * scale), Image.LANCZOS)
        buf = io.BytesIO()
        img.save(buf, "JPEG", quality=92)
    except Exception as e:
        log.warning("vision image prep failed for %s: %s", src.name, e)
        return ""
    b64 = base64.standard_b64encode(buf.getvalue()).decode()
    payload = json.dumps({
        "messages": [{"role": "user", "content": [
            {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64}"}},
            {"type": "text", "text": (
                "Read every cell in this table carefully. "
                "Output a Markdown table with ALL rows. "
                "Do not skip or duplicate any row. "
                "Return only the Markdown table."
            )},
        ]}],
        "max_tokens": 700,
        "temperature": 0,
    }).encode()
    conn = http.client.HTTPConnection(VISION_HOST, VISION_PORT, timeout=120)
    try:
        conn.request("POST", "/v1/chat/completions", body=payload,
                     headers={"Content-Type": "application/json",
                              "Content-Length": str(len(payload))})
        resp = conn.getresponse()
        raw = resp.read()
        if resp.status != 200:
            log.warning("vision %s: %s", resp.status, raw[:80].decode("utf-8", "replace"))
            return ""
        d = json.loads(raw)
        choices = d.get("choices") or [{}]
        # "content" may be missing or null in the reply.
        content = (choices[0].get("message") or {}).get("content") or ""
    except Exception as e:
        log.warning("vision error: %s", e)
        return ""
    finally:
        conn.close()
    return _dedup_rows(content.strip())
def _dedup_rows(md: str) -> str:
seen, out = set(), []
for line in md.splitlines():
key = line.strip()
if key.startswith("|") and key in seen:
continue
seen.add(key)
out.append(line)
return "\n".join(out)
def _is_data_image(src: Path) -> bool:
"""True for wide landscape images likely to contain table/text data."""
from PIL import Image
try:
w, h = Image.open(src).size
return w >= 300 and h >= 60 and w / h >= 1.5
except Exception:
return False
# ---------------------------------------------------------------------------
# PaddleOCR fallback — lightweight HTTP call to paddleocr_server on AI server
# ---------------------------------------------------------------------------
def _paddle_available() -> bool:
    """Return True if the PaddleOCR HTTP server answers ``GET /`` within 2 s.

    Any HTTP response, whatever its status, counts as "up".  Connection
    errors and timeouts give False.  The connection is closed on every path,
    including the error path that previously leaked it.
    """
    conn = None
    try:
        conn = http.client.HTTPConnection(PADDLE_HOST, PADDLE_PORT, timeout=2)
        conn.request("GET", "/")
        conn.getresponse().read()
        return True
    except Exception:
        return False
    finally:
        if conn is not None:
            conn.close()
def _paddle_ocr(src: Path) -> str:
    """OCR *src* with the PaddleOCR server (``POST /ocr``) and return its Markdown.

    The file is sent base64-encoded as JSON ``{"image_base64": ...}``.  The
    reply's ``markdown`` field is returned.  Any failure is logged and
    returns '' (never None), so the caller can fall through to the next
    OCR backend.  Failures include an unreadable file, a connection error,
    a non-200 status and a malformed or non-string reply.
    """
    import base64
    conn = http.client.HTTPConnection(PADDLE_HOST, PADDLE_PORT, timeout=60)
    try:
        data = base64.standard_b64encode(src.read_bytes()).decode()
        payload = json.dumps({"image_base64": data}).encode()
        conn.request("POST", "/ocr", body=payload,
                     headers={"Content-Type": "application/json",
                              "Content-Length": str(len(payload))})
        resp = conn.getresponse()
        raw = resp.read()
        if resp.status != 200:
            log.warning("paddle %s: %s", resp.status, raw[:80].decode("utf-8", "replace"))
            return ""
        d = json.loads(raw)
        md = d.get("markdown") if isinstance(d, dict) else None
        return md if isinstance(md, str) else ""
    except Exception as e:
        log.warning("paddle error: %s", e)
        return ""
    finally:
        conn.close()
# ---------------------------------------------------------------------------
# Tesseract OCR fallback via Docling docker container
# ---------------------------------------------------------------------------
def _tesseract_ocr(src: Path) -> str:
    """OCR *src* with Tesseract by running Docling inside DOCLING_CONTAINER.

    Steps:
    1. Upscale the image 3x (LANCZOS) into a private temp dir.
    2. ``docker cp`` it into the container under a unique /tmp name.
    3. Run a small Docling script there with ``TesseractCliOcrOptions``.
    4. Return its Markdown, without "Loading weights" progress lines.

    The copy inside the container is removed afterwards (best effort).

    Returns '' (and logs why) if ``docker cp`` or the Docling run fails.

    Raises:
        PIL errors while reading or scaling the image, ``FileNotFoundError``
        if ``docker`` is missing, and ``subprocess.TimeoutExpired`` (120 s per
        docker call).  The caller (``convert_docling``) catches these.
    """
    from PIL import Image

    # Modes Pillow can write as PNG.  Others (e.g. CMYK JPEGs) are converted to
    # RGB first, otherwise the save below raises.
    png_modes = {"1", "L", "LA", "I", "I;16", "P", "RGB", "RGBA"}
    # Unique name so concurrent or earlier runs never clobber each other's file.
    container_path = f"/tmp/{uuid.uuid4().hex}_{src.stem}.ocr.png"
    with tempfile.TemporaryDirectory() as work_dir:
        # Scale into a private dir: writing next to *src* could overwrite a
        # sibling attachment that happens to be named "<stem>.ocr.png".
        scaled = Path(work_dir) / f"{src.stem}.ocr.png"
        with Image.open(src) as img:
            work = img if img.mode in png_modes else img.convert("RGB")
            work.resize((img.width * 3, img.height * 3), Image.LANCZOS).save(scaled)
        try:
            cp = subprocess.run(
                ["docker", "cp", str(scaled), f"{DOCLING_CONTAINER}:{container_path}"],
                capture_output=True, timeout=120,
            )
            if cp.returncode != 0:
                log.warning("tesseract: docker cp failed: %s",
                            cp.stderr.decode("utf-8", "replace").strip()[-200:])
                return ""
            # Built line by line so the script has no leading indentation;
            # !r quotes the path as a proper Python string literal.
            script = "\n".join([
                "from docling.document_converter import DocumentConverter, ImageFormatOption, PdfFormatOption",
                "from docling.datamodel.pipeline_options import PdfPipelineOptions, TesseractCliOcrOptions",
                "from docling.datamodel.base_models import InputFormat",
                "opts = PdfPipelineOptions(do_ocr=True, ocr_options=TesseractCliOcrOptions())",
                "conv = DocumentConverter(format_options={",
                "    InputFormat.IMAGE: ImageFormatOption(pipeline_options=opts),",
                "    InputFormat.PDF: PdfFormatOption(pipeline_options=opts),",
                "})",
                f"print(conv.convert({container_path!r}).document.export_to_markdown())",
            ])
            run = subprocess.run(["docker", "exec", DOCLING_CONTAINER, "python3", "-c", script],
                                 capture_output=True, text=True, timeout=120)
        finally:
            try:
                subprocess.run(["docker", "exec", DOCLING_CONTAINER, "rm", "-f", container_path],
                               capture_output=True, timeout=30)
            except (OSError, subprocess.SubprocessError):
                pass  # best-effort cleanup; never mask the real result or error
    if run.returncode != 0:
        log.warning("tesseract: docling exited %s: %s", run.returncode, run.stderr.strip()[-200:])
        return ""
    lines = [line for line in run.stdout.splitlines() if not line.startswith("Loading weights")]
    return "\n".join(lines).strip()
# ---------------------------------------------------------------------------
# Docling convert (documents via the Docling API; images via the OCR chain above)
# ---------------------------------------------------------------------------
def convert_docling(src: Path) -> str:
    """Convert *src* to Markdown via Docling, using an OCR chain for images.

    Unsupported extensions give ``<!-- unsupported: <suffix> -->``.

    Images try three backends in order; the first non-empty result wins:
    1. The vision LLM, only for wide "data" images and only when its server
       is healthy.
    2. PaddleOCR, when its server is reachable.
    3. Tesseract via the Docling container.

    A backend that raises is logged and skipped, so the next one still gets
    a chance.

    Other files are POSTed to the Docling API (``use_llm=false``).

    Never raises: any remaining error is returned as ``<!-- Docling: <error> -->``.
    """
    ext = src.suffix.lower()
    if ext not in DOCLING_SUPPORTED:
        return f"<!-- unsupported: {src.suffix} -->"
    try:
        if ext not in IMAGE_EXTS:
            return _post(DOCLING_HOST, DOCLING_PORT, "/convert", src, "use_llm=false")
        backends = (
            ("vision", lambda: _is_data_image(src) and _vision_available(), _vision_ocr),
            ("paddle", _paddle_available, _paddle_ocr),
        )
        for name, usable, ocr in backends:
            try:
                if usable():
                    result = ocr(src)
                    if result:
                        return result
            except Exception as e:
                log.warning("%s OCR failed for %s: %s", name, src.name, e)
        return _tesseract_ocr(src)
    except Exception as e:
        return f"<!-- Docling: {e} -->"
# ---------------------------------------------------------------------------
# Email helpers
# ---------------------------------------------------------------------------
def _email_id(name: str) -> str:
return name.split(",")[0]
def _html_body(msg) -> str | None:
for part in msg.walk():
if part.get_content_type() == "text/html":
p = part.get_payload(decode=True)
if p:
return p.decode(part.get_content_charset() or "utf-8", errors="replace")
for part in msg.walk():
if part.get_content_type() == "text/plain":
p = part.get_payload(decode=True)
if p:
return p.decode(part.get_content_charset() or "utf-8", errors="replace")
return None
def _write_md(path: Path, content: str):
    """Write *content* to *path* as UTF-8, creating missing parent folders.

    Logs the written path relative to the parent of OUTPUT.
    """
    target_dir = path.parent
    target_dir.mkdir(exist_ok=True, parents=True)
    path.write_text(content, encoding="utf-8")
    shown = path.relative_to(OUTPUT.parent)
    log.info(" wrote %s", shown)
def _write_jpg(dest_dir: Path, src: Path):
    """Save image *src* into *dest_dir* as ``<stem>.jpg`` and log the saved path.

    ``.jpg``/``.jpeg`` files are copied as-is with ``shutil.copy2``.  Other
    formats are converted to an RGB JPEG with Pillow, and the source image
    is closed afterwards.  If conversion fails, any partially written
    ``.jpg`` is removed and the original file is copied unchanged under its
    own name instead.
    """
    dest_dir.mkdir(parents=True, exist_ok=True)
    dest = dest_dir / (src.stem + ".jpg")
    if src.suffix.lower() in {".jpg", ".jpeg"}:
        shutil.copy2(src, dest)
    else:
        from PIL import Image
        try:
            with Image.open(src) as img:
                img.convert("RGB").save(dest, "JPEG")
        except Exception:
            dest.unlink(missing_ok=True)  # don't leave a truncated .jpg behind
            dest = dest_dir / src.name
            shutil.copy2(src, dest)
    log.info(" saved %s", dest.relative_to(OUTPUT.parent))
# ---------------------------------------------------------------------------
# Process one email
# ---------------------------------------------------------------------------
def _safe_filename(name: str) -> str:
    """Sanitise an attachment name: keep alphanumerics and ``._-``, map the rest to ``_``.

    A result made only of dots (``.`` / ``..``) would resolve to a
    directory, so its dots are replaced by ``_`` too.
    """
    safe = "".join(c if c.isalnum() or c in "._-" else "_" for c in name)
    return "_" * len(safe) if safe.strip(".") == "" else safe


def _collect_attachments(msg) -> list[tuple[str, bytes, str]]:
    """Return ``(unique file name, decoded payload, content-id)`` for each named MIME part.

    Parts without a file name or with an empty payload are skipped.  Names
    are sanitised with ``_safe_filename`` and made unique per email.  A stem
    that is already taken gets a ``_<n>`` suffix, as does the stem ``body``,
    which is reserved for the message body.  This stops two attachments
    (or an attachment and body.md) from overwriting each other.
    """
    import email.utils

    used_stems = {"body"}
    found = []
    for part in msg.walk():
        fn = part.get_filename() or part.get_param("name")
        if isinstance(fn, tuple):  # RFC 2231-encoded "name" parameter
            fn = email.utils.collapse_rfc2231_value(fn)
        if not fn:
            continue
        payload = part.get_payload(decode=True)
        if not payload:
            continue
        safe = Path(_safe_filename(str(fn)))
        stem, n = safe.stem, 1
        while stem in used_stems:
            stem = f"{safe.stem}_{n}"
            n += 1
        used_stems.add(stem)
        # str(): compat32 may return an email.header.Header for 8-bit values.
        cid = str(part.get("Content-ID", "")).strip().strip("<>")
        found.append((stem + safe.suffix, payload, cid))
    return found


def _export_body(msg, tmp: Path, cid_map: dict[str, str], md_dir: Path, doc_dir: Path) -> None:
    """Convert the message body with both converters and write ``body.md`` for each.

    ``cid:`` references are first rewritten to the exported image file
    names.  Docling's ``<!-- image -->`` placeholders are then replaced, in
    order, with the HTML's ``<img src>`` values.  Once those run out,
    ``image`` is used.
    """
    body_html = _html_body(msg)
    if not body_html:
        log.warning(" no body")
        return
    html_fixed = re.sub(r'cid:([^\s"\'>\)]+)',
                        lambda m: cid_map.get(m.group(1), f"cid:{m.group(1)}"), body_html)
    html_file = tmp / "body.html"
    html_file.write_text(html_fixed, encoding="utf-8")
    _write_md(md_dir / "body.md", convert_markitdown(html_file))
    docling_body = convert_docling(html_file)
    srcs = iter(re.findall(r'<img[^>]+src=["\']?([^\s"\'>\)]+)', html_fixed, re.IGNORECASE))
    docling_body = re.sub(r'<!-- image -->',
                          lambda _: f"![]({next(srcs, 'image')})", docling_body)
    _write_md(doc_dir / "body.md", docling_body)


def _export_attachment(att: Path, md_dir: Path, doc_dir: Path) -> None:
    """Export one attachment file *att* into both output folders.

    Images are saved as JPEG in both folders.  Their OCR Markdown is
    written to the docling folder only when real text was produced, i.e.
    not an HTML-comment error.  Other files are converted by MarkItDown and
    by Docling.
    """
    stem = att.stem
    if att.suffix.lower() in IMAGE_EXTS:
        _write_jpg(md_dir, att)
        _write_jpg(doc_dir, att)
        ocr = convert_docling(att)
        if ocr and not ocr.startswith("<!--"):
            _write_md(doc_dir / f"{stem}.md", ocr)
    else:
        _write_md(md_dir / f"{stem}.md", convert_markitdown(att))
        _write_md(doc_dir / f"{stem}.md", convert_docling(att))


def process_email(filepath: Path, msg=None):
    """Export one email (body and attachments) to ``OUTPUT/<email id>/{markitdown,docling}/``.

    Args:
        filepath: the email file.  Its name gives the output folder via
            ``_email_id``.
        msg: the already-parsed message (compat32 policy).  When None it is
            parsed from *filepath*.
    """
    if msg is None:
        with filepath.open("rb") as f:
            msg = email.message_from_binary_file(f, policy=email.policy.compat32)
    eid = _email_id(filepath.name)
    # str(): with compat32, a raw 8-bit subject comes back as an
    # email.header.Header, which cannot be sliced.
    log.info("Processing [%s] %s", eid[:30], str(msg.get("subject", ""))[:60])
    md_dir = OUTPUT / eid / "markitdown"
    doc_dir = OUTPUT / eid / "docling"
    parts = _collect_attachments(msg)
    # Content-ID -> exported file name (images are exported as <stem>.jpg).
    cid_map = {
        cid: (Path(name).stem + ".jpg" if Path(name).suffix.lower() in IMAGE_EXTS else name)
        for name, _, cid in parts
        if cid
    }
    with tempfile.TemporaryDirectory() as tmp_name:
        tmp = Path(tmp_name)
        _export_body(msg, tmp, cid_map, md_dir, doc_dir)
        for name, payload, _ in parts:
            att = tmp / name
            att.write_bytes(payload)
            _export_attachment(att, md_dir, doc_dir)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    """Export every file in MAILDIR, in sorted path order, with a progress bar.

    Exits with an error message if MAILDIR is not an existing directory.  A
    failure in one email is logged with its traceback and does not stop the
    run.
    """
    if not MAILDIR.is_dir():
        sys.exit(f"Maildir not found: {MAILDIR}")
    files = [f for f in sorted(MAILDIR.iterdir()) if f.is_file()]
    log.info("Found %d emails — vision: %s:%s", len(files), VISION_HOST, VISION_PORT)
    with tqdm(files, unit="email", desc="Exporting", ncols=80) as bar:
        for fp in bar:
            try:
                with fp.open("rb") as f:
                    msg = email.message_from_binary_file(f, policy=email.policy.compat32)
                # str(): compat32 returns a Header (not sliceable) for raw 8-bit subjects.
                bar.set_postfix_str(str(msg.get("subject", ""))[:40], refresh=False)
                process_email(fp, msg)
            except Exception as e:
                log.error("Failed %s: %s", fp.name[:40], e, exc_info=True)
    log.info("Done → %s", OUTPUT)
# Script entry point: run the export only when executed directly, not on import.
if __name__ == "__main__":
    main()