#!/usr/bin/env python3 """ Home Assistant observer Modes: collect - run every 30 minutes; stores a compact JSON snapshot locally analyze - run at 05:00; sends the last snapshots to AI and publishes a funny local web page Configuration is via environment variables. See .env.example. """ from __future__ import annotations import argparse import html import json import os import re import subprocess import sys import tempfile from datetime import datetime, timedelta, timezone from email.utils import format_datetime from pathlib import Path from typing import Any from zoneinfo import ZoneInfo import requests HA_URL = os.environ.get("HA_URL", "").rstrip("/") HA_TOKEN = os.environ.get("HA_TOKEN", "") DATA_DIR = Path(os.environ.get("DATA_DIR", "./data")) REPORT_DIR = Path(os.environ.get("REPORT_DIR", "./reports")) WEB_DIR = Path(os.environ.get("WEB_DIR", "./web")) SITE_BASE_PATH = os.environ.get("SITE_BASE_PATH", "/").strip() or "/" SITE_URL = os.environ.get("SITE_URL", "http://localhost").rstrip("/") PROMPT_FILE = Path(os.environ.get("PROMPT_FILE", "./llm_instructions.md")) HISTORY_HOURS = int(os.environ.get("HISTORY_HOURS", "24")) MAX_HISTORY_PER_ENTITY = int(os.environ.get("MAX_HISTORY_PER_ENTITY", "20")) ANALYZE_SNAPSHOT_HOURS = int(os.environ.get("ANALYZE_SNAPSHOT_HOURS", "24")) ARTICLE_CONTEXT_DAYS = int(os.environ.get("ARTICLE_CONTEXT_DAYS", "7")) MAX_ANALYZE_CHARS = int(os.environ.get("MAX_ANALYZE_CHARS", "80000")) DISPLAY_TIMEZONE = os.environ.get("DISPLAY_TIMEZONE", "Europe/Copenhagen") KEEP_SNAPSHOT_DAYS = int(os.environ.get("KEEP_SNAPSHOT_DAYS", "14")) # LLM_MODE: none | pi | ollama | openai LLM_MODE = os.environ.get("LLM_MODE", "none").lower() OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434").rstrip("/") OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "llama3.1") OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "") OPENAI_MODEL = os.environ.get("OPENAI_MODEL", "gpt-4o-mini") PI_BIN = os.environ.get("PI_BIN", "pi") PI_MODEL = os.environ.get("PI_MODEL", "") PI_TIMEOUT = int(os.environ.get("PI_TIMEOUT", "600")) RELEVANT_DOMAINS = set( x.strip() for x in os.environ.get( "RELEVANT_DOMAINS", "sensor,binary_sensor,person,device_tracker,climate,light,switch,lock,cover,alarm_control_panel,media_player,calendar,weather", ).split(",") if x.strip() ) EXCLUDED_ENTITIES = set(x.strip() for x in os.environ.get("EXCLUDED_ENTITIES", "").split(",") if x.strip()) ALLOWED_ATTRIBUTES = { "friendly_name", "unit_of_measurement", "device_class", "state_class", "current_temperature", "temperature", "humidity", "battery_level", "brightness", "gps_accuracy", "source_type", "assumed_state", } IMPORTANT_ENTITY_KEYWORDS = { "alarm": 100, "smoke": 100, "co_": 100, "carbon_monoxide": 100, "leak": 95, "water": 80, "door": 85, "window": 80, "lock": 85, "motion": 70, "presence": 70, "occupancy": 70, "person": 75, "device_tracker": 75, "phone": 70, "laptop": 60, "battery": 65, "humidity": 60, "temperature": 55, "climate": 55, "heating": 55, "dehumidifier": 70, "backup": 70, "internet": 65, "speedtest": 65, "router": 60, "light": 45, "switch": 35, "sonos": 45, "media": 40, "tv": 40, "megane": 50, "fjr": 50, "plant": 45, "smb_": 60, } class ConfigError(RuntimeError): pass def require_config(for_ai: bool = False) -> None: if not HA_URL: raise ConfigError("HA_URL is not set") if not HA_TOKEN: raise ConfigError("HA_TOKEN is not set") if for_ai and LLM_MODE == "openai" and not OPENAI_API_KEY: raise ConfigError("LLM_MODE=openai but OPENAI_API_KEY is not set") def ha_get(path: str, params: dict[str, str] | None = None) -> Any: headers = {"Authorization": f"Bearer {HA_TOKEN}", "Content-Type": "application/json"} response = requests.get(f"{HA_URL}{path}", headers=headers, params=params, timeout=60) try: response.raise_for_status() except requests.HTTPError as exc: detail = response.text.strip() raise requests.HTTPError(f"{exc}; response={detail[:500]}", response=response) from exc return response.json() def is_relevant_entity(entity_id: str) -> bool: return entity_id not in EXCLUDED_ENTITIES and entity_id.split(".", 1)[0] in RELEVANT_DOMAINS def compact_attributes(attrs: dict[str, Any]) -> dict[str, Any]: return {k: v for k, v in attrs.items() if k in ALLOWED_ATTRIBUTES} def get_states() -> list[dict[str, Any]]: useful: list[dict[str, Any]] = [] for item in ha_get("/api/states"): entity_id = item.get("entity_id", "") state = item.get("state") if not is_relevant_entity(entity_id) or state in {"unknown", "unavailable", None}: continue useful.append( { "entity_id": entity_id, "state": state, "attributes": compact_attributes(item.get("attributes", {})), "last_changed": item.get("last_changed"), "last_updated": item.get("last_updated"), } ) return sorted(useful, key=lambda x: x["entity_id"]) def get_history(hours: int, entity_ids: list[str]) -> list[dict[str, Any]]: start = datetime.now(timezone.utc) - timedelta(hours=hours) changes: list[dict[str, Any]] = [] # Recent Home Assistant versions/configurations require filter_entity_id for # the history endpoint. Query in chunks to avoid an overlong URL. chunk_size = 50 for i in range(0, len(entity_ids), chunk_size): chunk = entity_ids[i : i + chunk_size] data = ha_get( f"/api/history/period/{start.isoformat(timespec='seconds')}", params={"filter_entity_id": ",".join(chunk), "minimal_response": ""}, ) for entity_history in data: if not entity_history: continue entity_id = entity_history[0].get("entity_id", "") if not is_relevant_entity(entity_id): continue compact = [] for item in entity_history[-MAX_HISTORY_PER_ENTITY:]: state = item.get("state") if state in {"unknown", "unavailable", None}: continue compact.append({"state": state, "last_changed": item.get("last_changed")}) if len(set(x["state"] for x in compact)) > 1: changes.append({"entity_id": entity_id, "recent_states": compact}) return sorted(changes, key=lambda x: x["entity_id"]) def make_snapshot() -> dict[str, Any]: states = get_states() entity_ids = [state["entity_id"] for state in states] return { "generated_at": datetime.now().isoformat(timespec="seconds"), "history_hours": HISTORY_HOURS, "states": states, "history": get_history(HISTORY_HOURS, entity_ids), } def save_snapshot(snapshot: dict[str, Any]) -> Path: DATA_DIR.mkdir(parents=True, exist_ok=True) stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") path = DATA_DIR / f"snapshot-{stamp}.json" path.write_text(json.dumps(snapshot, indent=2, ensure_ascii=False), encoding="utf-8") return path def cleanup_old_snapshots() -> None: cutoff = datetime.now() - timedelta(days=KEEP_SNAPSHOT_DAYS) for path in DATA_DIR.glob("snapshot-*.json"): if datetime.fromtimestamp(path.stat().st_mtime) < cutoff: path.unlink(missing_ok=True) def load_recent_snapshots(hours: int) -> list[dict[str, Any]]: cutoff = datetime.now() - timedelta(hours=hours) snapshots = [] for path in sorted(DATA_DIR.glob("snapshot-*.json")): if datetime.fromtimestamp(path.stat().st_mtime) < cutoff: continue try: snapshots.append(json.loads(path.read_text(encoding="utf-8"))) except Exception as exc: print(f"Skipping unreadable snapshot {path}: {exc}", file=sys.stderr) return snapshots def display_time(value: str | None) -> str: if not value: return "" try: dt = datetime.fromisoformat(value.replace("Z", "+00:00")) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) local = dt.astimezone(ZoneInfo(DISPLAY_TIMEZONE)) return local.strftime("%Y-%m-%d %H:%M:%S %Z") except Exception: return value def entity_importance(entity_id: str, attrs: dict[str, Any] | None = None) -> int: attrs = attrs or {} domain = entity_id.split(".", 1)[0] text = f"{entity_id} {attrs.get('friendly_name', '')} {attrs.get('device_class', '')}".lower() score = 0 domain_scores = { "alarm_control_panel": 100, "lock": 90, "person": 80, "device_tracker": 75, "binary_sensor": 60, "climate": 55, "cover": 50, "sensor": 45, "light": 35, "switch": 30, "media_player": 25, } score += domain_scores.get(domain, 10) for keyword, points in IMPORTANT_ENTITY_KEYWORDS.items(): if keyword in text: score += points # Sønderborg/Denmark home is the primary residence and absolute priority. # Samobor/Croatia entities use the smb_ prefix and are still included, but # they should lose ties when the LLM input has to be size-limited. if "smb_" in entity_id.lower(): score -= 40 else: score += 120 state = str(attrs.get("state", "")).lower() if state in {"on", "open", "unlocked", "detected", "home"}: score += 15 return score def summarize_snapshot(snapshot: dict[str, Any]) -> str: lines = [ f"Snapshot: {display_time(snapshot.get('generated_at'))}", "Priority current states first; lower-priority entities follow only if the LLM size limit allows.", "Current states:", ] states = sorted( snapshot.get("states", []), key=lambda state: (-entity_importance(state.get("entity_id", ""), state.get("attributes", {})), state.get("entity_id", "")), ) for state in states: attrs = state.get("attributes", {}) name = attrs.get("friendly_name", state.get("entity_id")) unit = attrs.get("unit_of_measurement", "") value = f"{state.get('state')} {unit}".strip() score = entity_importance(state.get("entity_id", ""), attrs) lines.append(f"- importance={score} {name} ({state.get('entity_id')}): {value}; last_changed={display_time(state.get('last_changed'))}") lines.append("Recently changed entities:") history = sorted( snapshot.get("history", []), key=lambda item: (-entity_importance(item.get("entity_id", "")), item.get("entity_id", "")), ) for item in history: transitions = ", ".join(f"{x.get('state')} @ {display_time(x.get('last_changed'))}" for x in item.get("recent_states", [])[-8:]) score = entity_importance(item.get("entity_id", "")) lines.append(f"- importance={score} {item.get('entity_id')}: {transitions}") return "\n".join(lines) def build_daily_summary(snapshots: list[dict[str, Any]]) -> str: parts = [ f"Daily Home Assistant bundle generated {datetime.now(ZoneInfo(DISPLAY_TIMEZONE)).isoformat(timespec='seconds')}", f"Contains {len(snapshots)} snapshots from roughly the last {ANALYZE_SNAPSHOT_HOURS} hours.", f"Input capped at roughly {MAX_ANALYZE_CHARS} characters for the LLM.", f"All times in this bundle are converted to {DISPLAY_TIMEZONE} local time.", ] total = len("\n".join(parts)) included = 0 for snapshot in reversed(snapshots): block = "\n---\n" + summarize_snapshot(snapshot) if total + len(block) > MAX_ANALYZE_CHARS and included > 0: break if len(block) > MAX_ANALYZE_CHARS: block = block[:MAX_ANALYZE_CHARS] + "\n[Snapshot truncated for LLM size limit]" parts.append(block) total += len(block) included += 1 parts.insert(2, f"Included {included} most recent snapshots after size limiting.") return "\n".join(parts) def read_extra_llm_instructions() -> str: if not PROMPT_FILE.exists(): return "" return PROMPT_FILE.read_text(encoding="utf-8").strip() def load_recent_article_context(days: int) -> str: if days <= 0 or not REPORT_DIR.exists(): return "" cutoff = datetime.now() - timedelta(days=days) articles: list[str] = [] for path in sorted(REPORT_DIR.glob("daily-ai-analysis-*.md")): if datetime.fromtimestamp(path.stat().st_mtime) < cutoff: continue try: text = path.read_text(encoding="utf-8") except Exception as exc: print(f"Skipping unreadable previous report {path}: {exc}", file=sys.stderr) continue conclusions = text.split("\n## Data bundle\n", 1)[0].strip() articles.append(f"PREVIOUS ARTICLE {path.name}:\n{conclusions[:8000]}") return "\n\n---\n\n".join(articles[-7:]) def analysis_prompt(input_summary: str, previous_articles: str = "") -> str: extra_instructions = read_extra_llm_instructions() extra_block = "" if extra_instructions: extra_block = f""" ADDITIONAL OWNER INSTRUCTIONS FROM {PROMPT_FILE}: {extra_instructions} """ previous_block = "" if previous_articles: previous_block = f""" PREVIOUS ARTICLES FROM THE LAST {ARTICLE_CONTEXT_DAYS} DAYS FOR CONTEXT: Use these only for trend/context awareness. Do not claim something happened today unless today's data supports it. {previous_articles} """ return f"""You are writing today's Home Assistant smart-home blog article for the owner. Write a funny but useful morning briefing in a clean blog/article style. Use light humor, but keep emojis/smileys rare: at most one in the whole article. Prefer clear headings, short paragraphs, and readable bullet lists. Remain factual and privacy-aware. Include: - A short comedy headline for the day - What seemed to happen at home today - Behavioral patterns that can reasonably be inferred - Notable trends compared with recent previous articles, if supported - What a nosy raccoon/hacker could figure out about the resident - Anomalies, risks, or privacy/security concerns - Suggested Home Assistant automations or fixes Distinguish strong evidence from guesses. Do not invent facts not supported by the data. {extra_block}{previous_block} TODAY'S DATA: {input_summary} """ def call_ollama(prompt: str) -> str: response = requests.post(f"{OLLAMA_URL}/api/generate", json={"model": OLLAMA_MODEL, "prompt": prompt, "stream": False}, timeout=300) response.raise_for_status() return response.json().get("response", "").strip() def call_openai(prompt: str) -> str: response = requests.post( "https://api.openai.com/v1/chat/completions", headers={"Authorization": f"Bearer {OPENAI_API_KEY}", "Content-Type": "application/json"}, json={ "model": OPENAI_MODEL, "messages": [ {"role": "system", "content": "You are a careful but funny smart-home analyst."}, {"role": "user", "content": prompt}, ], "temperature": 0.35, }, timeout=300, ) response.raise_for_status() return response.json()["choices"][0]["message"]["content"].strip() def call_pi(prompt: str) -> str: # Avoid piping the prompt on stdin here. In pi print mode, piped stdin can be # treated as the primary output/input stream in surprising ways. Passing the # prompt as an @file gives reliable non-interactive cron behavior. with tempfile.NamedTemporaryFile("w", encoding="utf-8", suffix=".md", delete=False) as tmp: tmp.write(prompt) prompt_path = tmp.name try: cmd = [PI_BIN, "--no-tools"] if PI_MODEL: cmd.extend(["--model", PI_MODEL]) cmd.extend(["-p", f"@{prompt_path}"]) result = subprocess.run( cmd, text=True, capture_output=True, timeout=PI_TIMEOUT, check=False, ) finally: Path(prompt_path).unlink(missing_ok=True) if result.returncode != 0: stderr = result.stderr.strip() raise RuntimeError(f"pi exited with status {result.returncode}: {stderr[-1000:]}") output = result.stdout.strip() if not output: raise RuntimeError("pi returned an empty analysis") return output def get_llm_conclusions(input_summary: str, previous_articles: str = "") -> str: if LLM_MODE == "none": return "AI analysis disabled. Set LLM_MODE=pi, LLM_MODE=ollama, or LLM_MODE=openai in .env. The raccoon analyst is asleep. 🦝💤" prompt = analysis_prompt(input_summary, previous_articles) if LLM_MODE == "ollama": return call_ollama(prompt) if LLM_MODE == "openai": return call_openai(prompt) if LLM_MODE == "pi": return call_pi(prompt) return f"Unknown LLM_MODE={LLM_MODE!r}. Use none, pi, ollama, or openai." def remove_most_emoji(text: str) -> str: # Keep the writing readable on the blog page even if the model gets a bit too festive. return re.sub(r"[\U0001F300-\U0001FAFF\U00002700-\U000027BF\U00002600-\U000026FF]+", "", text) def inline_markdown(text: str) -> str: safe = html.escape(remove_most_emoji(text).strip()) safe = re.sub(r"\*\*(.*?)\*\*", r"\1", safe) safe = re.sub(r"`([^`]+)`", r"\1", safe) return safe def move_bottom_line_before_serious(blocks: list[str]) -> list[str]: serious_start = None bottom_start = None bottom_end = None for i, block in enumerate(blocks): heading = re.match(r"(.*?)$", block, flags=re.DOTALL) if not heading: continue title = re.sub(r"<[^>]+>", "", html.unescape(heading.group(2))).lower() if serious_start is None and ("part ii" in title or "serious briefing" in title): serious_start = i elif serious_start is not None and ("bottom line" in title or "conclusion" in title): bottom_start = i break if serious_start is None or bottom_start is None: return blocks bottom_end = len(blocks) for i in range(bottom_start + 1, len(blocks)): if re.match(r".*?$", blocks[i], flags=re.DOTALL): bottom_end = i break bottom_section = blocks[bottom_start:bottom_end] remaining = blocks[:bottom_start] + blocks[bottom_end:] return remaining[:serious_start] + bottom_section + remaining[serious_start:] def collapse_serious_sections(blocks: list[str]) -> list[str]: output: list[str] = [] in_serious = False after_bottom_line = False current_summary = "" current_content: list[str] = [] def close_detail() -> None: nonlocal current_summary, current_content if current_summary: content = "\n".join(current_content).strip() output.append(f"
{current_summary}\n{content}\n
") current_summary = "" current_content = [] for block in blocks: heading = re.match(r"(.*?)$", block, flags=re.DOTALL) if heading: title = heading.group(2) plain_title = re.sub(r"<[^>]+>", "", html.unescape(title)).lower() is_bottom_line = "bottom line" in plain_title or "conclusion" in plain_title if is_bottom_line: close_detail() in_serious = False after_bottom_line = True output.append(block) continue if not in_serious and ("part ii" in plain_title or "serious briefing" in plain_title): in_serious = True output.append(block) continue if in_serious or after_bottom_line: in_serious = True close_detail() current_summary = title continue if in_serious: if current_summary: current_content.append(block) else: output.append(block) else: output.append(block) close_detail() return output def markdownish_to_html(text: str) -> str: blocks: list[str] = [] paragraph: list[str] = [] list_items: list[str] = [] def flush_paragraph() -> None: nonlocal paragraph if paragraph: blocks.append(f"

{inline_markdown(' '.join(paragraph))}

") paragraph = [] def flush_list() -> None: nonlocal list_items if list_items: blocks.append("") list_items = [] for raw_line in text.splitlines(): line = raw_line.strip() if not line: flush_paragraph() flush_list() continue heading = re.match(r"^(#{1,3})\s+(.+)$", line) if heading: flush_paragraph() flush_list() level = min(len(heading.group(1)), 3) blocks.append(f"{inline_markdown(heading.group(2))}") continue bullet = re.match(r"^[-*]\s+(.+)$", line) if bullet: flush_paragraph() list_items.append(inline_markdown(bullet.group(1))) continue flush_list() paragraph.append(line) flush_paragraph() flush_list() blocks = move_bottom_line_before_serious(blocks) return "\n".join(collapse_serious_sections(blocks)) BLOG_CSS = """ :root { color-scheme: dark; --cyan:#00f5ff; --blue:#2777ff; --violet:#8b5cf6; --amber:#fbbf24; --panel:#07111fcc; --line:#1de7ff66; } * { box-sizing:border-box; } body { margin:0; min-height:100vh; color:#dff9ff; line-height:1.7; font-family:Inter,ui-sans-serif,system-ui,-apple-system,BlinkMacSystemFont,'Segoe UI',sans-serif; background: radial-gradient(circle at 16% 10%, #1746ff55 0 12rem, transparent 28rem), radial-gradient(circle at 82% 4%, #00f5ff30 0 10rem, transparent 24rem), radial-gradient(circle at 50% 100%, #6d28d955 0 15rem, transparent 34rem), linear-gradient(135deg,#02040a 0%,#07111f 48%,#030712 100%); overflow-x:hidden; } body::before { content:""; position:fixed; inset:0; pointer-events:none; opacity:.34; background-image: linear-gradient(#00f5ff16 1px, transparent 1px), linear-gradient(90deg,#00f5ff16 1px, transparent 1px), linear-gradient(115deg, transparent 0 48%, #7dd3fc22 50%, transparent 52% 100%); background-size:54px 54px,54px 54px,180px 180px; mask-image:linear-gradient(to bottom,#000 0%,#000 55%,transparent 100%); } header { position:relative; border-bottom:1px solid var(--line); background:linear-gradient(90deg,#020617dd,#051b33bb,#020617dd); box-shadow:0 0 42px #00d9ff22; } header::before, header::after { content:""; position:absolute; top:0; bottom:0; width:18vw; border-color:var(--cyan); opacity:.65; pointer-events:none; } header::before { left:0; border-top:2px solid; border-left:2px solid; clip-path:polygon(0 0,100% 0,35% 100%,0 100%); } header::after { right:0; border-top:2px solid; border-right:2px solid; clip-path:polygon(0 0,100% 0,100% 100%,65% 100%); } .wrap { max-width:1180px; margin:0 auto; padding:1.5rem; position:relative; } .masthead { padding:3rem 1.5rem 2.6rem; text-align:center; } .kicker { color:var(--cyan); text-transform:uppercase; letter-spacing:.28em; font:800 .78rem ui-monospace,SFMono-Regular,Menlo,monospace; text-shadow:0 0 14px #00f5ff; } h1 { margin:.35rem 0; font-size:clamp(2.4rem,7vw,6rem); line-height:.9; text-transform:uppercase; letter-spacing:.05em; color:#f8feff; text-shadow:0 0 12px #00f5ff,0 0 38px #2777ff; } h2,h3 { color:#c8fbff; line-height:1.2; letter-spacing:.03em; text-shadow:0 0 12px #00f5ff88; } article, aside { position:relative; background:linear-gradient(180deg,#071827d9,#050914e6); border:1px solid var(--line); clip-path:polygon(0 18px,18px 0,100% 0,100% calc(100% - 18px),calc(100% - 18px) 100%,0 100%); box-shadow:0 0 0 1px #2777ff22 inset,0 0 34px #00d9ff18,0 24px 60px #000b; } article::before, aside::before { content:""; position:absolute; inset:0; pointer-events:none; border:1px solid #ffffff12; clip-path:inherit; } article { padding:clamp(1.2rem,3vw,2.4rem); } article p { margin:0 0 1.05rem; max-width:72ch; } article ul { margin:.2rem 0 1.2rem; padding-left:1.35rem; max-width:74ch; } article li { margin:.35rem 0; } article p, article li { font-size:1.04rem; color:#e6fbff; } article h1 { font-size:clamp(1.8rem,4vw,3.5rem); text-align:left; } .title-image { display:block; width:100%; height:auto; margin:0 0 1.4rem; border:1px solid #22d3ee66; box-shadow:0 0 28px #00d9ff22; } article h2 { margin-top:1.8rem; padding-top:1rem; border-top:1px solid #22d3ee33; } article h1 + p, article h2 + p, article h3 + p { margin-top:.3rem; } strong { color:#ffffff; font-weight:750; } code { color:#fef3c7; background:#020617; border:1px solid #22d3ee33; padding:.08rem .28rem; } .layout { display:grid; grid-template-columns:minmax(0,1fr) 310px; gap:1.35rem; align-items:start; } aside { padding:1.1rem; position:sticky; top:1rem; } .archive { list-style:none; margin:0; padding:0; } .archive li { border-bottom:1px solid #22d3ee33; padding:.7rem 0; font-family:ui-monospace,SFMono-Regular,Menlo,monospace; } .archive li::before { content:"▸ "; color:var(--cyan); text-shadow:0 0 10px var(--cyan); } .archive li:last-child { border-bottom:0; } a { color:#67e8f9; text-decoration:none; text-shadow:0 0 9px #00f5ff77; } a:hover { color:white; text-decoration:none; filter:drop-shadow(0 0 8px var(--cyan)); } .meta { color:#9eeaff; font:.95rem ui-monospace,SFMono-Regular,Menlo,monospace; letter-spacing:.04em; } details { margin-top:1.5rem; border-top:1px solid #22d3ee33; padding-top:1rem; } details.briefing-section { background:#02061788; border:1px solid #22d3ee33; padding:.75rem 1rem; margin:.8rem 0; } details.briefing-section summary { font-size:1.05rem; } summary { cursor:pointer; color:var(--amber); text-transform:uppercase; letter-spacing:.08em; } pre { white-space:pre-wrap; background:#01040acc; color:#bff8ff; padding:1rem; border:1px solid #22d3ee44; border-radius:0; overflow:auto; font-size:.82rem; box-shadow:0 0 22px #00d9ff11 inset; } footer { color:#7dd3fc; text-align:center; padding:2rem; font:.82rem ui-monospace,SFMono-Regular,Menlo,monospace; text-transform:uppercase; letter-spacing:.12em; } @media (max-width:850px) { .layout { grid-template-columns:1fr; } aside { position:static; } .masthead { text-align:left; } } """ def site_href(relative_path: str = "") -> str: base = SITE_BASE_PATH if not base.startswith("/"): base = f"/{base}" if not base.endswith("/"): base = f"{base}/" return f"{base}{relative_path.lstrip('/')}" def site_url(relative_path: str = "") -> str: return f"{SITE_URL}{site_href(relative_path)}" def article_links() -> str: articles_dir = WEB_DIR / "articles" if not articles_dir.exists(): return "
  • No articles yet. The raccoon newsroom is warming up.
  • " links = [] for path in sorted(articles_dir.glob("*.html"), reverse=True): label = path.stem try: label = datetime.strptime(path.stem, "%Y-%m-%d").strftime("%A, %B %-d, %Y") except ValueError: pass href = site_href(f"articles/{path.name}") links.append(f'
  • {html.escape(label)}
  • ') return "\n".join(links) or "
  • No articles yet. The raccoon newsroom is warming up.
  • " def svg_text_lines(text: str, max_chars: int = 28, max_lines: int = 3) -> list[str]: words = text.split() lines: list[str] = [] current = "" for word in words: candidate = f"{current} {word}".strip() if len(candidate) <= max_chars: current = candidate continue if current: lines.append(current) current = word if len(lines) == max_lines - 1: break if current and len(lines) < max_lines: lines.append(current) if len(lines) == max_lines and len(" ".join(words)) > len(" ".join(lines)): lines[-1] = lines[-1].rstrip(".,;: ") + "…" return lines or ["Smart Home Briefing"] def write_title_image(article_name: str, title: str, generated_at: str) -> Path: images_dir = WEB_DIR / "images" images_dir.mkdir(parents=True, exist_ok=True) image_name = article_name.replace(".html", ".svg") lines = svg_text_lines(remove_most_emoji(title)) text_spans = "\n".join( f'{html.escape(line)}' for i, line in enumerate(lines) ) svg = f""" HOME TELEMETRY DISPATCH {text_spans} Smart Home Gossip Gazette · {html.escape(generated_at)} """ path = images_dir / image_name path.write_text(svg, encoding="utf-8") return path def write_favicon() -> Path: favicon = f""" """ path = WEB_DIR / "favicon.svg" path.write_text(favicon, encoding="utf-8") return path def clean_rss_text(article_html: str) -> tuple[str, str]: article_match = re.search(r"]*>(.*?)", article_html, flags=re.DOTALL | re.IGNORECASE) content = article_match.group(1) if article_match else article_html content = re.sub(r"", " ", content, flags=re.DOTALL | re.IGNORECASE) content = re.sub(r"

    ]*>Permanent link.*?

    ", " ", content, flags=re.DOTALL | re.IGNORECASE) title_match = re.search(r"]*>(.*?)|]*>(.*?)", content, flags=re.DOTALL | re.IGNORECASE) title = "Smart Home Briefing" if title_match: title = re.sub(r"<[^>]+>", " ", title_match.group(1) or title_match.group(2) or "") title = re.sub(r"\s+", " ", html.unescape(title)).strip() or "Smart Home Briefing" text = re.sub(r"
    \s*", "\n", content) text = re.sub(r"", "\n", text, flags=re.IGNORECASE) text = re.sub(r"<[^>]+>", " ", text) text = html.unescape(text) text = re.sub(r"[`*_#]", "", text) text = re.sub(r"^[\s\-•]+", "", text, flags=re.MULTILINE) text = re.sub(r"[ \t]+", " ", text) text = re.sub(r"\n\s*\n+", "\n\n", text).strip() return title, text def write_rss_feed() -> Path: articles_dir = WEB_DIR / "articles" items = [] for path in sorted(articles_dir.glob("*.html"), reverse=True)[:20]: fallback_title = path.stem try: fallback_title = datetime.strptime(path.stem, "%Y-%m-%d").strftime("Smart Home Briefing - %A, %B %-d, %Y") except ValueError: fallback_title = f"Smart Home Briefing - {path.stem}" content = path.read_text(encoding="utf-8", errors="ignore") article_title, article_text = clean_rss_text(content) title = article_title if article_title != "Smart Home Briefing" else fallback_title description = article_text[:600] pub_dt = datetime.fromtimestamp(path.stat().st_mtime, timezone.utc) url = site_url(f"articles/{path.name}") image_path = WEB_DIR / "images" / path.name.replace(".html", ".svg") enclosure = "" if image_path.exists(): enclosure = f'\n ' items.append(f""" {html.escape(title)} {html.escape(url)} {html.escape(url)} {format_datetime(pub_dt, usegmt=True)} {html.escape(description)}{enclosure} """) now = format_datetime(datetime.now(timezone.utc), usegmt=True) feed_url = site_url("rss.xml") feed = f""" Smart Home Gossip Gazette {html.escape(site_url())} Daily Home Assistant smart-home briefings. en {now} {''.join(items)} """ path = WEB_DIR / "rss.xml" path.write_text(feed, encoding="utf-8") return path def blog_shell(title: str, subtitle: str, main_content: str, archive_links: str, image_href: str = "") -> str: image_meta = "" if image_href: image_meta = f'\n' return f""" {html.escape(title)} {image_meta}
    ◇ orbital home telemetry // raccoon intelligence unit ◇

    {html.escape(title)}

    {html.escape(subtitle)}

    {main_content}
    Generated by Home Assistant Observer · Local nginx uplink active
    """ def publish_webpage(conclusions: str, raw_summary: str) -> Path: WEB_DIR.mkdir(parents=True, exist_ok=True) articles_dir = WEB_DIR / "articles" articles_dir.mkdir(parents=True, exist_ok=True) now_dt = datetime.now() now = now_dt.strftime("%Y-%m-%d %H:%M") article_name = f"{now_dt:%Y-%m-%d}.html" body = markdownish_to_html(conclusions) raw = html.escape(raw_summary[:60000]) article_title, _ = clean_rss_text(f"
    {body}
    ") title_image_path = write_title_image(article_name, article_title, now) title_image_href = site_href(f"images/{title_image_path.name}") title_image_html = f'{html.escape(article_title)}' article_content = f"""
    {title_image_html}
    {body}
    Raw data bundle shown to the AI goblin
    {raw}
    """ article_path = articles_dir / article_name article_path.touch(exist_ok=True) article_path.write_text( blog_shell( "Smart Home Gossip Gazette", f"Daily home intelligence briefing · Generated {now}", article_content, article_links(), image_href=f"images/{title_image_path.name}", ), encoding="utf-8", ) featured = f""" """ index_path = WEB_DIR / "index.html" index_path.write_text( blog_shell("Smart Home Gossip Gazette", "A daily blog of your Home Assistant household signals", featured, article_links(), image_href=f"images/{title_image_path.name}"), encoding="utf-8", ) write_favicon() write_rss_feed() return article_path def write_markdown_report(summary: str, conclusions: str) -> Path: REPORT_DIR.mkdir(parents=True, exist_ok=True) stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") path = REPORT_DIR / f"daily-ai-analysis-{stamp}.md" path.write_text(f"# Daily Home Assistant AI Analysis\n\n{conclusions}\n\n## Data bundle\n\n```text\n{summary}\n```\n", encoding="utf-8") return path def cmd_collect() -> int: require_config(for_ai=False) snapshot = make_snapshot() path = save_snapshot(snapshot) cleanup_old_snapshots() print(f"Collected snapshot: {path}") return 0 def cmd_analyze() -> int: require_config(for_ai=True) snapshots = load_recent_snapshots(ANALYZE_SNAPSHOT_HOURS) if not snapshots: raise RuntimeError(f"No snapshots found in {DATA_DIR}; run collect first") summary = build_daily_summary(snapshots) previous_articles = load_recent_article_context(ARTICLE_CONTEXT_DAYS) conclusions = get_llm_conclusions(summary, previous_articles) md_path = write_markdown_report(summary, conclusions) html_path = publish_webpage(conclusions, summary) print(f"Wrote report: {md_path}") print(f"Published webpage: {html_path}") return 0 def main() -> int: parser = argparse.ArgumentParser(description="Home Assistant observer") parser.add_argument("mode", nargs="?", default="collect", choices=["collect", "analyze"], help="collect snapshots or analyze/publish them") args = parser.parse_args() try: return cmd_collect() if args.mode == "collect" else cmd_analyze() except Exception as exc: print(f"ERROR: {exc}", file=sys.stderr) return 1 if __name__ == "__main__": raise SystemExit(main())