#!/usr/bin/env python3
"""Fetch Wallabag articles and create one EPUB per article."""
import argparse
import html
import json
import re
import time
import mimetypes
import urllib.parse
import urllib.request
import uuid
import zipfile
from datetime import datetime
from pathlib import Path
from xml.sax.saxutils import escape
# All default locations are expressed relative to the current working directory.
BASE_DIR = Path(".")
DEFAULT_CONFIG = BASE_DIR / "wallabag.conf"  # KEY=value settings file
DEFAULT_OUT = BASE_DIR / "out"  # where generated EPUB files are written
DEFAULT_DB = BASE_DIR / "wallabag_downloaded.json"  # record of already exported articles
def load_config(path: Path) -> dict:
    """Parse a simple ``KEY=value`` configuration file.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped. Keys and values are stripped of surrounding whitespace; values
    additionally lose surrounding double and single quotes.

    Args:
        path: Location of the configuration file.

    Returns:
        Mapping of option name to value. Empty when ``path`` is not a file.
    """
    cfg = {}
    if not path.is_file():
        return cfg
    # Decode explicitly as UTF-8 (tolerating a BOM written by some editors) so
    # non-ASCII credentials do not depend on the platform's locale encoding.
    for line in path.read_text(encoding="utf-8-sig").splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        cfg[key.strip()] = value.strip().strip('"').strip("'")
    return cfg
def http_json(url, method="GET", data=None, token=None):
    """Perform an HTTP request and return the JSON-decoded response body.

    Args:
        url: URL to request.
        method: HTTP verb to use.
        data: Optional mapping sent as a URL-encoded form body.
        token: Optional bearer token placed in the Authorization header.

    Returns:
        The response body, decoded as UTF-8 and parsed as JSON.
    """
    request_headers = {"Accept": "application/json"}
    payload = None
    if data is not None:
        payload = urllib.parse.urlencode(data).encode()
        request_headers["Content-Type"] = "application/x-www-form-urlencoded"
    if token:
        request_headers["Authorization"] = f"Bearer {token}"
    request = urllib.request.Request(
        url, data=payload, headers=request_headers, method=method
    )
    with urllib.request.urlopen(request, timeout=60) as response:
        raw = response.read()
    return json.loads(raw.decode("utf-8"))
def wallabag_token(cfg):
    """Request an OAuth access token using the password grant.

    Args:
        cfg: Configuration mapping providing ``WALLABAG_URL``, ``CLIENT_ID``,
            ``CLIENT_SECRET``, ``USERNAME`` and ``PASSWORD``.

    Returns:
        The ``access_token`` value from the token endpoint's JSON response.
    """
    endpoint = cfg["WALLABAG_URL"].rstrip("/") + "/oauth/v2/token"
    credentials = {
        "grant_type": "password",
        "client_id": cfg["CLIENT_ID"],
        "client_secret": cfg["CLIENT_SECRET"],
        "username": cfg["USERNAME"],
        "password": cfg["PASSWORD"],
    }
    response = http_json(endpoint, "POST", credentials)
    return response["access_token"]
def fetch_entries(cfg, token, limit=10, unread=True, starred=False, archive=False):
    """Fetch the first page of entries, newest first, with full detail.

    Args:
        cfg: Configuration mapping providing ``WALLABAG_URL``.
        token: Bearer token for the API.
        limit: Value sent as ``perPage``.
        unread: When true, request ``archive=0``.
        archive: When true, request ``archive=1`` (takes precedence over
            ``unread``).
        starred: When true, request ``starred=1``.

    Returns:
        The list found at ``_embedded.items`` in the response, or ``[]``.
    """
    query = {
        "perPage": str(limit),
        "page": "1",
        "sort": "created",
        "order": "desc",
        "detail": "full",
    }
    # `archive` overrides `unread` when both are requested.
    if archive:
        query["archive"] = "1"
    elif unread:
        query["archive"] = "0"
    if starred:
        query["starred"] = "1"
    endpoint = cfg["WALLABAG_URL"].rstrip("/") + "/api/entries.json"
    payload = http_json(f"{endpoint}?{urllib.parse.urlencode(query)}", token=token)
    embedded = payload.get("_embedded", {})
    return embedded.get("items", [])
def mark_archived(cfg, token, entry_id):
    """PATCH a single entry with ``archive=1`` and return the JSON response."""
    root = cfg["WALLABAG_URL"].rstrip("/")
    endpoint = f"{root}/api/entries/{entry_id}.json"
    changes = {"archive": "1"}
    return http_json(endpoint, "PATCH", changes, token=token)
# Entities every XML parser understands; everything else must be decoded.
_XML_PREDEFINED_ENTITIES = frozenset({"amp", "lt", "gt", "quot", "apos"})

# Attributes commonly used by lazy-loading scripts to hold the real image URL.
_LAZY_SRC_ATTRS = ("data-src", "data-original", "data-lazy-src", "data-url", "data-full-url")


def _decode_named_entity(match):
    """Turn an HTML-only named entity into its character, keeping XML valid."""
    if match.group(1) in _XML_PREDEFINED_ENTITIES:
        return match.group(0)
    # Unknown names come back unchanged from html.unescape(); escape() then
    # neutralises their ampersand so the result is still well-formed.
    return escape(html.unescape(match.group(0)))


def clean_fragment(content):
    """Prepare Wallabag HTML for EPUB while preserving formatting where possible.

    Wallabag already extracts readable article HTML, so keep headings, lists,
    blockquotes, tables, links, inline styles/classes, and images. Remove only
    active/interactive content and make common void tags XML-compatible.

    Args:
        content: HTML fragment of the article body; may be empty or ``None``.

    Returns:
        The cleaned fragment, or an empty paragraph when there is no content.
    """
    if not content:
        return "<p></p>"
    # Elements that run code or embed interactive third-party content.
    content = re.sub(r"<script\b.*?</script\s*>", "", content, flags=re.I | re.S)
    content = re.sub(r"<iframe\b.*?</iframe\s*>", "", content, flags=re.I | re.S)
    content = re.sub(r"<form\b.*?</form\s*>", "", content, flags=re.I | re.S)
    # Inline event handlers such as onclick="...".
    content = re.sub(r"\s(on\w+)=([\"']).*?\2", "", content, flags=re.I | re.S)
    # Many sites lazy-load images and Wallabag can keep the real URL in data-*
    # while src is empty/a placeholder. Promote those before dropping extras.
    content = promote_lazy_image_srcs(content)
    content = re.sub(r"\s(srcset|sizes)=([\"']).*?\2", "", content, flags=re.I | re.S)
    # XHTML only knows five named entities. Decode the others (&nbsp;, ...)
    # but leave &lt;/&gt;/&amp; alone: decoding them would turn escaped text
    # such as code samples into real markup.
    content = re.sub(r"&([A-Za-z][A-Za-z0-9]*);", _decode_named_entity, content)
    # Any ampersand that still does not start a valid reference is escaped.
    content = re.sub(r"&(?!amp;|lt;|gt;|quot;|apos;|#\d+;|#x[0-9A-Fa-f]+;)", "&amp;", content)
    # Self-close void elements that are not already closed with "/>".
    content = re.sub(r"<(br|hr|img|meta|link|input)(\b[^>]*?)(?<!/)>", r"<\1\2 />", content, flags=re.I)
    return content


def promote_lazy_image_srcs(content):
    """Use lazy-loader attributes as img src when src is missing/a placeholder.

    Args:
        content: HTML fragment to scan for ``<img>`` tags.

    Returns:
        The fragment with affected ``<img>`` tags rewritten; other tags are
        returned untouched.
    """

    def repl(match):
        tag = match.group(0)
        attrs = {
            m.group(1).lower(): m.group(3)
            for m in re.finditer(r"\s([\w:-]+)=([\"'])(.*?)\2", tag, flags=re.S)
        }
        src = (attrs.get("src") or "").strip()
        lazy = next((attrs[name].strip() for name in _LAZY_SRC_ATTRS if attrs.get(name)), None)
        if not lazy:
            return tag
        is_placeholder = (
            not src
            or src.startswith("data:")
            or "placeholder" in src.lower()
            or src in ("#", "/")
        )
        if not is_placeholder:
            return tag
        new_attr = ' src="' + lazy.replace('"', "&quot;") + '"'
        # A callable replacement keeps backslashes in the URL from being
        # interpreted as regex escape sequences.
        replaced, count = re.subn(
            r"\ssrc=([\"']).*?\1", lambda _m: new_attr, tag, count=1, flags=re.I | re.S
        )
        if count:
            return replaced
        if re.search(r"\ssrc\s*=", tag, flags=re.I):
            # src is present in a form we do not parse (e.g. unquoted); leave
            # the tag alone rather than emit a duplicate attribute.
            return tag
        if tag.endswith("/>"):
            # Insert before the self-closing slash instead of after it.
            return tag[:-2].rstrip() + new_attr + " />"
        return tag[:-1] + new_attr + ">"

    return re.sub(r"<img\b[^>]*>", repl, content, flags=re.I | re.S)
def _ascii_safe_url(url):
    """Percent-encode spaces and non-ASCII characters in a URL's path and query."""
    parts = urllib.parse.urlsplit(url)
    # "%" stays in the safe set so already-encoded URLs are not encoded twice.
    return urllib.parse.urlunsplit(parts._replace(
        path=urllib.parse.quote(parts.path, safe="/%:@!$&'()*+,;=~"),
        query=urllib.parse.quote(parts.query, safe="/%:@!$&'()*+,;=?~[]"),
    ))


def fetch_image(url, referer=None):
    """Download an image and classify it.

    Args:
        url: Absolute URL of the image.
        referer: Optional URL sent as the ``Referer`` header.

    Returns:
        ``(data, type_info)`` where ``data`` is the raw response body and
        ``type_info`` is the result of ``guess_image_type`` for it.

    Raises:
        ValueError: If the response body is larger than 20 MiB.
    """
    max_bytes = 20 * 1024 * 1024
    headers = {
        "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124 Safari/537.36",
        # Prefer Kindle-friendly formats. Some CDNs serve AVIF/WebP when asked,
        # which Amazon's EPUB conversion may drop.
        "Accept": "image/jpeg,image/png,image/gif,image/svg+xml,image/*;q=0.8,*/*;q=0.5",
    }
    if referer:
        headers["Referer"] = _ascii_safe_url(referer)
    # urllib rejects URLs containing spaces or non-ASCII characters, which are
    # common in article image links, so encode them before sending.
    req = urllib.request.Request(_ascii_safe_url(url), headers=headers)
    with urllib.request.urlopen(req, timeout=30) as r:
        # Read one byte past the limit so an oversized body can be detected
        # without downloading all of it.
        data = r.read(max_bytes + 1)
        if len(data) > max_bytes:
            raise ValueError("image is larger than 20 MiB")
        ctype = (r.headers.get("Content-Type") or "").split(";", 1)[0].strip().lower()
    return data, guess_image_type(url, data, ctype)
# Preferred file extension for the media types this script expects to meet.
_IMAGE_EXTENSIONS = {
    "image/jpeg": ".jpg",
    "image/png": ".png",
    "image/gif": ".gif",
    "image/webp": ".webp",
    "image/svg+xml": ".svg",
    "image/avif": ".avif",
    "image/bmp": ".bmp",
    "application/octet-stream": ".bin",
}

# Non-standard labels some servers send for common formats.
_MEDIA_TYPE_ALIASES = {
    "image/jpg": "image/jpeg",
    "image/pjpeg": "image/jpeg",
    "image/x-png": "image/png",
}


def guess_image_type(url, data, ctype):
    """Determine the media type and file extension of downloaded image data.

    The declared content type wins when it is an ``image/*`` type; otherwise
    the leading bytes are inspected, then the extension in the URL path, and
    finally a generic binary type is used.

    Args:
        url: URL the data was fetched from.
        data: Raw bytes of the response body.
        ctype: Lower-cased Content-Type without parameters ("" if unknown).

    Returns:
        ``(media_type, ext)`` where ``ext`` includes the leading dot.
    """
    if ctype.startswith("image/"):
        media_type = ctype
    elif data.startswith(b"\xff\xd8\xff"):
        media_type = "image/jpeg"
    elif data.startswith(b"\x89PNG\r\n\x1a\n"):
        media_type = "image/png"
    elif data.startswith(b"GIF87a") or data.startswith(b"GIF89a"):
        media_type = "image/gif"
    elif data.startswith(b"RIFF") and data[8:12] == b"WEBP":
        media_type = "image/webp"
    elif b"<svg" in data[:1024].lower():
        media_type = "image/svg+xml"
    else:
        # Use only the path so a query string does not hide the extension.
        guessed = mimetypes.guess_type(urllib.parse.urlparse(url).path)[0] or ""
        media_type = guessed if guessed.startswith("image/") else "application/octet-stream"
    media_type = _MEDIA_TYPE_ALIASES.get(media_type, media_type)
    ext = _IMAGE_EXTENSIONS.get(media_type) or mimetypes.guess_extension(media_type) or ".img"
    return media_type, ext
def embed_images(content, base_url):
    """Download <img> sources, rewrite them to local EPUB paths, return manifest items.

    Images whose ``src`` is empty, a ``data:``/``cid:`` URI, or does not
    resolve to an http(s) URL are left untouched, as are images that fail to
    download (a warning is printed for those).

    Args:
        content: XHTML fragment to process.
        base_url: URL used to resolve relative ``src`` values and sent as the
            referer when downloading.

    Returns:
        ``(content, images)`` where ``images`` is a list of dicts with the
        keys ``id``, ``href``, ``media_type`` and ``data``.
    """
    images = []
    by_src = {}  # absolute URL -> href inside the EPUB, so repeats download once

    def repl(match):
        tag = match.group(0)
        src = html.unescape(match.group(2)).strip()
        if not src or src.startswith(("data:", "cid:")):
            return tag
        abs_url = urllib.parse.urljoin(base_url, src)
        parsed = urllib.parse.urlparse(abs_url)
        if parsed.scheme not in ("http", "https"):
            return tag
        if abs_url not in by_src:
            try:
                data, (media_type, ext) = fetch_image(abs_url, referer=base_url)
            except Exception as e:
                # Best effort: a missing image must not abort the whole export.
                print(f"Warning: could not download image {abs_url}: {e}")
                return tag
            item_id = f"img{len(images) + 1}"
            href = f"images/{item_id}{ext}"
            by_src[abs_url] = href
            images.append({"id": item_id, "href": href, "media_type": media_type, "data": data})
        new_src = by_src[abs_url]
        tag = re.sub(
            r"\ssrc=([\"']).*?\1", lambda _m: f' src="{new_src}"', tag, count=1, flags=re.I | re.S
        )
        # NOTE(review): sanitize_img_tag is not defined in the reviewed part of
        # this file - confirm it exists before relying on this path.
        return sanitize_img_tag(tag)

    content = re.sub(r"<img\b[^>]*\ssrc=([\"'])(.*?)\1[^>]*>", repl, content, flags=re.I | re.S)
    return content, images
def safe_name(s):
    """Reduce a string to at most 80 filename-safe ASCII characters.

    Runs of characters other than letters, digits, ``_``, ``.`` and ``-``
    become a single underscore; leading and trailing underscores are dropped.
    Returns ``"article"`` when nothing is left.
    """
    collapsed = re.sub(r"[^A-Za-z0-9_.-]+", "_", s)
    shortened = collapsed.strip("_")[:80]
    if shortened:
        return shortened
    return "article"
def display_title(s):
    """Return a readable article title for EPUB metadata and display.

    This works on the article title itself, never on a filename (those are
    produced by safe_name()). HTML entities are decoded, underscores become
    spaces, whitespace runs are collapsed, and ``"Wallabag article"`` is used
    when the input is empty or nothing remains.
    """
    fallback = "Wallabag article"
    text = html.unescape(s if s else fallback)
    text = text.replace("_", " ")
    text = re.sub(r"\s+", " ", text).strip()
    return text if text else fallback
def metadata_title(s):
    """Title string intended for EPUB metadata, not filename.

    A trailing ``.epub`` extension (any letter case) is removed. Occurrences
    elsewhere in the title are kept, since they are part of the text rather
    than a file extension.

    Args:
        s: Raw title; may be empty or ``None``.

    Returns:
        The cleaned title, or ``"Wallabag article"`` when nothing remains.
    """
    title = display_title(s)
    if title.lower().endswith(".epub"):
        title = title[: -len(".epub")].strip()
    return title or "Wallabag article"
def build_epub(entry, out_path: Path, title: str | None = None):
    """Build a single-article EPUB.

    Args:
        entry: Wallabag entry dict. The keys read here are ``title``, ``url``,
            ``domain_name``, ``published_at``, ``created_at``, ``content`` and
            ``preview_picture``; all are optional.
        out_path: Destination ``.epub`` file. Parent directories are created.
        title: Optional title that overrides the entry's own title.

    Returns:
        ``out_path``.
    """
    # Local imports keep this block self-contained.
    from datetime import timezone
    from xml.sax.saxutils import quoteattr

    out_path.parent.mkdir(parents=True, exist_ok=True)
    book_id = f"urn:uuid:{uuid.uuid4()}"
    # Timezone-aware replacement for the deprecated datetime.utcnow().
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    etitle = metadata_title(title or entry.get("title") or "Wallabag article")
    url = entry.get("url") or ""
    domain = entry.get("domain_name") or urllib.parse.urlparse(url).netloc or "Wallabag"
    published = entry.get("published_at") or entry.get("created_at") or ""
    raw_content = entry.get("content") or ""
    preview = entry.get("preview_picture") or ""
    if preview and preview not in raw_content:
        # Show the preview picture first when the article body lacks it.
        raw_content = f'<p><img src="{html.escape(preview, quote=True)}" alt="" /></p>\n' + raw_content
    content = clean_fragment(raw_content)
    content, images = embed_images(content, url)

    source_line = ""
    if url:
        published_part = f" - {escape(str(published))}" if published else ""
        source_line = (
            f'<p class="source">Source: <a href={quoteattr(url)}>{escape(domain)}</a>'
            f"{published_part}</p>\n"
        )

    chapter = f'''<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE html>
<html xmlns="http://www.w3.org/1999/xhtml" xmlns:epub="http://www.idpf.org/2007/ops" lang="en" xml:lang="en">
<head>
<meta charset="utf-8" />
<title>{escape(etitle)}</title>
<link rel="stylesheet" type="text/css" href="style.css" />
</head>
<body>
<article>
<h1>{escape(etitle)}</h1>
{source_line}{content}
</article>
</body>
</html>
'''
    container = '''<?xml version="1.0" encoding="UTF-8"?>
<container version="1.0" xmlns="urn:oasis:names:tc:opendocument:xmlns:container">
  <rootfiles>
    <rootfile full-path="OEBPS/content.opf" media-type="application/oebps-package+xml"/>
  </rootfiles>
</container>
'''
    image_manifest = "".join(
        f'    <item id="{img["id"]}" href="{img["href"]}" media-type="{escape(img["media_type"])}"/>\n'
        for img in images
    )
    source_meta = f"    <dc:source>{escape(url)}</dc:source>\n" if url else ""
    opf = f'''<?xml version="1.0" encoding="utf-8"?>
<package xmlns="http://www.idpf.org/2007/opf" version="3.0" unique-identifier="bookid">
  <metadata xmlns:dc="http://purl.org/dc/elements/1.1/">
    <dc:identifier id="bookid">{book_id}</dc:identifier>
    <dc:title id="title">{escape(etitle)}</dc:title>
    <meta refines="#title" property="title-type">main</meta>
    <dc:language>en</dc:language>
    <dc:creator id="creator">{escape(domain)}</dc:creator>
    <meta refines="#creator" property="role" scheme="marc:relators">aut</meta>
    <dc:publisher>Wallabag</dc:publisher>
{source_meta}    <meta property="dcterms:modified">{now}</meta>
  </metadata>
  <manifest>
    <item id="nav" href="nav.xhtml" media-type="application/xhtml+xml" properties="nav"/>
    <item id="article" href="article.xhtml" media-type="application/xhtml+xml"/>
    <item id="css" href="style.css" media-type="text/css"/>
{image_manifest}  </manifest>
  <spine>
    <itemref idref="article"/>
  </spine>
</package>
'''
    nav = f'''<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE html>
<html xmlns="http://www.w3.org/1999/xhtml" xmlns:epub="http://www.idpf.org/2007/ops" lang="en" xml:lang="en">
<head>
<meta charset="utf-8" />
<title>{escape(etitle)}</title>
</head>
<body>
<nav epub:type="toc" id="toc">
<ol>
<li><a href="article.xhtml">{escape(etitle)}</a></li>
</ol>
</nav>
</body>
</html>
'''
    css = """body{font-family:serif;line-height:1.45;margin:0;padding:1em;} article{max-width:42em;} h1{line-height:1.15;} img,video{max-width:100%;height:auto;} figure{margin:1em 0;} figcaption,.source{font-size:.85em;color:#666;} blockquote{border-left:3px solid #aaa;margin-left:.5em;padding-left:1em;color:#333;} pre,code{font-family:monospace;white-space:pre-wrap;} table{border-collapse:collapse;max-width:100%;} td,th{border:1px solid #ccc;padding:.25em;}"""
    with zipfile.ZipFile(out_path, "w", compression=zipfile.ZIP_DEFLATED) as z:
        # The mimetype entry must come first and must not be compressed.
        z.writestr("mimetype", "application/epub+zip", compress_type=zipfile.ZIP_STORED)
        z.writestr("META-INF/container.xml", container)
        z.writestr("OEBPS/content.opf", opf)
        z.writestr("OEBPS/nav.xhtml", nav)
        z.writestr("OEBPS/style.css", css)
        z.writestr("OEBPS/article.xhtml", chapter)
        for img in images:
            z.writestr("OEBPS/" + img["href"], img["data"])
    print(f"Embedded {len(images)} image(s) in {out_path}")
    return out_path
def article_key(entry) -> str:
    """Return the identifier under which an entry is tracked.

    The entry id is preferred, then its URL, and finally its display title,
    each prefixed with ``id:``, ``url:`` or ``title:`` respectively.
    """
    entry_id = entry.get("id")
    if entry_id is not None:
        return f"id:{entry_id}"
    link = entry.get("url")
    if link:
        return "url:" + link
    return "title:" + display_title(entry.get("title") or "")
def load_downloaded(path: Path) -> dict:
    """Load the record of already exported articles.

    Args:
        path: Location of the JSON file.

    Returns:
        The stored mapping. An empty dict is returned when the file does not
        exist, cannot be read or parsed (a warning is printed), or does not
        contain a JSON object.
    """
    if not path.is_file():
        return {}
    try:
        raw = path.read_bytes()
        try:
            text = raw.decode("utf-8")
        except UnicodeDecodeError:
            # Files written before UTF-8 was enforced may use the locale's
            # encoding; fall back to it instead of discarding the record.
            text = path.read_text()
        data = json.loads(text)
    except (OSError, ValueError) as exc:
        print(f"Warning: could not read download DB {path}: {exc}")
        return {}
    return data if isinstance(data, dict) else {}


def save_downloaded(path: Path, data: dict):
    """Write the record of exported articles as UTF-8 JSON.

    The data is written to a sibling ``.tmp`` file which then replaces
    ``path``, so an interrupted write does not truncate the existing file.

    Args:
        path: Destination file; parent directories are created.
        data: Mapping to serialise.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(path.suffix + ".tmp")
    # ensure_ascii=False emits raw non-ASCII characters, so the encoding must
    # be fixed rather than left to the platform default.
    tmp.write_text(
        json.dumps(data, indent=2, ensure_ascii=False, sort_keys=True) + "\n",
        encoding="utf-8",
    )
    tmp.replace(path)
def remember_downloaded(db: dict, entry, out: Path):
    """Record in ``db`` that ``entry`` was exported to ``out``.

    The record is stored under the entry's article key and holds the entry's
    id, display title, URL, the EPUB path and the local time of the export.
    """
    record = {
        "id": entry.get("id"),
        "title": display_title(entry.get("title") or ""),
        "url": entry.get("url"),
        "epub": str(out),
        "downloaded_at": datetime.now().isoformat(timespec="seconds"),
    }
    db[article_key(entry)] = record
def article_output_path(entry, out_dir: Path) -> Path:
    """Return the EPUB path for an entry inside ``out_dir``.

    The file name is the sanitized title followed by the entry id; the
    current Unix time stands in for whichever of the two is missing.
    """
    stamp = int(time.time())
    label = entry.get("title")
    if not label:
        label = f"wallabag-{entry.get('id', stamp)}"
    tail = entry.get("id") or stamp
    filename = f"{safe_name(label)}-{tail}.epub"
    return out_dir / filename
def main():
    """Command-line entry point: fetch entries and write one EPUB per article.

    Reads the configuration, obtains a token, fetches entries, skips those
    already recorded in the download DB (unless ``--redownload``), builds the
    EPUB files and optionally archives the exported entries in Wallabag.

    Raises:
        SystemExit: When required settings are missing, no entries are
            returned, or every returned entry was already downloaded.
    """
    ap = argparse.ArgumentParser(description="Fetch Wallabag articles and build one EPUB per article")
    ap.add_argument("--config", default=str(DEFAULT_CONFIG))
    ap.add_argument("--limit", type=int, default=10)
    ap.add_argument("--all", action="store_true", help="include archived/read articles too")
    ap.add_argument("--starred", action="store_true", help="only starred articles")
    ap.add_argument("--title", default=None, help="title override only when exporting one article")
    ap.add_argument("--output", default=None, help="output directory, or .epub file if --limit 1")
    ap.add_argument("--archive", action="store_true", help="mark fetched articles archived after successful build")
    ap.add_argument("--db", default=str(DEFAULT_DB), help=f"download evidence DB, default: {DEFAULT_DB}")
    ap.add_argument("--redownload", action="store_true", help="ignore evidence DB and download articles again")
    args = ap.parse_args()

    config_path = Path(args.config).expanduser()
    cfg = load_config(config_path)
    required = ("WALLABAG_URL", "CLIENT_ID", "CLIENT_SECRET", "USERNAME", "PASSWORD")
    missing = [k for k in required if not cfg.get(k)]
    if missing:
        # Name the file that was actually read; --config may point elsewhere.
        raise SystemExit(f"Missing in {config_path}: " + ", ".join(missing))

    token = wallabag_token(cfg)
    entries = fetch_entries(cfg, token, limit=args.limit, unread=not args.all, starred=args.starred)
    if not entries:
        raise SystemExit("No articles found.")

    db_path = Path(args.db).expanduser()
    downloaded = load_downloaded(db_path)
    original_count = len(entries)
    if not args.redownload:
        entries = [e for e in entries if article_key(e) not in downloaded]
    skipped = original_count - len(entries)
    if skipped:
        print(f"Skipped {skipped} already downloaded article(s). Use --redownload to fetch again.")
    if not entries:
        raise SystemExit("No new articles to download.")

    output_arg = Path(args.output).expanduser() if args.output else DEFAULT_OUT
    # --output names a single file only when it ends in .epub and exactly one
    # article is exported; otherwise it is treated as a directory.
    single_file = output_arg.suffix.lower() == ".epub" and len(entries) == 1
    out_dir = output_arg.parent if single_file else output_arg
    out_dir.mkdir(parents=True, exist_ok=True)

    created = []
    for i, entry in enumerate(entries, 1):
        out = output_arg if single_file else article_output_path(entry, out_dir)
        title = args.title if len(entries) == 1 and args.title else (entry.get("title") or f"Wallabag article {i}")
        build_epub(entry, out, title)
        remember_downloaded(downloaded, entry, out)
        # Saved after every article so an interrupted run keeps its progress.
        save_downloaded(db_path, downloaded)
        created.append((entry, out))
        print(f"Created: {out}")

    if args.archive:
        archived = 0
        for entry, _ in created:
            if entry.get("id") is None:
                continue  # nothing to address on the server without an id
            mark_archived(cfg, token, entry["id"])
            archived += 1
        print(f"Archived {archived} articles in Wallabag.")


if __name__ == "__main__":
    main()