Initial commit
This commit is contained in:
commit
01af871145
5 changed files with 615 additions and 0 deletions
15
.gitignore
vendored
Normal file
15
.gitignore
vendored
Normal file
|
|
@ -0,0 +1,15 @@
|
||||||
|
# Local secrets/configuration
|
||||||
|
*.conf
|
||||||
|
.git.env
|
||||||
|
|
||||||
|
# Generated/state files
|
||||||
|
wallabag_downloaded.json
|
||||||
|
out/
|
||||||
|
|
||||||
|
# Python cache
|
||||||
|
__pycache__/
|
||||||
|
*.py[cod]
|
||||||
|
|
||||||
|
# OS/editor noise
|
||||||
|
.DS_Store
|
||||||
|
*.swp
|
||||||
6
mail.conf.sample
Normal file
6
mail.conf.sample
Normal file
|
|
@ -0,0 +1,6 @@
|
||||||
|
SMTP_HOST=smtp.example.com
|
||||||
|
SMTP_PORT=587
|
||||||
|
SMTP_USER=user@example.com
|
||||||
|
SMTP_PASS=change-me
|
||||||
|
SMTP_SENDER=user@example.com
|
||||||
|
KINDLE_EMAIL=your-kindle@example.com
|
||||||
172
send_to_kindle.py
Executable file
172
send_to_kindle.py
Executable file
|
|
@ -0,0 +1,172 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
import argparse
|
||||||
|
import mimetypes
|
||||||
|
import os
|
||||||
|
import smtplib
|
||||||
|
import re
|
||||||
|
import zipfile
|
||||||
|
import xml.etree.ElementTree as ET
|
||||||
|
from email.message import EmailMessage
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
# Default KEY=VALUE mail settings file, resolved relative to the current working directory.
DEFAULT_CONFIG = Path("./mail.conf")
# Directory scanned for *.epub files when no file argument is given on the command line.
DEFAULT_OUT = Path("./out")
|
||||||
|
|
||||||
|
|
||||||
|
def load_config(path: Path) -> dict:
    """Parse a KEY=VALUE settings file into a dictionary.

    Blank lines, lines beginning with ``#`` and lines without ``=`` are
    skipped. Keys and values are whitespace-trimmed; double quotes and then
    single quotes are stripped from both ends of each value. When *path* is
    not an existing file the result is an empty dictionary.
    """
    if not path.is_file():
        return {}

    settings = {}
    for raw in path.read_text().splitlines():
        entry = raw.strip()
        if entry == "" or entry[0] == "#":
            continue
        name, separator, rest = entry.partition("=")
        if not separator:
            continue
        settings[name.strip()] = rest.strip().strip('"').strip("'")
    return settings
|
||||||
|
|
||||||
|
|
||||||
|
def get_value(args, cfg, attr, key, env=None, default=None):
    """Resolve one setting from the CLI, the config file, or the environment.

    Lookup order: attribute *attr* on *args*, then entry *key* in *cfg*, then
    environment variable *env* (when a name is given). ``None`` and empty
    strings count as "not set" for the first two sources; an unset or empty
    environment variable is skipped. Falls back to *default*.
    """
    cli_value = getattr(args, attr, None)
    if cli_value is not None and cli_value != "":
        return cli_value

    if cfg.get(key, "") != "":
        return cfg[key]

    if env:
        env_value = os.getenv(env)
        if env_value:
            return env_value

    return default
|
||||||
|
|
||||||
|
|
||||||
|
def chunks(items, size):
    """Yield consecutive slices of *items*, each holding at most *size* elements."""
    yield from (items[start:start + size] for start in range(0, len(items), size))
|
||||||
|
|
||||||
|
|
||||||
|
def epub_title(path: Path) -> str | None:
|
||||||
|
"""Read dc:title from EPUB metadata, if available."""
|
||||||
|
try:
|
||||||
|
with zipfile.ZipFile(path) as z:
|
||||||
|
container = ET.fromstring(z.read("META-INF/container.xml"))
|
||||||
|
ns_container = {"c": "urn:oasis:names:tc:opendocument:xmlns:container"}
|
||||||
|
rootfile = container.find(".//c:rootfile", ns_container)
|
||||||
|
if rootfile is None:
|
||||||
|
return None
|
||||||
|
opf_path = rootfile.attrib["full-path"]
|
||||||
|
opf = ET.fromstring(z.read(opf_path))
|
||||||
|
ns = {"dc": "http://purl.org/dc/elements/1.1/"}
|
||||||
|
title = opf.find(".//dc:title", ns)
|
||||||
|
if title is not None and title.text:
|
||||||
|
return " ".join(title.text.replace("_", " ").split())
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def attachment_name(path: Path) -> str:
    """Return the filename to use for *path* when it is attached to an email.

    Local filenames stay unchanged. Amazon Send-to-Kindle often derives the
    displayed document title from the email attachment filename, so the EPUB
    metadata title is used (when one can be read) while keeping the file's
    own extension, lower-cased.

    Characters that are unsafe in filenames are replaced by spaces, whitespace
    is collapsed, and the result is limited to 120 characters. Trailing dots
    and spaces are removed *after* truncation so the name never ends in
    ``" .epub"`` or ``"..epub"``. Falls back to the original file name when no
    title is available, and to the original stem when the cleaned title is
    empty.
    """
    title = epub_title(path)
    if not title:
        return path.name
    name = re.sub(r'[\\/:*?"<>|]+', " ", title)
    name = re.sub(r"\s+", " ", name).strip()
    # Truncate first, then trim: cutting at 120 characters can itself expose a
    # trailing space or dot.
    name = name[:120].rstrip(" .")
    return (name or path.stem) + path.suffix.lower()
|
||||||
|
|
||||||
|
|
||||||
|
def send_to_kindle(smtp_host, smtp_port, smtp_user, smtp_pass, sender, kindle_email, file_paths):
    """Email the given files as attachments in a single message.

    Args:
        smtp_host: SMTP server host name.
        smtp_port: SMTP server port (anything ``int()`` accepts).
        smtp_user: Login name for the SMTP server.
        smtp_pass: Password for the SMTP server.
        sender: Value of the ``From`` header.
        kindle_email: Value of the ``To`` header.
        file_paths: Iterable of paths to attach.

    Raises:
        FileNotFoundError: If any path is not an existing file; checked for
            every path before anything is sent.

    Ports 25 and 587 are plain-text submission ports that upgrade via
    STARTTLS, so an implicit-TLS ``SMTP_SSL`` connection cannot succeed
    there. Every other port keeps the implicit-TLS behaviour.
    """
    file_paths = [Path(p) for p in file_paths]
    for file_path in file_paths:
        if not file_path.is_file():
            raise FileNotFoundError(file_path)

    # Resolve each attachment name once; every lookup re-opens and parses the EPUB.
    named_files = [(file_path, attachment_name(file_path)) for file_path in file_paths]

    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = kindle_email
    msg["Subject"] = "Send to Kindle"
    msg.set_content("Attached ebook(s):\n\n" + "\n".join(name for _, name in named_files))

    for file_path, name in named_files:
        ctype, _ = mimetypes.guess_type(file_path)
        maintype, subtype = (ctype or "application/octet-stream").split("/", 1)
        msg.add_attachment(
            file_path.read_bytes(),
            maintype=maintype,
            subtype=subtype,
            filename=name,
        )

    port = int(smtp_port)
    if port in (25, 587):
        with smtplib.SMTP(smtp_host, port) as smtp:
            # Upgrade to TLS before credentials are sent.
            smtp.starttls()
            smtp.login(smtp_user, smtp_pass)
            smtp.send_message(msg)
    else:
        with smtplib.SMTP_SSL(smtp_host, port) as smtp:
            smtp.login(smtp_user, smtp_pass)
            smtp.send_message(msg)
|
||||||
|
|
||||||
|
|
||||||
|
def find_epubs(out_dir: Path):
    """Return the ``*.epub`` paths directly inside *out_dir*, in sorted order."""
    matches = list(out_dir.glob("*.epub"))
    matches.sort()
    return matches
|
||||||
|
|
||||||
|
|
||||||
|
def main():
    """Command-line entry point: email ebooks to a Kindle address.

    Settings are resolved per value from command-line options, then the
    config file, then environment variables. Either the single file given on
    the command line or every ``*.epub`` in the default output directory is
    sent, in batches of at most ``--max-attachments`` files per email. Each
    file of a batch is deleted after that batch has been sent.
    """
    parser = argparse.ArgumentParser(description="Send ebook(s) to Kindle via email")
    parser.add_argument(
        "file",
        nargs="?",
        help="ebook file, e.g. .epub/.pdf/.mobi. If omitted, sends all .epub files in ./out",
    )
    parser.add_argument(
        "--config",
        default=str(DEFAULT_CONFIG),
        help=f"config file, default: {DEFAULT_CONFIG}",
    )
    parser.add_argument("--kindle", help="your Kindle email, e.g. name@kindle.com")
    parser.add_argument("--smtp-host")
    parser.add_argument("--smtp-port", type=int)
    parser.add_argument("--smtp-user")
    parser.add_argument("--smtp-pass")
    parser.add_argument("--sender")
    parser.add_argument(
        "--max-attachments",
        type=int,
        default=16,
        help="maximum attachments per email, default: 16",
    )
    args = parser.parse_args()

    cfg = load_config(Path(args.config).expanduser())

    # (argparse attribute, config/environment key, default) for every required setting.
    spec = (
        ("smtp_host", "SMTP_HOST", None),
        ("smtp_port", "SMTP_PORT", "465"),
        ("smtp_user", "SMTP_USER", None),
        ("smtp_pass", "SMTP_PASS", None),
        ("sender", "SMTP_SENDER", None),
        ("kindle", "KINDLE_EMAIL", None),
    )
    settings = {
        attr: get_value(args, cfg, attr, key, key, fallback)
        for attr, key, fallback in spec
    }

    missing = [name for name, value in settings.items() if not value]
    if missing:
        hint = f"\nAdd them to {args.config} or pass them as command-line options."
        raise SystemExit("Missing: " + ", ".join(missing) + hint)

    if args.file:
        files = [Path(args.file).expanduser()]
    else:
        files = find_epubs(DEFAULT_OUT)
    if not files:
        raise SystemExit(f"No EPUB files found in {DEFAULT_OUT}")

    batch_size = max(1, args.max_attachments)
    batches = list(chunks(files, batch_size))
    total = len(batches)
    for number, batch in enumerate(batches, 1):
        send_to_kindle(
            settings["smtp_host"],
            settings["smtp_port"],
            settings["smtp_user"],
            settings["smtp_pass"],
            settings["sender"],
            settings["kindle"],
            batch,
        )
        progress = f" ({number}/{total})" if total > 1 else ""
        print(f"Sent email{progress}: {len(batch)} attachment(s)")
        for sent_file in batch:
            sent_file.unlink()
            print(f" - sent and deleted: {sent_file}")
|
||||||
|
|
||||||
|
|
||||||
|
# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()
|
||||||
5
wallabag.conf.sample
Normal file
5
wallabag.conf.sample
Normal file
|
|
@ -0,0 +1,5 @@
|
||||||
|
WALLABAG_URL=https://wallabag.example.com
|
||||||
|
CLIENT_ID=change-me
|
||||||
|
CLIENT_SECRET=change-me
|
||||||
|
USERNAME=change-me
|
||||||
|
PASSWORD=change-me
|
||||||
417
wallabag_to_epub.py
Executable file
417
wallabag_to_epub.py
Executable file
|
|
@ -0,0 +1,417 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Fetch Wallabag articles and create one EPUB per article."""
|
||||||
|
import argparse
|
||||||
|
import html
|
||||||
|
import json
|
||||||
|
import re
|
||||||
|
import time
|
||||||
|
import mimetypes
|
||||||
|
import urllib.parse
|
||||||
|
import urllib.request
|
||||||
|
import uuid
|
||||||
|
import zipfile
|
||||||
|
from datetime import datetime
|
||||||
|
from pathlib import Path
|
||||||
|
from xml.sax.saxutils import escape
|
||||||
|
|
||||||
|
# NOTE(review): BASE_DIR is not referenced anywhere in the visible code — confirm it is still needed.
BASE_DIR = Path(".")
# Default KEY=VALUE file holding the Wallabag URL and OAuth credentials.
DEFAULT_CONFIG = Path("./wallabag.conf")
# Default directory that receives the generated EPUB files.
DEFAULT_OUT = Path("./out")
# Default JSON file recording which articles have already been exported.
DEFAULT_DB = Path("./wallabag_downloaded.json")
|
||||||
|
|
||||||
|
|
||||||
|
def load_config(path: Path) -> dict:
    """Read a KEY=VALUE file into a dictionary.

    Empty lines, ``#`` comment lines and lines without ``=`` are ignored.
    Keys and values are stripped of surrounding whitespace; values also lose
    surrounding double quotes and then single quotes. A path that is not an
    existing file gives an empty dictionary.
    """
    if not path.is_file():
        return {}
    pairs = (
        stripped.split("=", 1)
        for stripped in map(str.strip, path.read_text().splitlines())
        if stripped and not stripped.startswith("#") and "=" in stripped
    )
    return {name.strip(): value.strip().strip('"').strip("'") for name, value in pairs}
|
||||||
|
|
||||||
|
|
||||||
|
def http_json(url, method="GET", data=None, token=None):
    """Perform an HTTP request and return the decoded JSON response body.

    Args:
        url: Target URL.
        method: HTTP method name.
        data: Optional mapping sent form-urlencoded as the request body.
        token: Optional bearer token for the ``Authorization`` header.

    The request uses a 60-second timeout and the response is decoded as UTF-8.
    """
    headers = {"Accept": "application/json"}
    payload = None
    if data is not None:
        payload = urllib.parse.urlencode(data).encode()
        headers["Content-Type"] = "application/x-www-form-urlencoded"
    if token:
        headers["Authorization"] = f"Bearer {token}"

    request = urllib.request.Request(url, data=payload, headers=headers, method=method)
    with urllib.request.urlopen(request, timeout=60) as response:
        text = response.read().decode("utf-8")
        return json.loads(text)
|
||||||
|
|
||||||
|
|
||||||
|
def wallabag_token(cfg):
    """Request an OAuth access token using the password grant.

    Reads ``WALLABAG_URL``, ``CLIENT_ID``, ``CLIENT_SECRET``, ``USERNAME`` and
    ``PASSWORD`` from *cfg*, posts them to ``/oauth/v2/token`` and returns the
    ``access_token`` field of the response.
    """
    endpoint = cfg["WALLABAG_URL"].rstrip("/") + "/oauth/v2/token"
    credentials = {
        "grant_type": "password",
        "client_id": cfg["CLIENT_ID"],
        "client_secret": cfg["CLIENT_SECRET"],
        "username": cfg["USERNAME"],
        "password": cfg["PASSWORD"],
    }
    response = http_json(endpoint, "POST", credentials)
    return response["access_token"]
|
||||||
|
|
||||||
|
|
||||||
|
def fetch_entries(cfg, token, limit=10, unread=True, starred=False, archive=False):
    """Fetch the first page of entries from ``/api/entries.json``.

    Entries are requested newest-created first with full detail, *limit* per
    page. ``archive=True`` asks for archived entries and takes precedence
    over ``unread=True`` (non-archived entries); ``starred=True`` restricts
    the result to starred entries. Returns the list under
    ``_embedded.items``, or an empty list when that is absent.
    """
    params = {
        "perPage": str(limit),
        "page": "1",
        "sort": "created",
        "order": "desc",
        "detail": "full",
    }
    if archive:
        params["archive"] = "1"
    elif unread:
        params["archive"] = "0"
    if starred:
        params["starred"] = "1"

    endpoint = cfg["WALLABAG_URL"].rstrip("/") + "/api/entries.json"
    payload = http_json(f"{endpoint}?{urllib.parse.urlencode(params)}", token=token)
    embedded = payload.get("_embedded", {})
    return embedded.get("items", [])
|
||||||
|
|
||||||
|
|
||||||
|
def mark_archived(cfg, token, entry_id):
    """PATCH the entry *entry_id* with ``archive=1`` and return the JSON response."""
    server = cfg["WALLABAG_URL"].rstrip("/")
    endpoint = f"{server}/api/entries/{entry_id}.json"
    return http_json(endpoint, "PATCH", {"archive": "1"}, token=token)
|
||||||
|
|
||||||
|
|
||||||
|
def clean_fragment(content):
    """Prepare Wallabag HTML for EPUB while preserving formatting where possible.

    Wallabag already extracts readable article HTML, so headings, lists,
    blockquotes, tables, links, inline styles/classes and images are kept.
    Only active/interactive content is removed, and the markup is nudged
    towards well-formed XML:

    * ``<script>``, ``<iframe>`` and ``<form>`` elements and quoted ``on*``
      event-handler attributes are dropped;
    * lazy-loaded image URLs are promoted to ``src``; ``srcset``/``sizes``
      are removed;
    * HTML-only named entities (``&nbsp;``, ``&copy;`` ...) become literal
      characters, because XML only predefines ``amp``, ``lt``, ``gt``,
      ``quot`` and ``apos``. Those five and numeric references are left
      untouched so escaped text such as ``&lt;`` never turns into markup;
    * remaining bare ``&`` characters are escaped as ``&amp;``;
    * common void tags are rewritten in self-closing form.

    Returns ``"<p></p>"`` for empty input.
    """
    if not content:
        return "<p></p>"

    # Remove active elements together with their bodies.
    for tag in ("script", "iframe", "form"):
        content = re.sub(rf"<{tag}\b[^>]*>.*?</{tag}>", "", content, flags=re.I | re.S)
    content = re.sub(r"\s(on\w+)=([\"']).*?\2", "", content, flags=re.I | re.S)

    # Many sites lazy-load images and Wallabag can keep the real URL in data-*
    # while src is empty/a placeholder. Promote those before dropping extras.
    content = promote_lazy_image_srcs(content)
    content = re.sub(r"\s(srcset|sizes)=([\"']).*?\2", "", content, flags=re.I | re.S)

    xml_entities = {"amp", "lt", "gt", "quot", "apos"}

    def decode_entity(match):
        """Turn one HTML-only named entity into XML-safe text."""
        if match.group(1) in xml_entities:
            return match.group(0)
        # Re-escape the decoded text so variants such as &LT; or &QUOT; cannot
        # inject raw markup; unknown names come back as "&amp;name;".
        return html.escape(html.unescape(match.group(0)), quote=True)

    content = re.sub(r"&([A-Za-z][A-Za-z0-9]*);", decode_entity, content)
    # Any ampersand that still does not start a valid XML reference is literal text.
    content = re.sub(r"&(?!amp;|lt;|gt;|quot;|apos;|#\d+;|#x[0-9A-Fa-f]+;)", "&amp;", content)
    content = re.sub(r"<(br|hr|img|meta|link|input)(\b[^>]*?)(?<!/)>", r"<\1\2 />", content, flags=re.I)
    return content
|
||||||
|
|
||||||
|
|
||||||
|
def promote_lazy_image_srcs(content):
    """Use lazy-loader attributes as img src when src is missing/a placeholder.

    For every ``<img>`` tag the first non-empty attribute among ``data-src``,
    ``data-original``, ``data-lazy-src``, ``data-url`` and ``data-full-url``
    is taken as the real image URL. It replaces ``src`` when ``src`` is
    absent, empty, a ``data:`` URI, contains "placeholder", or is ``#``/``/``.
    Tags with a usable ``src`` or without a lazy attribute are returned
    unchanged, as are tags whose ``src`` is present but unquoted.
    """
    lazy_names = ("data-src", "data-original", "data-lazy-src", "data-url", "data-full-url")

    def repl(match):
        tag = match.group(0)
        attrs = {
            m.group(1).lower(): m.group(3)
            for m in re.finditer(r"\s([\w:-]+)=([\"'])(.*?)\2", tag, flags=re.S)
        }
        lazy = next((attrs[name].strip() for name in lazy_names if attrs.get(name)), None)
        if not lazy:
            return tag

        src = (attrs.get("src") or "").strip()
        if src and not src.startswith("data:") and "placeholder" not in src.lower() and src not in ("#", "/"):
            return tag

        # The value is written inside double quotes, so a literal quote (possible
        # when the source attribute was single-quoted) must be encoded.
        new_attr = ' src="' + lazy.replace('"', "&quot;") + '"'

        # A callable replacement keeps backslashes in the URL literal; a plain
        # replacement string would be scanned for group references.
        replaced, count = re.subn(
            r"\ssrc=([\"']).*?\1", lambda _m: new_attr, tag, count=1, flags=re.I | re.S
        )
        if count:
            return replaced
        if re.search(r"\ssrc=", tag, flags=re.I):
            # An unquoted src cannot be rewritten safely; leave the tag alone.
            return tag

        # No src at all: insert one before the closing ">" or "/>" so a
        # self-closing tag stays well-formed.
        closing = re.search(r"\s*/?>$", tag)
        tail = " />" if "/" in closing.group(0) else ">"
        return tag[:closing.start()] + new_attr + tail

    return re.sub(r"<img\b[^>]*>", repl, content, flags=re.I | re.S)
|
||||||
|
|
||||||
|
|
||||||
|
def fetch_image(url, referer=None):
    """Download an image and return ``(data, (media_type, extension))``.

    The request carries a browser-like User-Agent, an ``Accept`` header that
    prefers JPEG/PNG/GIF/SVG, and optionally a ``Referer``. It uses a
    30-second timeout.

    Raises:
        ValueError: If the response body exceeds 20 MiB.
    """
    size_limit = 20 * 1024 * 1024
    headers = {
        "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124 Safari/537.36",
        # Prefer Kindle-friendly formats. Some CDNs serve AVIF/WebP when asked,
        # which Amazon's EPUB conversion may drop.
        "Accept": "image/jpeg,image/png,image/gif,image/svg+xml,image/*;q=0.8,*/*;q=0.5",
    }
    if referer:
        headers["Referer"] = referer

    request = urllib.request.Request(url, headers=headers)
    with urllib.request.urlopen(request, timeout=30) as response:
        # Read one byte past the limit so an oversized body can be detected.
        payload = response.read(size_limit + 1)
        if len(payload) > size_limit:
            raise ValueError("image is larger than 20 MiB")
        declared = response.headers.get("Content-Type") or ""

    content_type = declared.split(";", 1)[0].strip().lower()
    return payload, guess_image_type(url, payload, content_type)
|
||||||
|
|
||||||
|
|
||||||
|
def guess_image_type(url, data, ctype):
    """Return ``(media_type, extension)`` for downloaded image bytes.

    A declared *ctype* starting with ``image/`` is trusted as is. Otherwise
    the leading bytes are checked for JPEG, PNG, GIF, WebP and SVG
    signatures, and finally the URL path's extension is consulted, with
    ``application/octet-stream`` as the last resort. The extension comes from
    ``mimetypes``, with ``.jpe`` mapped to ``.jpg``, ``.svgz`` to ``.svg``,
    and ``.img`` used when none is known.
    """
    magic_numbers = (
        (b"\xff\xd8\xff", "image/jpeg"),
        (b"\x89PNG\r\n\x1a\n", "image/png"),
        (b"GIF87a", "image/gif"),
        (b"GIF89a", "image/gif"),
    )

    if ctype.startswith("image/"):
        media_type = ctype
    else:
        media_type = next((kind for magic, kind in magic_numbers if data.startswith(magic)), None)
        if media_type is None:
            if data.startswith(b"RIFF") and data[8:12] == b"WEBP":
                media_type = "image/webp"
            elif b"<svg" in data[:500].lower():
                media_type = "image/svg+xml"
            else:
                url_path = urllib.parse.urlparse(url).path
                media_type = mimetypes.guess_type(url_path)[0] or "application/octet-stream"

    extension = mimetypes.guess_extension(media_type) or ""
    extension = {".jpe": ".jpg", ".svgz": ".svg"}.get(extension, extension)
    return media_type, extension or ".img"
|
||||||
|
|
||||||
|
|
||||||
|
def sanitize_img_tag(tag):
    """Rebuild an ``<img>`` tag keeping only Kindle/EPUB-friendly attributes.

    Only quoted ``src``, ``alt``, ``title``, ``class``, ``width`` and
    ``height`` attributes survive, in their original order. Values are
    entity-decoded and re-escaped, ``alt="image"`` is appended when no
    ``alt`` is present, and the tag is emitted in self-closing form.
    """
    allowed = {"src", "alt", "title", "class", "width", "height"}
    rendered = []
    has_alt = False
    for found in re.finditer(r"\s([\w:-]+)=([\"'])(.*?)\2", tag, flags=re.S):
        attr_name = found.group(1).lower()
        if attr_name not in allowed:
            continue
        text = html.unescape(found.group(3))
        rendered.append(f' {attr_name}="{html.escape(text, quote=True)}"')
        has_alt = has_alt or attr_name == "alt"
    if not has_alt:
        rendered.append(' alt="image"')
    return "<img" + "".join(rendered) + " />"
|
||||||
|
|
||||||
|
|
||||||
|
def embed_images(content, base_url):
    """Download ``<img>`` sources and point the tags at local EPUB paths.

    Returns ``(content, images)`` where *images* is a list of dicts with the
    keys ``id``, ``href``, ``media_type`` and ``data``, one per distinct
    downloaded URL. Tags whose source is empty, a ``data:``/``cid:`` URI, not
    http(s) after resolving against *base_url*, or fails to download are left
    exactly as they were; a failed download prints a warning.
    """
    images = []
    local_paths = {}

    def localize(match):
        original_tag = match.group(0)
        src = html.unescape(match.group(2)).strip()
        if not src or src.startswith(("data:", "cid:")):
            return original_tag

        absolute = urllib.parse.urljoin(base_url, src)
        if urllib.parse.urlparse(absolute).scheme not in ("http", "https"):
            return original_tag

        href = local_paths.get(absolute)
        if href is None:
            try:
                payload, (media_type, ext) = fetch_image(absolute, referer=base_url)
            except Exception as exc:
                print(f"Warning: could not download image {absolute}: {exc}")
                return original_tag
            item_id = f"img{len(images) + 1}"
            href = f"images/{item_id}{ext}"
            local_paths[absolute] = href
            images.append({"id": item_id, "href": href, "media_type": media_type, "data": payload})

        rewritten = re.sub(
            r"\ssrc=([\"']).*?\1", f' src="{href}"', original_tag, count=1, flags=re.I | re.S
        )
        return sanitize_img_tag(rewritten)

    img_with_src = re.compile(r"<img\b[^>]*\ssrc=([\"'])(.*?)\1[^>]*>", flags=re.I | re.S)
    return img_with_src.sub(localize, content), images
|
||||||
|
|
||||||
|
|
||||||
|
def safe_name(s):
    """Reduce *s* to a filename-safe token of at most 80 characters.

    Each run of characters outside ``A-Za-z0-9_.-`` becomes one underscore
    and leading/trailing underscores are removed. Returns ``"article"`` when
    nothing is left.
    """
    collapsed = re.sub(r"[^A-Za-z0-9_.-]+", "_", s)
    trimmed = collapsed.strip("_")[:80]
    return trimmed if trimmed else "article"
|
||||||
|
|
||||||
|
|
||||||
|
def display_title(s):
    """Return a human-friendly article title for EPUB metadata/display.

    HTML entities are decoded, underscores become spaces (some
    older/generated titles contain them) and whitespace runs are collapsed.
    Filenames are sanitized separately by ``safe_name()``. An empty or
    missing title yields ``"Wallabag article"``.
    """
    fallback = "Wallabag article"
    text = html.unescape(s if s else fallback)
    text = re.sub(r"\s+", " ", text.replace("_", " ")).strip()
    return text if text else fallback
|
||||||
|
|
||||||
|
|
||||||
|
def metadata_title(s):
    """Return the title for EPUB metadata: the display title with every ``.epub`` occurrence removed."""
    shown = display_title(s)
    without_extension = shown.replace(".epub", "")
    return without_extension.strip()
|
||||||
|
|
||||||
|
|
||||||
|
def build_epub(entry, out_path: Path, title: str | None = None):
    """Build a single-article EPUB at *out_path* and return that path.

    Args:
        entry: Wallabag entry mapping. The keys read are ``title``, ``url``,
            ``domain_name``, ``published_at``, ``created_at``, ``content``
            and ``preview_picture``; all are optional.
        out_path: Destination file; parent directories are created.
        title: Optional title override; defaults to the entry's title.

    The archive contains the ``mimetype`` entry (stored uncompressed, written
    first), the OCF container file, an OPF package document, a navigation
    document, a stylesheet, the article XHTML and every image that could be
    downloaded. The preview picture is prepended to the article when the
    content does not already mention it.

    Text placed inside XML attribute values is escaped including double
    quotes, so a title or URL containing ``"`` cannot break the package or
    article markup.
    """

    def attr(value):
        # xml.sax.saxutils.escape() leaves double quotes alone, which is only
        # safe for element text; attribute values need them encoded as well.
        return escape(value, {'"': "&quot;"})

    out_path.parent.mkdir(parents=True, exist_ok=True)
    book_id = f"urn:uuid:{uuid.uuid4()}"
    # UTC timestamp for dcterms:modified; time.gmtime() avoids the deprecated
    # datetime.utcnow().
    now = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

    etitle = metadata_title(title or entry.get("title") or "Wallabag article")
    url = entry.get("url") or ""
    domain = entry.get("domain_name") or urllib.parse.urlparse(url).netloc or "Wallabag"
    published = entry.get("published_at") or entry.get("created_at") or ""
    raw_content = entry.get("content") or "<p></p>"
    preview = entry.get("preview_picture") or ""
    if preview and preview not in raw_content:
        raw_content = f'<figure><img src="{html.escape(preview, quote=True)}" alt="{html.escape(etitle, quote=True)}" /></figure>\n' + raw_content
    content = clean_fragment(raw_content)
    content, images = embed_images(content, url)

    # Source line under the heading: domain, plus the date part of the timestamp when known.
    byline = escape(domain) + ((" · " + escape(published[:10])) if published else "")

    chapter = f'''<?xml version="1.0" encoding="utf-8"?>
<html xmlns="http://www.w3.org/1999/xhtml" lang="en">
<head>
<title>{escape(etitle)}</title>
<link rel="stylesheet" type="text/css" href="style.css" />
</head>
<body>
<article>
<header>
<h1>{escape(etitle)}</h1>
<p class="source">{byline}</p>
<p class="source"><a href="{attr(url)}">{escape(url)}</a></p>
</header>
<section class="content">
{content}
</section>
</article>
</body></html>'''

    container = '''<?xml version="1.0"?>
<container version="1.0" xmlns="urn:oasis:names:tc:opendocument:xmlns:container">
<rootfiles><rootfile full-path="OEBPS/content.opf" media-type="application/oebps-package+xml"/></rootfiles>
</container>'''
    image_manifest = "".join(
        f' <item id="{img["id"]}" href="{attr(img["href"])}" media-type="{attr(img["media_type"])}"/>\n'
        for img in images
    )
    opf = f'''<?xml version="1.0" encoding="utf-8"?>
<package xmlns="http://www.idpf.org/2007/opf" xmlns:dc="http://purl.org/dc/elements/1.1/" unique-identifier="BookId" version="3.0">
<metadata>
<dc:identifier id="BookId">{book_id}</dc:identifier>
<dc:title id="title">{escape(etitle)}</dc:title>
<meta refines="#title" property="title-type">main</meta>
<meta name="calibre:title_sort" content="{attr(etitle)}"/>
<dc:language>en</dc:language>
<dc:creator id="creator">{escape(domain)}</dc:creator>
<meta refines="#creator" property="role" scheme="marc:relators">aut</meta>
<dc:publisher>Wallabag</dc:publisher>
<dc:source>{escape(url)}</dc:source>
<meta property="dcterms:modified">{now}</meta>
</metadata>
<manifest>
<item id="nav" href="nav.xhtml" media-type="application/xhtml+xml" properties="nav"/>
<item id="css" href="style.css" media-type="text/css"/>
<item id="article" href="article.xhtml" media-type="application/xhtml+xml"/>
{image_manifest} </manifest>
<spine><itemref idref="article"/></spine>
</package>'''
    nav = f'''<?xml version="1.0" encoding="utf-8"?>
<html xmlns="http://www.w3.org/1999/xhtml" xmlns:epub="http://www.idpf.org/2007/ops">
<head><title>{escape(etitle)}</title></head>
<body><nav epub:type="toc"><h1>{escape(etitle)}</h1><ol><li><a href="article.xhtml">Article</a></li></ol></nav></body></html>'''
    css = """body{font-family:serif;line-height:1.45;margin:0;padding:1em;} article{max-width:42em;} h1{line-height:1.15;} img,video{max-width:100%;height:auto;} figure{margin:1em 0;} figcaption,.source{font-size:.85em;color:#666;} blockquote{border-left:3px solid #aaa;margin-left:.5em;padding-left:1em;color:#333;} pre,code{font-family:monospace;white-space:pre-wrap;} table{border-collapse:collapse;max-width:100%;} td,th{border:1px solid #ccc;padding:.25em;}"""

    with zipfile.ZipFile(out_path, "w") as z:
        # The mimetype entry is written first and explicitly uncompressed.
        z.writestr("mimetype", "application/epub+zip", compress_type=zipfile.ZIP_STORED)
        z.writestr("META-INF/container.xml", container)
        z.writestr("OEBPS/content.opf", opf)
        z.writestr("OEBPS/nav.xhtml", nav)
        z.writestr("OEBPS/style.css", css)
        z.writestr("OEBPS/article.xhtml", chapter)
        for img in images:
            z.writestr("OEBPS/" + img["href"], img["data"])
    print(f"Embedded {len(images)} image(s) in {out_path}")
    return out_path
|
||||||
|
|
||||||
|
|
||||||
|
def article_key(entry) -> str:
    """Return the key identifying *entry* in the download database.

    Uses ``id:<id>`` when the entry has an id, otherwise ``url:<url>`` when
    it has a URL, otherwise ``title:<display title>``.
    """
    entry_id = entry.get("id")
    if entry_id is not None:
        return f"id:{entry_id}"

    url = entry.get("url")
    if url:
        return "url:" + url

    return "title:" + display_title(entry.get("title") or "")
|
||||||
|
|
||||||
|
|
||||||
|
def load_downloaded(path: Path) -> dict:
    """Load the download database from *path*.

    Returns an empty dictionary when the file does not exist, cannot be read
    or parsed, or does not contain a JSON object.
    """
    if path.is_file():
        try:
            parsed = json.loads(path.read_text())
        except Exception:
            # An unreadable database is treated the same as a missing one.
            return {}
        if isinstance(parsed, dict):
            return parsed
    return {}
|
||||||
|
|
||||||
|
|
||||||
|
def save_downloaded(path: Path, data: dict):
    """Write *data* to *path* as indented, key-sorted JSON.

    Parent directories are created as needed. The text is first written to a
    sibling file carrying an extra ``.tmp`` suffix, which then replaces
    *path*.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(data, indent=2, ensure_ascii=False, sort_keys=True)
    scratch = path.with_suffix(f"{path.suffix}.tmp")
    scratch.write_text(serialized + "\n")
    scratch.replace(path)
|
||||||
|
|
||||||
|
|
||||||
|
def remember_downloaded(db: dict, entry, out: Path):
    """Record in *db* that *entry* was exported to *out*.

    The record is stored under the entry's article key and holds the entry
    id, display title, URL, output path and the local time of the call.
    """
    key = article_key(entry)
    record = dict(
        id=entry.get("id"),
        title=display_title(entry.get("title") or ""),
        url=entry.get("url"),
        epub=str(out),
        downloaded_at=datetime.now().isoformat(timespec="seconds"),
    )
    db[key] = record
|
||||||
|
|
||||||
|
|
||||||
|
def article_output_path(entry, out_dir: Path) -> Path:
    """Return the EPUB path for *entry* inside *out_dir*.

    The file name is the sanitized title followed by ``-<id>``. The current
    Unix time stands in for a missing or falsy id, and a ``wallabag-<id>``
    placeholder stands in for a missing title.
    """
    entry_id = entry.get("id")
    title = entry.get("title")
    if not title:
        placeholder_id = entry["id"] if "id" in entry else int(time.time())
        title = f"wallabag-{placeholder_id}"
    suffix = entry_id if entry_id else int(time.time())
    return out_dir / (f"{safe_name(title)}-{suffix}" + ".epub")
|
||||||
|
|
||||||
|
|
||||||
|
def main():
    """Command-line entry point: export Wallabag articles as EPUB files.

    Loads the configuration, obtains an OAuth token, fetches up to
    ``--limit`` entries, skips those already recorded in the download
    database (unless ``--redownload``), builds one EPUB per remaining entry
    and records it in the database after each build. With ``--archive`` the
    exported entries are then marked archived in Wallabag.
    """
    parser = argparse.ArgumentParser(description="Fetch Wallabag articles and build one EPUB per article")
    parser.add_argument("--config", default=str(DEFAULT_CONFIG))
    parser.add_argument("--limit", type=int, default=10)
    parser.add_argument("--all", action="store_true", help="include archived/read articles too")
    parser.add_argument("--starred", action="store_true", help="only starred articles")
    parser.add_argument("--title", default=None, help="title override only when exporting one article")
    parser.add_argument("--output", default=None, help="output directory, or .epub file if --limit 1")
    parser.add_argument("--archive", action="store_true", help="mark fetched articles archived after successful build")
    parser.add_argument("--db", default=str(DEFAULT_DB), help=f"download evidence DB, default: {DEFAULT_DB}")
    parser.add_argument("--redownload", action="store_true", help="ignore evidence DB and download articles again")
    args = parser.parse_args()

    cfg = load_config(Path(args.config).expanduser())
    required = ("WALLABAG_URL", "CLIENT_ID", "CLIENT_SECRET", "USERNAME", "PASSWORD")
    missing = [name for name in required if not cfg.get(name)]
    if missing:
        raise SystemExit("Missing in wallabag.conf: " + ", ".join(missing))

    token = wallabag_token(cfg)
    entries = fetch_entries(cfg, token, limit=args.limit, unread=not args.all, starred=args.starred)
    if not entries:
        raise SystemExit("No articles found.")

    db_path = Path(args.db).expanduser()
    downloaded = load_downloaded(db_path)
    fetched_count = len(entries)
    if not args.redownload:
        entries = [candidate for candidate in entries if article_key(candidate) not in downloaded]
    skipped = fetched_count - len(entries)
    if skipped:
        print(f"Skipped {skipped} already downloaded article(s). Use --redownload to fetch again.")
    if not entries:
        raise SystemExit("No new articles to download.")

    output_arg = Path(args.output).expanduser() if args.output else DEFAULT_OUT
    # --output names a file only when exactly one article is exported and the
    # path ends in .epub; in every other case it names a directory.
    single = len(entries) == 1
    single_file = single and output_arg.suffix.lower() == ".epub"
    out_dir = output_arg.parent if single_file else output_arg
    out_dir.mkdir(parents=True, exist_ok=True)

    created = []
    for position, entry in enumerate(entries, 1):
        if single_file:
            out = output_arg
        else:
            out = article_output_path(entry, out_dir)
        if single and args.title:
            title = args.title
        else:
            title = entry.get("title") or f"Wallabag article {position}"
        build_epub(entry, out, title)
        remember_downloaded(downloaded, entry, out)
        # Persist after every article so earlier exports stay recorded if a later one fails.
        save_downloaded(db_path, downloaded)
        created.append((entry, out))
        print(f"Created: {out}")

    if args.archive:
        for entry, _ in created:
            if entry.get("id") is not None:
                mark_archived(cfg, token, entry["id"])
        print(f"Archived {len(created)} articles in Wallabag.")
|
||||||
|
|
||||||
|
|
||||||
|
# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()
|
||||||
Loading…
Add table
Add a link
Reference in a new issue