1087 lines
44 KiB
Python
Executable file
1087 lines
44 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Home Assistant observer
|
|
|
|
Modes:
|
|
collect - run every 30 minutes; stores a compact JSON snapshot locally
|
|
analyze - run at 05:00; sends the last snapshots to AI and publishes a funny local web page
|
|
|
|
Configuration is via environment variables. See .env.example.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import html
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
from datetime import datetime, timedelta, timezone
|
|
from email.utils import format_datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
from zoneinfo import ZoneInfo
|
|
|
|
import requests
|
|
|
|
|
|
# --- Core configuration, read once from environment variables at import ---
# NOTE: the int(...) conversions raise ValueError at import time when a
# variable is set to a non-numeric value.

# Base URL of Home Assistant; trailing slashes are removed so API paths such
# as "/api/states" can be appended directly.
HA_URL = os.environ.get("HA_URL", "").rstrip("/")
# Token sent as a Bearer credential by ha_get().
HA_TOKEN = os.environ.get("HA_TOKEN", "")
# Directory holding the snapshot-*.json files.
DATA_DIR = Path(os.environ.get("DATA_DIR", "./data"))
# Directory scanned for earlier daily-ai-analysis-*.md reports.
REPORT_DIR = Path(os.environ.get("REPORT_DIR", "./reports"))
# Root of the generated web site (articles/ and images/ live below it).
WEB_DIR = Path(os.environ.get("WEB_DIR", "./web"))
# URL path prefix for site links; a blank value falls back to "/".
SITE_BASE_PATH = os.environ.get("SITE_BASE_PATH", "/").strip() or "/"
# Scheme and host for absolute URLs, stored without a trailing slash.
SITE_URL = os.environ.get("SITE_URL", "http://localhost").rstrip("/")
# Optional file whose content is appended to the LLM prompt.
PROMPT_FILE = Path(os.environ.get("PROMPT_FILE", "./llm_instructions.md"))
# Hours of history requested for each snapshot.
HISTORY_HOURS = int(os.environ.get("HISTORY_HOURS", "24"))
# Number of most recent history rows kept per entity.
MAX_HISTORY_PER_ENTITY = int(os.environ.get("MAX_HISTORY_PER_ENTITY", "20"))
# Calendar look-ahead window in days; values <= 0 disable calendar fetching.
CALENDAR_LOOKAHEAD_DAYS = int(os.environ.get("CALENDAR_LOOKAHEAD_DAYS", "7"))
# Events kept per calendar entity.
MAX_CALENDAR_EVENTS_PER_CALENDAR = int(os.environ.get("MAX_CALENDAR_EVENTS_PER_CALENDAR", "8"))
# Snapshot window, in hours, quoted in the daily bundle header.
ANALYZE_SNAPSHOT_HOURS = int(os.environ.get("ANALYZE_SNAPSHOT_HOURS", "24"))
# Days of previous articles quoted as context in the LLM prompt.
ARTICLE_CONTEXT_DAYS = int(os.environ.get("ARTICLE_CONTEXT_DAYS", "7"))
# Approximate character budget for the data bundle sent to the LLM.
MAX_ANALYZE_CHARS = int(os.environ.get("MAX_ANALYZE_CHARS", "80000"))
# IANA time zone name used for every human-readable timestamp.
DISPLAY_TIMEZONE = os.environ.get("DISPLAY_TIMEZONE", "Europe/Copenhagen")
# Snapshots whose file mtime is older than this many days are deleted.
KEEP_SNAPSHOT_DAYS = int(os.environ.get("KEEP_SNAPSHOT_DAYS", "14"))
|
|
|
|
# --- LLM backend configuration ---
# LLM_MODE: none | pi | ollama | openai
# The value is lower-cased, so the variable is case-insensitive.
LLM_MODE = os.environ.get("LLM_MODE", "none").lower()
# Base URL of the Ollama server, stored without a trailing slash.
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434").rstrip("/")
OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "llama3.1")
# Required by require_config(for_ai=True) when LLM_MODE is "openai".
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
OPENAI_MODEL = os.environ.get("OPENAI_MODEL", "gpt-4o-mini")
# Executable invoked by call_pi().
PI_BIN = os.environ.get("PI_BIN", "pi")
# Optional model name; "--model" is passed to pi only when this is non-empty.
PI_MODEL = os.environ.get("PI_MODEL", "")
# Seconds the pi subprocess may run before subprocess.run times out.
PI_TIMEOUT = int(os.environ.get("PI_TIMEOUT", "600"))
|
|
|
|
# Entity domains (the part of an entity_id before the first ".") that are
# collected. Comma-separated in the environment; blank items are dropped.
RELEVANT_DOMAINS = {
    domain.strip()
    for domain in os.environ.get(
        "RELEVANT_DOMAINS",
        "sensor,binary_sensor,person,device_tracker,climate,light,switch,lock,cover,alarm_control_panel,media_player,calendar,weather",
    ).split(",")
    if domain.strip()
}
# Exact entity_ids to ignore even when their domain is relevant.
EXCLUDED_ENTITIES = {
    entity.strip()
    for entity in os.environ.get("EXCLUDED_ENTITIES", "").split(",")
    if entity.strip()
}
|
|
|
|
# Attribute keys that compact_attributes() keeps; every other attribute of a
# Home Assistant state is dropped before the state is stored.
ALLOWED_ATTRIBUTES = {
    "friendly_name",
    "unit_of_measurement",
    "device_class",
    "state_class",
    "current_temperature",
    "temperature",
    "humidity",
    "battery_level",
    "brightness",
    "gps_accuracy",
    "source_type",
    "assumed_state",
}
|
|
|
|
# Keyword -> points table used by entity_importance(). Each keyword is matched
# as a plain substring of the lower-cased "entity_id friendly_name
# device_class" text, and the points of every matching keyword are added.
IMPORTANT_ENTITY_KEYWORDS = {
    "alarm": 100,
    "smoke": 100,
    "co_": 100,
    "carbon_monoxide": 100,
    "leak": 95,
    "water": 80,
    "door": 85,
    "window": 80,
    "lock": 85,
    "motion": 70,
    "presence": 70,
    "occupancy": 70,
    "person": 75,
    "device_tracker": 75,
    "phone": 70,
    "laptop": 60,
    "battery": 65,
    "humidity": 60,
    "temperature": 55,
    "climate": 55,
    "heating": 55,
    "dehumidifier": 70,
    "backup": 70,
    "internet": 65,
    "speedtest": 65,
    "router": 60,
    "light": 45,
    "switch": 35,
    "sonos": 45,
    "media": 40,
    "tv": 40,
    "megane": 50,
    "fjr": 50,
    "plant": 45,
    "smb_": 60,
}
|
|
|
|
|
|
class ConfigError(RuntimeError):
    """Raised by require_config() when a mandatory setting is missing."""
|
|
|
|
|
|
def require_config(for_ai: bool = False) -> None:
    """Validate that the mandatory settings are present.

    Args:
        for_ai: When true, also require the OpenAI key if LLM_MODE is
            "openai".

    Raises:
        ConfigError: If HA_URL or HA_TOKEN is empty, or the OpenAI key is
            missing while it is needed.
    """
    # Checked in this order so the first missing setting is the one reported.
    for name, value in (("HA_URL", HA_URL), ("HA_TOKEN", HA_TOKEN)):
        if not value:
            raise ConfigError(f"{name} is not set")
    openai_key_missing = LLM_MODE == "openai" and not OPENAI_API_KEY
    if for_ai and openai_key_missing:
        raise ConfigError("LLM_MODE=openai but OPENAI_API_KEY is not set")
|
|
|
|
|
|
def ha_get(path: str, params: dict[str, str] | None = None) -> Any:
    """Send an authenticated GET to Home Assistant and return the decoded JSON.

    Args:
        path: API path appended to HA_URL, for example "/api/states".
        params: Optional query-string parameters.

    Raises:
        requests.HTTPError: On an error status; the message carries up to the
            first 500 characters of the response body.
    """
    auth_headers = {
        "Authorization": f"Bearer {HA_TOKEN}",
        "Content-Type": "application/json",
    }
    url = f"{HA_URL}{path}"
    response = requests.get(url, headers=auth_headers, params=params, timeout=60)
    try:
        response.raise_for_status()
    except requests.HTTPError as exc:
        # Re-raise with a body excerpt so the reason for the failure is logged.
        body_excerpt = response.text.strip()[:500]
        raise requests.HTTPError(f"{exc}; response={body_excerpt}", response=response) from exc
    return response.json()
|
|
|
|
|
|
def is_relevant_entity(entity_id: str) -> bool:
    """Return True when the entity is not excluded and its domain is relevant."""
    if entity_id in EXCLUDED_ENTITIES:
        return False
    # The domain is everything before the first "."; an id without a dot is
    # treated as its own domain.
    domain = entity_id.partition(".")[0]
    return domain in RELEVANT_DOMAINS
|
|
|
|
|
|
def compact_attributes(attrs: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of ``attrs`` reduced to the keys in ALLOWED_ATTRIBUTES."""
    kept: dict[str, Any] = {}
    for key, value in attrs.items():
        if key in ALLOWED_ATTRIBUTES:
            kept[key] = value
    return kept
|
|
|
|
|
|
def get_states() -> list[dict[str, Any]]:
    """Fetch all current states and keep the relevant, available ones.

    Returns:
        Compact state dicts (entity_id, state, filtered attributes and the
        two timestamps), sorted by entity_id. Entities whose state is
        "unknown", "unavailable" or missing are left out.
    """
    unusable_states = {"unknown", "unavailable", None}
    kept = [
        {
            "entity_id": raw.get("entity_id", ""),
            "state": raw.get("state"),
            "attributes": compact_attributes(raw.get("attributes", {})),
            "last_changed": raw.get("last_changed"),
            "last_updated": raw.get("last_updated"),
        }
        for raw in ha_get("/api/states")
        if is_relevant_entity(raw.get("entity_id", "")) and raw.get("state") not in unusable_states
    ]
    kept.sort(key=lambda entry: entry["entity_id"])
    return kept
|
|
|
|
|
|
def clean_text(value: Any, max_len: int = 300) -> str:
    """Turn a possibly HTML-ish value into one trimmed line of plain text.

    Tags are replaced by spaces, HTML entities are decoded, runs of
    whitespace collapse to a single space, and the result is cut to
    ``max_len`` characters. Falsy input yields an empty string.
    """
    if not value:
        return ""
    without_tags = re.sub(r"<[^>]+>", " ", str(value))
    decoded = html.unescape(without_tags)
    single_spaced = re.sub(r"\s+", " ", decoded).strip()
    return single_spaced[:max_len]
|
|
|
|
|
|
def human_date_label(dt: datetime, include_time: bool) -> str:
    """Describe ``dt`` relative to today in DISPLAY_TIMEZONE.

    Produces "today", "tomorrow", "upcoming <weekday>" (2 to 7 days ahead),
    "last <weekday>" (1 to 7 days back) or just the weekday name otherwise.
    When ``include_time`` is true, " at HH:MM" is appended.
    """
    today = datetime.now(ZoneInfo(DISPLAY_TIMEZONE)).date()
    offset = (dt.date() - today).days
    weekday = dt.strftime("%A")
    if offset == 0:
        label = "today"
    elif offset == 1:
        label = "tomorrow"
    elif 2 <= offset <= 7:
        label = f"upcoming {weekday}"
    elif -7 <= offset <= -1:
        label = f"last {weekday}"
    else:
        label = weekday
    return f"{label} at {dt.strftime('%H:%M')}" if include_time else label
|
|
|
|
|
|
def event_time(value: dict[str, str] | None) -> str:
    """Format a calendar start/end mapping as a human-readable label.

    A "dateTime" entry is a timed event (a naive value is taken as UTC and
    converted to DISPLAY_TIMEZONE); a "date" entry is an all-day event and
    is labelled without a time. If parsing fails, the raw value is returned
    in the best available form. A missing or empty mapping yields "".
    """
    if not value:
        return ""
    if "dateTime" in value:
        try:
            moment = datetime.fromisoformat(value["dateTime"].replace("Z", "+00:00"))
            if moment.tzinfo is None:
                moment = moment.replace(tzinfo=timezone.utc)
            local_moment = moment.astimezone(ZoneInfo(DISPLAY_TIMEZONE))
            return human_date_label(local_moment, include_time=True)
        except Exception:
            return display_time(value.get("dateTime"))
    if "date" in value:
        try:
            all_day = datetime.fromisoformat(value["date"])
            all_day = all_day.replace(tzinfo=ZoneInfo(DISPLAY_TIMEZONE))
            return human_date_label(all_day, include_time=False)
        except Exception:
            return value.get("date", "")
    return ""
|
|
|
|
|
|
def get_calendar_events(calendar_entity_ids: list[str]) -> list[dict[str, Any]]:
    """Fetch upcoming events for each calendar entity.

    The window runs from now (UTC) to CALENDAR_LOOKAHEAD_DAYS ahead. A
    calendar whose request fails is reported on stderr and skipped;
    calendars with no events are left out of the result.

    Returns:
        A list of {"entity_id", "events"} dicts with at most
        MAX_CALENDAR_EVENTS_PER_CALENDAR compact events each.
    """
    if CALENDAR_LOOKAHEAD_DAYS <= 0 or not calendar_entity_ids:
        return []
    window_start = datetime.now(timezone.utc)
    window_end = window_start + timedelta(days=CALENDAR_LOOKAHEAD_DAYS)
    result: list[dict[str, Any]] = []
    for calendar_id in calendar_entity_ids:
        try:
            raw_events = ha_get(
                f"/api/calendars/{calendar_id}",
                params={"start": window_start.isoformat(), "end": window_end.isoformat()},
            )
        except Exception as exc:
            print(f"Skipping calendar events for {calendar_id}: {exc}", file=sys.stderr)
            continue
        events = [
            {
                "summary": clean_text(raw.get("summary"), 160),
                "start": event_time(raw.get("start")),
                "end": event_time(raw.get("end")),
                "location": clean_text(raw.get("location"), 180),
                "description": clean_text(raw.get("description"), 260),
            }
            for raw in raw_events[:MAX_CALENDAR_EVENTS_PER_CALENDAR]
        ]
        if events:
            result.append({"entity_id": calendar_id, "events": events})
    return result
|
|
|
|
|
|
def get_history(hours: int, entity_ids: list[str]) -> list[dict[str, Any]]:
    """Return recent state changes for the given entities.

    Args:
        hours: How far back, from now in UTC, the history period starts.
        entity_ids: Entities to query.

    Returns:
        One {"entity_id", "recent_states"} dict per relevant entity that
        showed more than one distinct usable state, sorted by entity_id.
    """
    since = datetime.now(timezone.utc) - timedelta(hours=hours)
    endpoint = f"/api/history/period/{since.isoformat(timespec='seconds')}"
    unusable_states = {"unknown", "unavailable", None}
    changed: list[dict[str, Any]] = []

    # Recent Home Assistant versions/configurations require filter_entity_id
    # for the history endpoint. Query in batches to avoid an overlong URL.
    batch_size = 50
    for offset in range(0, len(entity_ids), batch_size):
        batch = entity_ids[offset : offset + batch_size]
        payload = ha_get(
            endpoint,
            params={"filter_entity_id": ",".join(batch), "minimal_response": ""},
        )
        for rows in payload:
            if not rows:
                continue
            entity_id = rows[0].get("entity_id", "")
            if not is_relevant_entity(entity_id):
                continue
            recent = [
                {"state": row.get("state"), "last_changed": row.get("last_changed")}
                for row in rows[-MAX_HISTORY_PER_ENTITY:]
                if row.get("state") not in unusable_states
            ]
            # Only entities that actually moved between states are interesting.
            if len({entry["state"] for entry in recent}) > 1:
                changed.append({"entity_id": entity_id, "recent_states": recent})

    changed.sort(key=lambda entry: entry["entity_id"])
    return changed
|
|
|
|
|
|
def make_snapshot() -> dict[str, Any]:
    """Collect current states, recent history and calendar events.

    Returns:
        A JSON-serialisable dict with the keys "generated_at",
        "history_hours", "calendar_lookahead_days", "states", "history" and
        "calendar_events".
    """
    states = get_states()
    entity_ids = [state["entity_id"] for state in states]
    calendar_entity_ids = [entity_id for entity_id in entity_ids if entity_id.startswith("calendar.")]
    # Store an offset-aware local timestamp. display_time() treats a naive
    # value as UTC, so a naive local time would be shown shifted by the
    # host's UTC offset.
    generated_at = datetime.now().astimezone().isoformat(timespec="seconds")
    return {
        "generated_at": generated_at,
        "history_hours": HISTORY_HOURS,
        "calendar_lookahead_days": CALENDAR_LOOKAHEAD_DAYS,
        "states": states,
        "history": get_history(HISTORY_HOURS, entity_ids),
        "calendar_events": get_calendar_events(calendar_entity_ids),
    }
|
|
|
|
|
|
def save_snapshot(snapshot: dict[str, Any]) -> Path:
    """Write ``snapshot`` as indented UTF-8 JSON into DATA_DIR.

    The directory is created when missing. The file is named
    ``snapshot-<local timestamp>.json``.

    Returns:
        The path of the file that was written.
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    target = DATA_DIR / f"snapshot-{timestamp}.json"
    payload = json.dumps(snapshot, indent=2, ensure_ascii=False)
    target.write_text(payload, encoding="utf-8")
    return target
|
|
|
|
|
|
def cleanup_old_snapshots() -> None:
    """Delete snapshot files whose mtime is older than KEEP_SNAPSHOT_DAYS."""
    oldest_allowed = datetime.now() - timedelta(days=KEEP_SNAPSHOT_DAYS)
    for snapshot_file in DATA_DIR.glob("snapshot-*.json"):
        modified = datetime.fromtimestamp(snapshot_file.stat().st_mtime)
        if modified >= oldest_allowed:
            continue
        snapshot_file.unlink(missing_ok=True)
|
|
|
|
|
|
def load_recent_snapshots(hours: int) -> list[dict[str, Any]]:
    """Load snapshots modified within the last ``hours`` hours.

    Files are visited in sorted name order. A file that cannot be read or
    parsed is reported on stderr and skipped.
    """
    oldest_allowed = datetime.now() - timedelta(hours=hours)
    loaded = []
    for snapshot_file in sorted(DATA_DIR.glob("snapshot-*.json")):
        modified = datetime.fromtimestamp(snapshot_file.stat().st_mtime)
        if modified < oldest_allowed:
            continue
        try:
            raw_json = snapshot_file.read_text(encoding="utf-8")
            loaded.append(json.loads(raw_json))
        except Exception as exc:
            print(f"Skipping unreadable snapshot {snapshot_file}: {exc}", file=sys.stderr)
    return loaded
|
|
|
|
|
|
def display_time(value: str | None) -> str:
    """Render an ISO-8601 timestamp in DISPLAY_TIMEZONE.

    A trailing "Z" is accepted, and a timestamp without an offset is
    interpreted as UTC. Empty input yields "". Input that cannot be
    converted is returned unchanged.
    """
    if not value:
        return ""
    try:
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
        aware = parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc)
        return aware.astimezone(ZoneInfo(DISPLAY_TIMEZONE)).strftime("%Y-%m-%d %H:%M:%S %Z")
    except Exception:
        return value
|
|
|
|
|
|
def entity_importance(entity_id: str, attrs: dict[str, Any] | None = None) -> int:
    """Score how important an entity is when the LLM input must be limited.

    The score is the sum of a per-domain base value, the points of every
    IMPORTANT_ENTITY_KEYWORDS entry found in the entity id / friendly name /
    device class, a location adjustment, and a small bonus for an "active"
    state.

    Args:
        entity_id: Full entity id such as "binary_sensor.front_door".
        attrs: Optional attribute mapping. The active-state bonus is applied
            only when this mapping contains a "state" key.
    """
    attrs = attrs or {}
    domain_base = {
        "alarm_control_panel": 100,
        "lock": 90,
        "person": 80,
        "device_tracker": 75,
        "binary_sensor": 60,
        "climate": 55,
        "cover": 50,
        "sensor": 45,
        "light": 35,
        "switch": 30,
        "media_player": 25,
    }
    haystack = f"{entity_id} {attrs.get('friendly_name', '')} {attrs.get('device_class', '')}".lower()

    total = domain_base.get(entity_id.split(".", 1)[0], 10)
    total += sum(points for keyword, points in IMPORTANT_ENTITY_KEYWORDS.items() if keyword in haystack)

    # Sønderborg/Denmark home is the primary residence and absolute priority.
    # Samobor/Croatia entities use the smb_ prefix and are still included, but
    # they should lose ties when the LLM input has to be size-limited.
    total += -40 if "smb_" in entity_id.lower() else 120

    active_states = {"on", "open", "unlocked", "detected", "home"}
    if str(attrs.get("state", "")).lower() in active_states:
        total += 15
    return total
|
|
|
|
|
|
def summarize_snapshot(snapshot: dict[str, Any]) -> str:
    """Render one snapshot as plain text for the LLM, most important first.

    The text has three sections: current states, upcoming calendar events
    and recently changed entities. States and history entries are ordered
    by descending importance, then by entity_id.
    """
    lines = [
        f"Snapshot: {display_time(snapshot.get('generated_at'))}",
        "Priority current states first; lower-priority entities follow only if the LLM size limit allows.",
        "Current states:",
    ]

    def state_score(state: dict[str, Any]) -> int:
        # entity_importance() looks for the state under attrs["state"], but the
        # stored attributes are filtered by compact_attributes() and never
        # contain it. Pass the state along so the active-state bonus applies.
        attrs = state.get("attributes", {})
        return entity_importance(state.get("entity_id", ""), {**attrs, "state": state.get("state")})

    # Score each entity once and reuse the value for sorting and display.
    ranked_states = sorted(
        ((state_score(state), state) for state in snapshot.get("states", [])),
        key=lambda pair: (-pair[0], pair[1].get("entity_id", "")),
    )
    for score, state in ranked_states:
        attrs = state.get("attributes", {})
        name = attrs.get("friendly_name", state.get("entity_id"))
        unit = attrs.get("unit_of_measurement", "")
        value = f"{state.get('state')} {unit}".strip()
        lines.append(f"- importance={score} {name} ({state.get('entity_id')}): {value}; last_changed={display_time(state.get('last_changed'))}")

    lines.append("Upcoming calendar events:")
    for calendar in snapshot.get("calendar_events", []):
        lines.append(f"- {calendar.get('entity_id')}:")
        for event in calendar.get("events", []):
            details = []
            if event.get("location"):
                details.append(f"location={event.get('location')}")
            if event.get("description"):
                details.append(f"description={event.get('description')}")
            detail_text = f"; {'; '.join(details)}" if details else ""
            lines.append(f"  - {event.get('start')} to {event.get('end')}: {event.get('summary')}{detail_text}")

    lines.append("Recently changed entities:")
    ranked_history = sorted(
        ((entity_importance(item.get("entity_id", "")), item) for item in snapshot.get("history", [])),
        key=lambda pair: (-pair[0], pair[1].get("entity_id", "")),
    )
    for score, item in ranked_history:
        # Only the eight most recent transitions are listed per entity.
        transitions = ", ".join(f"{x.get('state')} @ {display_time(x.get('last_changed'))}" for x in item.get("recent_states", [])[-8:])
        lines.append(f"- importance={score} {item.get('entity_id')}: {transitions}")
    return "\n".join(lines)
|
|
|
|
|
|
def build_daily_summary(snapshots: list[dict[str, Any]]) -> str:
    """Assemble the data bundle for the LLM from the newest snapshots.

    Snapshots are added newest first until the next one would push the
    bundle past MAX_ANALYZE_CHARS. At least one snapshot is always included;
    if it alone exceeds the limit it is cut and marked as truncated.
    """
    generated = datetime.now(ZoneInfo(DISPLAY_TIMEZONE)).isoformat(timespec='seconds')
    header = [
        f"Daily Home Assistant bundle generated {generated}",
        f"Contains {len(snapshots)} snapshots from roughly the last {ANALYZE_SNAPSHOT_HOURS} hours.",
        f"Input capped at roughly {MAX_ANALYZE_CHARS} characters for the LLM.",
        f"All times in this bundle are converted to {DISPLAY_TIMEZONE} local time.",
    ]
    used_chars = len("\n".join(header))
    blocks: list[str] = []
    for snapshot in reversed(snapshots):
        block = "\n---\n" + summarize_snapshot(snapshot)
        if blocks and used_chars + len(block) > MAX_ANALYZE_CHARS:
            break
        if len(block) > MAX_ANALYZE_CHARS:
            block = block[:MAX_ANALYZE_CHARS] + "\n[Snapshot truncated for LLM size limit]"
        blocks.append(block)
        used_chars += len(block)
    # The count is only known after the loop, so it is inserted as line three.
    header.insert(2, f"Included {len(blocks)} most recent snapshots after size limiting.")
    return "\n".join(header + blocks)
|
|
|
|
|
|
def read_extra_llm_instructions() -> str:
    """Return the stripped content of PROMPT_FILE, or "" when it does not exist."""
    return PROMPT_FILE.read_text(encoding="utf-8").strip() if PROMPT_FILE.exists() else ""
|
|
|
|
|
|
def load_recent_article_context(days: int) -> str:
    """Collect the conclusions of recent reports as context for the LLM.

    Reports in REPORT_DIR modified within the last ``days`` days are read in
    sorted name order. Only the text before the "## Data bundle" heading is
    kept, capped at 8000 characters per report, and at most the last seven
    reports are returned. Unreadable files are reported on stderr and
    skipped.
    """
    if days <= 0 or not REPORT_DIR.exists():
        return ""
    oldest_allowed = datetime.now() - timedelta(days=days)
    excerpts: list[str] = []
    for report in sorted(REPORT_DIR.glob("daily-ai-analysis-*.md")):
        modified = datetime.fromtimestamp(report.stat().st_mtime)
        if modified < oldest_allowed:
            continue
        try:
            content = report.read_text(encoding="utf-8")
        except Exception as exc:
            print(f"Skipping unreadable previous report {report}: {exc}", file=sys.stderr)
            continue
        conclusions, _, _ = content.partition("\n## Data bundle\n")
        excerpts.append(f"PREVIOUS ARTICLE {report.name}:\n{conclusions.strip()[:8000]}")
    return "\n\n---\n\n".join(excerpts[-7:])
|
|
|
|
|
|
def analysis_prompt(input_summary: str, previous_articles: str = "") -> str:
    """Build the full LLM prompt for today's article.

    Args:
        input_summary: Text bundle with today's data; placed under the
            "TODAY'S DATA:" heading.
        previous_articles: Optional text of earlier articles, added as a
            context section when non-empty.

    Returns:
        The prompt text. The owner-instructions section is included only
        when read_extra_llm_instructions() returns something.
    """
    extra_instructions = read_extra_llm_instructions()
    # Both optional sections default to "" so they disappear from the
    # template when there is nothing to add.
    extra_block = ""
    if extra_instructions:
        extra_block = f"""

ADDITIONAL OWNER INSTRUCTIONS FROM {PROMPT_FILE}:
{extra_instructions}
"""
    previous_block = ""
    if previous_articles:
        previous_block = f"""

PREVIOUS ARTICLES FROM THE LAST {ARTICLE_CONTEXT_DAYS} DAYS FOR CONTEXT:
Use these only for trend/context awareness. Do not claim something happened today unless today's data supports it.
{previous_articles}
"""

    return f"""You are writing today's Home Assistant smart-home blog article for the owner.

Write a funny but useful morning briefing in a clean blog/article style. Use light humor,
but keep emojis/smileys rare: at most one in the whole article. Prefer clear headings,
short paragraphs, and readable bullet lists. Remain factual and privacy-aware. Include:
- A short comedy headline for the day
- What seemed to happen at home today
- Behavioral patterns that can reasonably be inferred
- Notable trends compared with recent previous articles, if supported
- What a nosy raccoon/hacker could figure out about the resident
- Anomalies, risks, or privacy/security concerns
- Suggested Home Assistant automations or fixes

Distinguish strong evidence from guesses. Do not invent facts not supported by the data.
{extra_block}{previous_block}
TODAY'S DATA:
{input_summary}
"""
|
|
|
|
|
|
def call_ollama(prompt: str) -> str:
    """Send ``prompt`` to the Ollama generate endpoint and return the reply.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    request_body = {"model": OLLAMA_MODEL, "prompt": prompt, "stream": False}
    response = requests.post(f"{OLLAMA_URL}/api/generate", json=request_body, timeout=300)
    response.raise_for_status()
    reply = response.json().get("response", "")
    return reply.strip()
|
|
|
|
|
|
def call_openai(prompt: str) -> str:
    """Send ``prompt`` to the OpenAI chat completions API and return the reply.

    Raises:
        requests.HTTPError: If the API answers with an error status.
    """
    request_headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
        "Content-Type": "application/json",
    }
    messages = [
        {"role": "system", "content": "You are a careful but funny smart-home analyst."},
        {"role": "user", "content": prompt},
    ]
    request_body = {"model": OPENAI_MODEL, "messages": messages, "temperature": 0.35}
    response = requests.post(
        "https://api.openai.com/v1/chat/completions",
        headers=request_headers,
        json=request_body,
        timeout=300,
    )
    response.raise_for_status()
    first_choice = response.json()["choices"][0]
    return first_choice["message"]["content"].strip()
|
|
|
|
|
|
def call_pi(prompt: str) -> str:
    """Run the pi CLI on ``prompt`` and return its standard output.

    Raises:
        RuntimeError: If pi exits with a non-zero status (the message holds
            the last 1000 characters of stderr) or prints nothing.
    """
    # Avoid piping the prompt on stdin here. In pi print mode, piped stdin can be
    # treated as the primary output/input stream in surprising ways. Passing the
    # prompt as an @file gives reliable non-interactive cron behavior.
    with tempfile.NamedTemporaryFile("w", encoding="utf-8", suffix=".md", delete=False) as handle:
        handle.write(prompt)
        prompt_path = handle.name
    try:
        model_args = ["--model", PI_MODEL] if PI_MODEL else []
        command = [PI_BIN, "--no-tools", *model_args, "-p", f"@{prompt_path}"]
        completed = subprocess.run(
            command,
            text=True,
            capture_output=True,
            timeout=PI_TIMEOUT,
            check=False,
        )
    finally:
        # The temp file is created with delete=False, so remove it explicitly.
        Path(prompt_path).unlink(missing_ok=True)
    if completed.returncode != 0:
        error_tail = completed.stderr.strip()[-1000:]
        raise RuntimeError(f"pi exited with status {completed.returncode}: {error_tail}")
    analysis = completed.stdout.strip()
    if not analysis:
        raise RuntimeError("pi returned an empty analysis")
    return analysis
|
|
|
|
|
|
def get_llm_conclusions(input_summary: str, previous_articles: str = "") -> str:
    """Produce the article text using the backend selected by LLM_MODE.

    With LLM_MODE "none" a fixed notice is returned and no prompt is built.
    An unrecognised mode yields an explanatory message instead of an
    exception.
    """
    if LLM_MODE == "none":
        return "AI analysis disabled. Set LLM_MODE=pi, LLM_MODE=ollama, or LLM_MODE=openai in .env. The raccoon analyst is asleep. 🦝💤"
    prompt = analysis_prompt(input_summary, previous_articles)
    backends = {
        "ollama": call_ollama,
        "openai": call_openai,
        "pi": call_pi,
    }
    backend = backends.get(LLM_MODE)
    if backend is None:
        return f"Unknown LLM_MODE={LLM_MODE!r}. Use none, pi, ollama, or openai."
    return backend(prompt)
|
|
|
|
|
|
def remove_most_emoji(text: str) -> str:
    """Strip characters in the common emoji and symbol ranges from ``text``."""
    # Keep the writing readable on the blog page even if the model gets a bit too festive.
    emoji_runs = re.compile(r"[\U0001F300-\U0001FAFF\U00002700-\U000027BF\U00002600-\U000026FF]+")
    return emoji_runs.sub("", text)
|
|
|
|
|
|
def inline_markdown(text: str) -> str:
    """Convert inline Markdown (bold and code spans) to escaped HTML.

    Emoji are removed and the text is HTML-escaped first, so only the
    <strong> and <code> tags added here appear as markup.
    """
    escaped = html.escape(remove_most_emoji(text).strip())
    with_bold = re.sub(r"\*\*(.*?)\*\*", r"<strong>\1</strong>", escaped)
    return re.sub(r"`([^`]+)`", r"<code>\1</code>", with_bold)
|
|
|
|
|
|
def move_bottom_line_before_serious(blocks: list[str]) -> list[str]:
    """Move the "bottom line"/"conclusion" section ahead of the serious part.

    The serious part starts at the first h2/h3 heading containing "part ii"
    or "serious briefing". The first later heading containing "bottom line"
    or "conclusion", with its blocks up to the next h2/h3 heading, is
    relocated to just before it. If either heading is missing, ``blocks``
    is returned unchanged.
    """
    titled_heading = re.compile(r"<h([23])>(.*?)</h\1>$", flags=re.DOTALL)
    any_heading = re.compile(r"<h[23]>.*?</h[23]>$", flags=re.DOTALL)

    serious_at = None
    bottom_at = None
    for index, block in enumerate(blocks):
        found = titled_heading.match(block)
        if found is None:
            continue
        plain_title = re.sub(r"<[^>]+>", "", html.unescape(found.group(2))).lower()
        if serious_at is None:
            if "part ii" in plain_title or "serious briefing" in plain_title:
                serious_at = index
            continue
        # Only a bottom line that comes after the serious heading is moved.
        if "bottom line" in plain_title or "conclusion" in plain_title:
            bottom_at = index
            break

    if serious_at is None or bottom_at is None:
        return blocks

    following = range(bottom_at + 1, len(blocks))
    bottom_stop = next((i for i in following if any_heading.match(blocks[i])), len(blocks))

    moved = blocks[bottom_at:bottom_stop]
    rest = blocks[:bottom_at] + blocks[bottom_stop:]
    return rest[:serious_at] + moved + rest[serious_at:]
|
|
|
|
|
|
def collapse_serious_sections(blocks: list[str]) -> list[str]:
    """Wrap the sub-sections of the serious briefing in <details> elements.

    Collapsing starts at an h2/h3 heading containing "part ii" or "serious
    briefing", or at any other h2/h3 heading after a "bottom line" /
    "conclusion" heading. From then on each h2/h3 heading becomes the
    <summary> of a <details class="briefing-section"> holding the blocks up
    to the next heading. Bottom-line headings and the serious heading itself
    are emitted unchanged, as are blocks outside a collapsed section.
    """
    titled_heading = re.compile(r"<h([23])>(.*?)</h\1>$", flags=re.DOTALL)
    rendered: list[str] = []
    collapsing = False
    seen_bottom_line = False
    open_title = ""
    open_body: list[str] = []

    def emit_open_section() -> None:
        # Flush the section being collected, if there is one.
        nonlocal open_title, open_body
        if not open_title:
            return
        body = "\n".join(open_body).strip()
        rendered.append(f'<details class="briefing-section"><summary>{open_title}</summary>\n{body}\n</details>')
        open_title = ""
        open_body = []

    for block in blocks:
        found = titled_heading.match(block)
        if found is not None:
            title_html = found.group(2)
            plain_title = re.sub(r"<[^>]+>", "", html.unescape(title_html)).lower()
            if "bottom line" in plain_title or "conclusion" in plain_title:
                emit_open_section()
                collapsing = False
                seen_bottom_line = True
                rendered.append(block)
                continue
            starts_serious = "part ii" in plain_title or "serious briefing" in plain_title
            if starts_serious and not collapsing:
                collapsing = True
                rendered.append(block)
                continue
            if collapsing or seen_bottom_line:
                collapsing = True
                emit_open_section()
                open_title = title_html
                continue
            # Any other heading is treated like an ordinary block below.
        if collapsing and open_title:
            open_body.append(block)
        else:
            rendered.append(block)

    emit_open_section()
    return rendered
|
|
|
|
|
|
def markdownish_to_html(text: str) -> str:
    """Convert a small Markdown subset to HTML blocks joined by newlines.

    Supported: headings of one to three "#", "-"/"*" bullet lists, and
    paragraphs separated by blank lines. Inline formatting is handled by
    inline_markdown(). The blocks are then rearranged by
    move_bottom_line_before_serious() and collapse_serious_sections().
    """
    blocks: list[str] = []
    pending_text: list[str] = []
    pending_items: list[str] = []

    def end_paragraph() -> None:
        nonlocal pending_text
        if not pending_text:
            return
        blocks.append(f"<p>{inline_markdown(' '.join(pending_text))}</p>")
        pending_text = []

    def end_list() -> None:
        nonlocal pending_items
        if not pending_items:
            return
        items_html = "".join(f"<li>{item}</li>" for item in pending_items)
        blocks.append("<ul>" + items_html + "</ul>")
        pending_items = []

    for source_line in text.splitlines():
        stripped = source_line.strip()
        if not stripped:
            # A blank line ends whatever block is open.
            end_paragraph()
            end_list()
            continue

        heading_match = re.match(r"^(#{1,3})\s+(.+)$", stripped)
        if heading_match:
            end_paragraph()
            end_list()
            depth = min(len(heading_match.group(1)), 3)
            blocks.append(f"<h{depth}>{inline_markdown(heading_match.group(2))}</h{depth}>")
            continue

        bullet_match = re.match(r"^[-*]\s+(.+)$", stripped)
        if bullet_match:
            end_paragraph()
            pending_items.append(inline_markdown(bullet_match.group(1)))
            continue

        # Ordinary text: it continues the current paragraph and ends any list.
        end_list()
        pending_text.append(stripped)

    end_paragraph()
    end_list()
    reordered = move_bottom_line_before_serious(blocks)
    return "\n".join(collapse_serious_sections(reordered))
|
|
|
|
|
|
BLOG_CSS = """
|
|
:root { color-scheme: dark; --cyan:#00f5ff; --blue:#2777ff; --violet:#8b5cf6; --amber:#fbbf24; --panel:#07111fcc; --line:#1de7ff66; }
|
|
* { box-sizing:border-box; }
|
|
body {
|
|
margin:0; min-height:100vh; color:#dff9ff; line-height:1.7;
|
|
font-family:Inter,ui-sans-serif,system-ui,-apple-system,BlinkMacSystemFont,'Segoe UI',sans-serif;
|
|
background:
|
|
radial-gradient(circle at 16% 10%, #1746ff55 0 12rem, transparent 28rem),
|
|
radial-gradient(circle at 82% 4%, #00f5ff30 0 10rem, transparent 24rem),
|
|
radial-gradient(circle at 50% 100%, #6d28d955 0 15rem, transparent 34rem),
|
|
linear-gradient(135deg,#02040a 0%,#07111f 48%,#030712 100%);
|
|
overflow-x:hidden;
|
|
}
|
|
body::before {
|
|
content:""; position:fixed; inset:0; pointer-events:none; opacity:.34;
|
|
background-image:
|
|
linear-gradient(#00f5ff16 1px, transparent 1px),
|
|
linear-gradient(90deg,#00f5ff16 1px, transparent 1px),
|
|
linear-gradient(115deg, transparent 0 48%, #7dd3fc22 50%, transparent 52% 100%);
|
|
background-size:54px 54px,54px 54px,180px 180px;
|
|
mask-image:linear-gradient(to bottom,#000 0%,#000 55%,transparent 100%);
|
|
}
|
|
header { position:relative; border-bottom:1px solid var(--line); background:linear-gradient(90deg,#020617dd,#051b33bb,#020617dd); box-shadow:0 0 42px #00d9ff22; }
|
|
header::before, header::after { content:""; position:absolute; top:0; bottom:0; width:18vw; border-color:var(--cyan); opacity:.65; pointer-events:none; }
|
|
header::before { left:0; border-top:2px solid; border-left:2px solid; clip-path:polygon(0 0,100% 0,35% 100%,0 100%); }
|
|
header::after { right:0; border-top:2px solid; border-right:2px solid; clip-path:polygon(0 0,100% 0,100% 100%,65% 100%); }
|
|
.wrap { max-width:1180px; margin:0 auto; padding:1.5rem; position:relative; }
|
|
.masthead { padding:3rem 1.5rem 2.6rem; text-align:center; }
|
|
.kicker { color:var(--cyan); text-transform:uppercase; letter-spacing:.28em; font:800 .78rem ui-monospace,SFMono-Regular,Menlo,monospace; text-shadow:0 0 14px #00f5ff; }
|
|
h1 { margin:.35rem 0; font-size:clamp(2.4rem,7vw,6rem); line-height:.9; text-transform:uppercase; letter-spacing:.05em; color:#f8feff; text-shadow:0 0 12px #00f5ff,0 0 38px #2777ff; }
|
|
h2,h3 { color:#c8fbff; line-height:1.2; letter-spacing:.03em; text-shadow:0 0 12px #00f5ff88; }
|
|
article, aside {
|
|
position:relative; background:linear-gradient(180deg,#071827d9,#050914e6); border:1px solid var(--line);
|
|
clip-path:polygon(0 18px,18px 0,100% 0,100% calc(100% - 18px),calc(100% - 18px) 100%,0 100%);
|
|
box-shadow:0 0 0 1px #2777ff22 inset,0 0 34px #00d9ff18,0 24px 60px #000b;
|
|
}
|
|
article::before, aside::before { content:""; position:absolute; inset:0; pointer-events:none; border:1px solid #ffffff12; clip-path:inherit; }
|
|
article { padding:clamp(1.2rem,3vw,2.4rem); }
|
|
article p { margin:0 0 1.05rem; max-width:72ch; }
|
|
article ul { margin:.2rem 0 1.2rem; padding-left:1.35rem; max-width:74ch; }
|
|
article li { margin:.35rem 0; }
|
|
article p, article li { font-size:1.04rem; color:#e6fbff; }
|
|
article h1 { font-size:clamp(1.8rem,4vw,3.5rem); text-align:left; }
|
|
.title-image { display:block; width:100%; height:auto; margin:0 0 1.4rem; border:1px solid #22d3ee66; box-shadow:0 0 28px #00d9ff22; }
|
|
article h2 { margin-top:1.8rem; padding-top:1rem; border-top:1px solid #22d3ee33; }
|
|
article h1 + p, article h2 + p, article h3 + p { margin-top:.3rem; }
|
|
strong { color:#ffffff; font-weight:750; }
|
|
code { color:#fef3c7; background:#020617; border:1px solid #22d3ee33; padding:.08rem .28rem; }
|
|
.layout { display:grid; grid-template-columns:minmax(0,1fr) 310px; gap:1.35rem; align-items:start; }
|
|
aside { padding:1.1rem; position:sticky; top:1rem; }
|
|
.archive { list-style:none; margin:0; padding:0; }
|
|
.archive li { border-bottom:1px solid #22d3ee33; padding:.7rem 0; font-family:ui-monospace,SFMono-Regular,Menlo,monospace; }
|
|
.archive li::before { content:"▸ "; color:var(--cyan); text-shadow:0 0 10px var(--cyan); }
|
|
.archive li:last-child { border-bottom:0; }
|
|
a { color:#67e8f9; text-decoration:none; text-shadow:0 0 9px #00f5ff77; }
|
|
a:hover { color:white; text-decoration:none; filter:drop-shadow(0 0 8px var(--cyan)); }
|
|
.meta { color:#9eeaff; font:.95rem ui-monospace,SFMono-Regular,Menlo,monospace; letter-spacing:.04em; }
|
|
details { margin-top:1.5rem; border-top:1px solid #22d3ee33; padding-top:1rem; }
|
|
details.briefing-section { background:#02061788; border:1px solid #22d3ee33; padding:.75rem 1rem; margin:.8rem 0; }
|
|
details.briefing-section summary { font-size:1.05rem; }
|
|
summary { cursor:pointer; color:var(--amber); text-transform:uppercase; letter-spacing:.08em; }
|
|
pre { white-space:pre-wrap; background:#01040acc; color:#bff8ff; padding:1rem; border:1px solid #22d3ee44; border-radius:0; overflow:auto; font-size:.82rem; box-shadow:0 0 22px #00d9ff11 inset; }
|
|
footer { color:#7dd3fc; text-align:center; padding:2rem; font:.82rem ui-monospace,SFMono-Regular,Menlo,monospace; text-transform:uppercase; letter-spacing:.12em; }
|
|
@media (max-width:850px) { .layout { grid-template-columns:1fr; } aside { position:static; } .masthead { text-align:left; } }
|
|
"""
|
|
|
|
|
|
def site_href(relative_path: str = "") -> str:
    """Return a site-absolute path for ``relative_path`` under SITE_BASE_PATH.

    The base is normalised to start and end with "/", and leading slashes of
    ``relative_path`` are dropped so the two join with exactly one slash.
    """
    prefix = SITE_BASE_PATH if SITE_BASE_PATH.startswith("/") else f"/{SITE_BASE_PATH}"
    if not prefix.endswith("/"):
        prefix += "/"
    return prefix + relative_path.lstrip("/")
|
|
|
|
|
|
def site_url(relative_path: str = "") -> str:
    """Return the absolute URL: SITE_URL followed by site_href(relative_path)."""
    path_part = site_href(relative_path)
    return SITE_URL + path_part
|
|
|
|
|
|
def article_links() -> str:
    """Build the <li> archive links for all generated articles, newest first.

    Article files named YYYY-MM-DD.html get a label such as
    "Monday, June 3, 2024"; any other file name is shown as its stem.

    Returns:
        Newline-joined <li> elements, or a single placeholder item when
        there are no articles.
    """
    placeholder = "<li>No articles yet. The raccoon newsroom is warming up.</li>"
    articles_dir = WEB_DIR / "articles"
    if not articles_dir.exists():
        return placeholder
    links = []
    for path in sorted(articles_dir.glob("*.html"), reverse=True):
        label = path.stem
        try:
            published = datetime.strptime(path.stem, "%Y-%m-%d")
        except ValueError:
            pass  # not a date-named article; keep the stem as the label
        else:
            # Build the day number by hand: the "%-d" directive is a
            # platform-specific extension and fails where it is unsupported.
            label = f"{published.strftime('%A, %B')} {published.day}, {published.year}"
        href = site_href(f"articles/{path.name}")
        links.append(f'<li><a href="{html.escape(href)}">{html.escape(label)}</a></li>')
    return "\n".join(links) or placeholder
|
|
|
|
|
|
def svg_text_lines(text: str, max_chars: int = 28, max_lines: int = 3) -> list[str]:
    """Word-wrap ``text`` into at most ``max_lines`` lines for the title image.

    Words are packed greedily into lines of at most ``max_chars``
    characters; a single word longer than that gets a line of its own. If
    the text needs more than ``max_lines`` lines, the surplus is dropped and
    the last kept line ends with an ellipsis (after trailing punctuation is
    stripped).

    Args:
        text: Title text; any whitespace separates words.
        max_chars: Target maximum line length in characters.
        max_lines: Maximum number of lines; values below 1 are treated as 1.

    Returns:
        The wrapped lines, or ["Smart Home Briefing"] when ``text`` has no
        words.
    """
    wrapped: list[str] = []
    current = ""
    for word in text.split():
        candidate = f"{current} {word}".strip()
        # An empty current line always accepts the word, even if it is too long.
        if not current or len(candidate) <= max_chars:
            current = candidate
        else:
            wrapped.append(current)
            current = word
    if current:
        wrapped.append(current)

    # Wrap everything first and truncate afterwards, so the last visible line
    # is filled like the others and the ellipsis appears only when text was
    # really cut off.
    limit = max(1, max_lines)
    if len(wrapped) > limit:
        wrapped = wrapped[:limit]
        wrapped[-1] = wrapped[-1].rstrip(".,;: ") + "…"
    return wrapped or ["Smart Home Briefing"]
|
|
|
|
|
|
def write_title_image(article_name: str, title: str, generated_at: str) -> Path:
    """Render the article's SVG title card into ``WEB_DIR / "images"``.

    Args:
        article_name: Article file name; ``.html`` is replaced by ``.svg`` to
            form the image file name.
        title: Headline. It is passed through ``remove_most_emoji`` and wrapped
            by ``svg_text_lines`` for the visible text; the unmodified title
            (escaped) becomes the ``aria-label``.
        generated_at: Text printed after the site name on the footer line.

    Returns:
        Path of the SVG file that was written.
    """
    target_dir = WEB_DIR / "images"
    target_dir.mkdir(parents=True, exist_ok=True)
    svg_name = article_name.replace(".html", ".svg")
    wrapped = svg_text_lines(remove_most_emoji(title))
    # One <text> element per wrapped line: first baseline at y=220, 72px apart.
    title_rows = [
        f'<text x="80" y="{220 + row * 72}" class="title">{html.escape(piece)}</text>'
        for row, piece in enumerate(wrapped)
    ]
    text_spans = "\n".join(title_rows)
    # Doubled braces in the <style> section are literal CSS braces.
    svg = f"""<svg xmlns="http://www.w3.org/2000/svg" width="1200" height="630" viewBox="0 0 1200 630" role="img" aria-label="{html.escape(title)}">
<defs>
<radialGradient id="g1" cx="20%" cy="15%" r="65%"><stop offset="0" stop-color="#1d4ed8"/><stop offset="0.45" stop-color="#07111f"/><stop offset="1" stop-color="#020617"/></radialGradient>
<linearGradient id="line" x1="0" x2="1"><stop stop-color="#00f5ff"/><stop offset="1" stop-color="#8b5cf6"/></linearGradient>
<filter id="glow"><feGaussianBlur stdDeviation="4" result="b"/><feMerge><feMergeNode in="b"/><feMergeNode in="SourceGraphic"/></feMerge></filter>
<style>
.kicker {{ font: 700 28px ui-monospace, SFMono-Regular, Menlo, monospace; fill: #67e8f9; letter-spacing: 5px; }}
.title {{ font: 800 58px system-ui, -apple-system, Segoe UI, sans-serif; fill: #f8feff; filter: url(#glow); }}
.meta {{ font: 500 24px ui-monospace, SFMono-Regular, Menlo, monospace; fill: #bfdbfe; }}
</style>
</defs>
<rect width="1200" height="630" fill="url(#g1)"/>
<path d="M0 105h1200M0 210h1200M0 315h1200M0 420h1200M0 525h1200M120 0v630M360 0v630M600 0v630M840 0v630M1080 0v630" stroke="#22d3ee" stroke-opacity=".12"/>
<path d="M40 40h360M40 40v120M1160 590H800M1160 590V470" stroke="url(#line)" stroke-width="4" fill="none"/>
<circle cx="940" cy="175" r="96" fill="none" stroke="#00f5ff" stroke-opacity=".75" stroke-width="3"/>
<circle cx="940" cy="175" r="54" fill="none" stroke="#fbbf24" stroke-opacity=".85" stroke-width="3"/>
<path d="M812 175h256M940 47v256" stroke="#00f5ff" stroke-opacity=".45" stroke-width="2"/>
<text x="80" y="105" class="kicker">HOME TELEMETRY DISPATCH</text>
{text_spans}
<text x="80" y="550" class="meta">Smart Home Gossip Gazette · {html.escape(generated_at)}</text>
</svg>
"""
    destination = target_dir / svg_name
    destination.write_text(svg, encoding="utf-8")
    return destination
|
|
|
|
|
|
def write_favicon() -> Path:
    """Write the static SVG favicon to ``WEB_DIR / "favicon.svg"`` and return its path."""
    # The markup has no substitutions, so a plain string literal is enough.
    icon_svg = """<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 64 64">
<defs>
<radialGradient id="g" cx="50%" cy="45%" r="70%">
<stop offset="0" stop-color="#67e8f9"/>
<stop offset="0.45" stop-color="#2777ff"/>
<stop offset="1" stop-color="#020617"/>
</radialGradient>
<filter id="glow"><feGaussianBlur stdDeviation="1.8" result="b"/><feMerge><feMergeNode in="b"/><feMergeNode in="SourceGraphic"/></feMerge></filter>
</defs>
<rect width="64" height="64" rx="12" fill="#020617"/>
<path d="M8 32h48M32 8v48M14 18l36 28M50 18L14 46" stroke="#00f5ff" stroke-width="1.3" opacity=".45"/>
<circle cx="32" cy="32" r="18" fill="url(#g)" stroke="#9effff" stroke-width="2" filter="url(#glow)"/>
<circle cx="25" cy="28" r="3" fill="#020617"/>
<circle cx="39" cy="28" r="3" fill="#020617"/>
<path d="M23 39c6 4 12 4 18 0" stroke="#020617" stroke-width="3" fill="none" stroke-linecap="round"/>
<path d="M7 32c10-16 40-16 50 0-10 16-40 16-50 0Z" fill="none" stroke="#fbbf24" stroke-width="2" opacity=".9"/>
</svg>
"""
    destination = WEB_DIR / "favicon.svg"
    destination.write_text(icon_svg, encoding="utf-8")
    return destination
|
|
|
|
|
|
def clean_rss_text(article_html: str) -> tuple[str, str]:
    """Extract a plain-text title and body from rendered article HTML.

    Only the contents of the first ``<article>`` element are considered (the
    whole input when there is none). ``<details>`` blocks and the
    "Permanent link" paragraph are removed before extraction.

    Args:
        article_html: HTML of an article page or fragment.

    Returns:
        ``(title, text)``. ``title`` is the text of the first ``<h1>`` or
        ``<h2>``, or ``"Smart Home Briefing"`` when there is none or it is
        empty. ``text`` is the content with tags stripped, entities decoded,
        markdown punctuation removed and blank-line runs collapsed.
    """
    flags = re.DOTALL | re.IGNORECASE
    article_match = re.search(r"<article[^>]*>(.*?)</article>", article_html, flags=flags)
    content = article_match.group(1) if article_match else article_html
    content = re.sub(r"<details.*?</details>", " ", content, flags=flags)
    content = re.sub(r"<p><a [^>]*>Permanent link.*?</a></p>", " ", content, flags=flags)

    title = "Smart Home Briefing"
    title_match = re.search(r"<h1[^>]*>(.*?)</h1>|<h2[^>]*>(.*?)</h2>", content, flags=flags)
    if title_match:
        title = re.sub(r"<[^>]+>", " ", title_match.group(1) or title_match.group(2) or "")
        title = re.sub(r"\s+", " ", html.unescape(title)).strip() or "Smart Home Briefing"

    # Turn line breaks and block ends into newlines BEFORE stripping tags.
    # <br> is matched in any case and in its self-closing forms (<br/>, <br />);
    # otherwise those breaks would be flattened to a space by the tag strip.
    text = re.sub(r"<br\s*/?>\s*", "\n", content, flags=re.IGNORECASE)
    text = re.sub(r"</(p|li|h1|h2|h3)>", "\n", text, flags=re.IGNORECASE)
    text = re.sub(r"<[^>]+>", " ", text)
    text = html.unescape(text)
    text = re.sub(r"[`*_#]", "", text)  # leftover markdown punctuation
    text = re.sub(r"^[\s\-•]+", "", text, flags=re.MULTILINE)  # leading bullets/indent
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r"\n\s*\n+", "\n\n", text).strip()
    return title, text
|
|
|
|
|
|
def write_rss_feed() -> Path:
    """Regenerate ``WEB_DIR / "rss.xml"`` from the published article pages.

    The first 20 ``*.html`` files in ``WEB_DIR / "articles"`` (file names sorted
    descending) each become one ``<item>``:

    * title: the headline extracted by ``clean_rss_text``, or a date-based
      fallback when that is the generic ``"Smart Home Briefing"``;
    * description: the first 600 characters of the extracted text;
    * pubDate: the file's modification time, in UTC;
    * enclosure: the matching SVG in ``WEB_DIR / "images"``, if it exists.

    Returns:
        Path of the feed file that was written.
    """
    articles_dir = WEB_DIR / "articles"
    items = []
    for path in sorted(articles_dir.glob("*.html"), reverse=True)[:20]:
        try:
            published = datetime.strptime(path.stem, "%Y-%m-%d")
        except ValueError:
            fallback_title = f"Smart Home Briefing - {path.stem}"
        else:
            # The day number is formatted by hand: the "%-d" strftime flag is
            # platform-specific and not available everywhere.
            fallback_title = f"Smart Home Briefing - {published:%A, %B} {published.day}, {published:%Y}"
        content = path.read_text(encoding="utf-8", errors="ignore")
        article_title, article_text = clean_rss_text(content)
        title = article_title if article_title != "Smart Home Briefing" else fallback_title
        description = article_text[:600]
        pub_dt = datetime.fromtimestamp(path.stat().st_mtime, timezone.utc)
        url = site_url(f"articles/{path.name}")
        image_path = WEB_DIR / "images" / path.name.replace(".html", ".svg")
        enclosure = ""
        if image_path.exists():
            image_url = site_url(f"images/{image_path.name}")
            enclosure = f'\n <enclosure url="{html.escape(image_url)}" type="image/svg+xml" length="{image_path.stat().st_size}" />'
        items.append(f"""
<item>
<title>{html.escape(title)}</title>
<link>{html.escape(url)}</link>
<guid isPermaLink="true">{html.escape(url)}</guid>
<pubDate>{format_datetime(pub_dt, usegmt=True)}</pubDate>
<description>{html.escape(description)}</description>{enclosure}
</item>""")
    now = format_datetime(datetime.now(timezone.utc), usegmt=True)
    feed_url = site_url("rss.xml")
    feed = f"""<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom">
<channel>
<title>Smart Home Gossip Gazette</title>
<link>{html.escape(site_url())}</link>
<atom:link href="{html.escape(feed_url)}" rel="self" type="application/rss+xml" />
<description>Daily Home Assistant smart-home briefings.</description>
<language>en</language>
<lastBuildDate>{now}</lastBuildDate>
{''.join(items)}
</channel>
</rss>
"""
    path = WEB_DIR / "rss.xml"
    path.write_text(feed, encoding="utf-8")
    return path
|
|
|
|
|
|
def blog_shell(title: str, subtitle: str, main_content: str, archive_links: str, image_href: str = "") -> str:
    """Wrap page content in the full HTML document used for every page.

    Args:
        title: Page title; escaped and used for both ``<title>`` and the ``<h1>``.
        subtitle: Escaped line shown beneath the heading.
        main_content: HTML inserted verbatim into the main ``<section>``.
        archive_links: ``<li>`` markup inserted verbatim into the archive list.
        image_href: Optional image path; when non-empty an ``og:image`` meta tag
            with its absolute URL is emitted.

    Returns:
        The complete HTML document as a string.
    """
    if image_href:
        og_image_url = site_url(image_href.lstrip("/"))
        image_meta = f'<meta property="og:image" content="{html.escape(og_image_url)}">\n'
    else:
        image_meta = ""
    # Escape every interpolated value once, up front, so the template stays readable.
    safe_title = html.escape(title)
    safe_subtitle = html.escape(subtitle)
    canonical = html.escape(site_url())
    feed_absolute = html.escape(site_url('rss.xml'))
    feed_relative = html.escape(site_href('rss.xml'))
    icon_href = html.escape(site_href('favicon.svg'))
    return f"""<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>{safe_title}</title>
<link rel="canonical" href="{canonical}">
{image_meta}<link rel="alternate" type="application/rss+xml" title="Smart Home Gossip Gazette RSS" href="{feed_absolute}">
<link rel="icon" href="{icon_href}" type="image/svg+xml">
<style>{BLOG_CSS}</style>
</head>
<body>
<header>
<div class="wrap masthead">
<div class="kicker">◇ orbital home telemetry // raccoon intelligence unit ◇</div>
<h1>{safe_title}</h1>
<p class="meta">{safe_subtitle}</p>
</div>
</header>
<main class="wrap layout">
<section>{main_content}</section>
<aside>
<h2>Transmission archive</h2>
<p class="meta"><a href="{feed_relative}">RSS feed</a></p>
<ul class="archive">{archive_links}</ul>
</aside>
</main>
<footer>Generated by Home Assistant Observer · Local nginx uplink active</footer>
</body>
</html>
"""
|
|
|
|
|
|
def publish_webpage(conclusions: str, raw_summary: str) -> Path:
    """Publish today's article, then refresh the index page, favicon and RSS feed.

    Args:
        conclusions: Article text; converted to HTML by ``markdownish_to_html``.
        raw_summary: Data bundle; its first 60000 characters are escaped and
            embedded in a collapsible ``<details>`` block on the article page.

    Returns:
        Path of the article page, ``WEB_DIR / "articles" / "<YYYY-MM-DD>.html"``.
    """
    WEB_DIR.mkdir(parents=True, exist_ok=True)
    articles_dir = WEB_DIR / "articles"
    articles_dir.mkdir(parents=True, exist_ok=True)

    stamp = datetime.now()
    now = stamp.strftime("%Y-%m-%d %H:%M")
    article_name = stamp.strftime("%Y-%m-%d") + ".html"

    body = markdownish_to_html(conclusions)
    raw = html.escape(raw_summary[:60000])
    # Only the title is needed here; the extracted plain text is discarded.
    article_title = clean_rss_text(f"<article>{body}</article>")[0]

    title_image_path = write_title_image(article_name, article_title, now)
    image_rel = f"images/{title_image_path.name}"
    title_image_href = site_href(image_rel)
    title_image_html = f'<img class="title-image" src="{html.escape(title_image_href)}" alt="{html.escape(article_title)}">'

    article_content = f"""
<article id="article" class="article post h-entry" itemscope itemtype="https://schema.org/Article">
{title_image_html}
<div class="entry-content post-content e-content" itemprop="articleBody">
{body}
</div>
</article>
<details>
<summary>Raw data bundle shown to the AI goblin</summary>
<pre>{raw}</pre>
</details>
"""
    article_path = articles_dir / article_name
    # Create the file before rendering so article_links() already lists
    # today's article in the archive sidebar.
    article_path.touch(exist_ok=True)
    article_page = blog_shell(
        "Smart Home Gossip Gazette",
        f"Daily home intelligence briefing · Generated {now}",
        article_content,
        article_links(),
        image_href=image_rel,
    )
    article_path.write_text(article_page, encoding="utf-8")

    permalink = site_href(f'articles/{article_name}')
    featured = f"""
<article id="article" class="article post h-entry" itemscope itemtype="https://schema.org/Article">
<p class="meta">Latest article · {html.escape(now)}</p>
{title_image_html}
<div class="entry-content post-content e-content" itemprop="articleBody">
{body}
</div>
<p><a href="{html.escape(permalink)}">Permanent link for this article →</a></p>
</article>
"""
    index_page = blog_shell(
        "Smart Home Gossip Gazette",
        "A daily blog of your Home Assistant household signals",
        featured,
        article_links(),
        image_href=image_rel,
    )
    (WEB_DIR / "index.html").write_text(index_page, encoding="utf-8")

    write_favicon()
    write_rss_feed()
    return article_path
|
|
|
|
|
|
def write_markdown_report(summary: str, conclusions: str) -> Path:
    """Write a timestamped markdown report into ``REPORT_DIR`` and return its path.

    The report holds ``conclusions`` under the main heading, followed by
    ``summary`` inside a fenced ``text`` block under "Data bundle".
    """
    REPORT_DIR.mkdir(parents=True, exist_ok=True)
    timestamp = f"{datetime.now():%Y-%m-%d_%H-%M-%S}"
    report_path = REPORT_DIR / f"daily-ai-analysis-{timestamp}.md"
    document = f"# Daily Home Assistant AI Analysis\n\n{conclusions}\n\n## Data bundle\n\n```text\n{summary}\n```\n"
    report_path.write_text(document, encoding="utf-8")
    return report_path
|
|
|
|
|
|
def cmd_collect() -> int:
    """Run ``collect`` mode: take a snapshot, save it, prune old ones. Returns 0."""
    require_config(for_ai=False)
    saved_to = save_snapshot(make_snapshot())
    cleanup_old_snapshots()
    print(f"Collected snapshot: {saved_to}")
    return 0
|
|
|
|
|
|
def cmd_analyze() -> int:
    """Run ``analyze`` mode: summarise recent snapshots, ask the LLM, publish results.

    Returns:
        0 on success.

    Raises:
        RuntimeError: If no snapshots are found for the last
            ``ANALYZE_SNAPSHOT_HOURS`` hours.
    """
    require_config(for_ai=True)
    recent = load_recent_snapshots(ANALYZE_SNAPSHOT_HOURS)
    if not recent:
        raise RuntimeError(f"No snapshots found in {DATA_DIR}; run collect first")
    summary = build_daily_summary(recent)
    earlier_articles = load_recent_article_context(ARTICLE_CONTEXT_DAYS)
    conclusions = get_llm_conclusions(summary, earlier_articles)
    report_path = write_markdown_report(summary, conclusions)
    page_path = publish_webpage(conclusions, summary)
    print(f"Wrote report: {report_path}")
    print(f"Published webpage: {page_path}")
    return 0
|
|
|
|
|
|
def main() -> int:
    """Parse the command line and run the selected mode.

    Returns:
        The mode's exit code, or 1 after printing the error to stderr when the
        mode raises.
    """
    parser = argparse.ArgumentParser(description="Home Assistant observer")
    parser.add_argument(
        "mode",
        nargs="?",
        default="collect",
        choices=["collect", "analyze"],
        help="collect snapshots or analyze/publish them",
    )
    mode = parser.parse_args().mode
    try:
        if mode == "collect":
            return cmd_collect()
        return cmd_analyze()
    except Exception as exc:
        # Top-level boundary: report the failure and exit with a non-zero status.
        print(f"ERROR: {exc}", file=sys.stderr)
        return 1
|
|
|
|
|
|
# Script entry point: exit with whatever status main() returns.
if __name__ == "__main__":
    sys.exit(main())
|