Raw data bundle shown to the AI goblin
{raw}
#!/usr/bin/env python3
"""
Home Assistant observer

Modes:
  collect - run every 30 minutes; stores a compact JSON snapshot locally
  analyze - run at 05:00; sends the last snapshots to AI and publishes a funny local web page

Configuration is via environment variables. See .env.example.
"""
from __future__ import annotations

import argparse
import html
import json
import os
import re
import subprocess
import sys
from datetime import datetime, timedelta, timezone
from email.utils import format_datetime
from pathlib import Path
from typing import Any

import requests

# --- Home Assistant connection ----------------------------------------------
HA_URL = os.environ.get("HA_URL", "").rstrip("/")
HA_TOKEN = os.environ.get("HA_TOKEN", "")

# --- Local storage and published site ---------------------------------------
DATA_DIR = Path(os.environ.get("DATA_DIR", "./data"))
REPORT_DIR = Path(os.environ.get("REPORT_DIR", "./reports"))
WEB_DIR = Path(os.environ.get("WEB_DIR", "./web"))
SITE_BASE_PATH = os.environ.get("SITE_BASE_PATH", "/").strip() or "/"
SITE_URL = os.environ.get("SITE_URL", "http://localhost").rstrip("/")
PROMPT_FILE = Path(os.environ.get("PROMPT_FILE", "./llm_instructions.md"))

# --- Time windows and retention ---------------------------------------------
HISTORY_HOURS = int(os.environ.get("HISTORY_HOURS", "24"))
MAX_HISTORY_PER_ENTITY = int(os.environ.get("MAX_HISTORY_PER_ENTITY", "20"))
ANALYZE_SNAPSHOT_HOURS = int(os.environ.get("ANALYZE_SNAPSHOT_HOURS", "24"))
ARTICLE_CONTEXT_DAYS = int(os.environ.get("ARTICLE_CONTEXT_DAYS", "7"))
KEEP_SNAPSHOT_DAYS = int(os.environ.get("KEEP_SNAPSHOT_DAYS", "14"))

# LLM_MODE: none | pi | ollama | openai
LLM_MODE = os.environ.get("LLM_MODE", "none").lower()
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434").rstrip("/")
OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "llama3.1")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
OPENAI_MODEL = os.environ.get("OPENAI_MODEL", "gpt-4o-mini")
PI_BIN = os.environ.get("PI_BIN", "pi")
PI_MODEL = os.environ.get("PI_MODEL", "")
PI_TIMEOUT = int(os.environ.get("PI_TIMEOUT", "600"))

# Entity domains (the part of an entity_id before the first ".") worth recording.
RELEVANT_DOMAINS = {
    x.strip()
    for x in os.environ.get(
        "RELEVANT_DOMAINS",
        "sensor,binary_sensor,person,device_tracker,climate,light,switch,lock,cover,alarm_control_panel,media_player,calendar,weather",
    ).split(",")
    if x.strip()
}

# Exact entity ids that are never recorded, even when their domain is relevant.
EXCLUDED_ENTITIES = {x.strip() for x in os.environ.get("EXCLUDED_ENTITIES", "").split(",") if x.strip()}

# Only these attribute keys survive into a snapshot; everything else is dropped.
ALLOWED_ATTRIBUTES = {
    "friendly_name",
    "unit_of_measurement",
    "device_class",
    "state_class",
    "current_temperature",
    "temperature",
    "humidity",
    "battery_level",
    "brightness",
    "gps_accuracy",
    "source_type",
    "assumed_state",
}


class ConfigError(RuntimeError):
    """Raised when a required environment setting is missing."""


def require_config(for_ai: bool = False) -> None:
    """Validate the environment-derived settings.

    Args:
        for_ai: When true, additionally require OPENAI_API_KEY if
            LLM_MODE is "openai".

    Raises:
        ConfigError: If HA_URL or HA_TOKEN is empty, or the OpenAI key is
            missing while it is needed.
    """
    if not HA_URL:
        raise ConfigError("HA_URL is not set")
    if not HA_TOKEN:
        raise ConfigError("HA_TOKEN is not set")
    if for_ai and LLM_MODE == "openai" and not OPENAI_API_KEY:
        raise ConfigError("LLM_MODE=openai but OPENAI_API_KEY is not set")


def ha_get(path: str, params: dict[str, str] | None = None) -> Any:
    """GET ``HA_URL + path`` with the bearer token and return the decoded JSON.

    Args:
        path: URL path appended to HA_URL (expected to start with "/").
        params: Optional query-string parameters.

    Raises:
        requests.HTTPError: On a non-success status; the message carries up
            to the first 500 characters of the response body.
    """
    headers = {"Authorization": f"Bearer {HA_TOKEN}", "Content-Type": "application/json"}
    response = requests.get(f"{HA_URL}{path}", headers=headers, params=params, timeout=60)
    try:
        response.raise_for_status()
    except requests.HTTPError as exc:
        # Re-raise with the body attached so the failure reason is visible in logs.
        detail = response.text.strip()
        raise requests.HTTPError(f"{exc}; response={detail[:500]}", response=response) from exc
    return response.json()


def is_relevant_entity(entity_id: str) -> bool:
    """Return True if the entity is not excluded and its domain is relevant."""
    return entity_id not in EXCLUDED_ENTITIES and entity_id.split(".", 1)[0] in RELEVANT_DOMAINS


def compact_attributes(attrs: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *attrs* restricted to the ALLOWED_ATTRIBUTES keys."""
    return {k: v for k, v in attrs.items() if k in ALLOWED_ATTRIBUTES}


def get_states() -> list[dict[str, Any]]:
    """Fetch ``/api/states`` and return the relevant entities in compact form.

    Entities that are irrelevant/excluded, or whose state is "unknown",
    "unavailable" or missing, are skipped. The result is sorted by entity_id.
    """
    useful: list[dict[str, Any]] = []
    for item in ha_get("/api/states"):
        entity_id = item.get("entity_id", "")
        state = item.get("state")
        if not is_relevant_entity(entity_id) or state in {"unknown", "unavailable", None}:
            continue
        useful.append(
            {
                "entity_id": entity_id,
                "state": state,
                # `or {}` also covers an explicit null "attributes" value.
                "attributes": compact_attributes(item.get("attributes") or {}),
                "last_changed": item.get("last_changed"),
                "last_updated": item.get("last_updated"),
            }
        )
    return sorted(useful, key=lambda x: x["entity_id"])


def get_history(hours: int, entity_ids: list[str]) -> list[dict[str, Any]]:
    """Fetch recent state history for *entity_ids* and keep entities that changed.

    Args:
        hours: How far back from now (UTC) the history window starts.
        entity_ids: Entities to query; requested in chunks of 50.

    Returns:
        One ``{"entity_id", "recent_states"}`` dict per entity that showed
        more than one distinct usable state among its last
        MAX_HISTORY_PER_ENTITY entries, sorted by entity_id.
    """
    start = datetime.now(timezone.utc) - timedelta(hours=hours)
    changes: list[dict[str, Any]] = []
    # Recent Home Assistant versions/configurations require filter_entity_id for
    # the history endpoint. Query in chunks to avoid an overlong URL.
    chunk_size = 50
    for i in range(0, len(entity_ids), chunk_size):
        chunk = entity_ids[i : i + chunk_size]
        data = ha_get(
            f"/api/history/period/{start.isoformat(timespec='seconds')}",
            params={"filter_entity_id": ",".join(chunk), "minimal_response": ""},
        )
        for entity_history in data:
            if not entity_history:
                continue
            entity_id = entity_history[0].get("entity_id", "")
            if not is_relevant_entity(entity_id):
                continue
            # A limit of 0 must mean "keep nothing": seq[-0:] would be the whole list.
            recent = entity_history[-MAX_HISTORY_PER_ENTITY:] if MAX_HISTORY_PER_ENTITY > 0 else []
            compact = []
            for item in recent:
                state = item.get("state")
                if state in {"unknown", "unavailable", None}:
                    continue
                compact.append({"state": state, "last_changed": item.get("last_changed")})
            # Only entities that actually moved between different states are interesting.
            if len({x["state"] for x in compact}) > 1:
                changes.append({"entity_id": entity_id, "recent_states": compact})
    return sorted(changes, key=lambda x: x["entity_id"])


def make_snapshot() -> dict[str, Any]:
    """Collect current states plus recent history into one snapshot dict."""
    states = get_states()
    entity_ids = [state["entity_id"] for state in states]
    return {
        "generated_at": datetime.now().isoformat(timespec="seconds"),
        "history_hours": HISTORY_HOURS,
        "states": states,
        "history": get_history(HISTORY_HOURS, entity_ids),
    }


def save_snapshot(snapshot: dict[str, Any]) -> Path:
    """Write *snapshot* as ``snapshot-<timestamp>.json`` in DATA_DIR and return its path.

    The JSON is first written to a ``.tmp`` sibling and then renamed into
    place, so a concurrent reader globbing ``snapshot-*.json`` never sees a
    half-written file.
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    path = DATA_DIR / f"snapshot-{stamp}.json"
    tmp_path = path.with_name(f"{path.name}.tmp")
    try:
        tmp_path.write_text(json.dumps(snapshot, indent=2, ensure_ascii=False), encoding="utf-8")
        tmp_path.replace(path)
    except OSError:
        # Do not leave a stray temp file behind when the write or rename fails.
        tmp_path.unlink(missing_ok=True)
        raise
    return path


def cleanup_old_snapshots() -> None:
    """Delete snapshot files whose mtime is older than KEEP_SNAPSHOT_DAYS days."""
    cutoff = datetime.now() - timedelta(days=KEEP_SNAPSHOT_DAYS)
    for path in DATA_DIR.glob("snapshot-*.json"):
        if datetime.fromtimestamp(path.stat().st_mtime) < cutoff:
            path.unlink(missing_ok=True)


def load_recent_snapshots(hours: int) -> list[dict[str, Any]]:
    """Load snapshots modified within the last *hours* hours, in filename order.

    Files that cannot be read or parsed are reported on stderr and skipped.
    """
    cutoff = datetime.now() - timedelta(hours=hours)
    snapshots = []
    for path in sorted(DATA_DIR.glob("snapshot-*.json")):
        if datetime.fromtimestamp(path.stat().st_mtime) < cutoff:
            continue
        try:
            snapshots.append(json.loads(path.read_text(encoding="utf-8")))
        except (OSError, ValueError) as exc:
            # ValueError covers both JSON decode errors and invalid UTF-8.
            print(f"Skipping unreadable snapshot {path}: {exc}", file=sys.stderr)
    return snapshots


def summarize_snapshot(snapshot: dict[str, Any]) -> str:
    """Render one snapshot as plain text: current states, then recent transitions.

    At most the last 8 recorded states are listed per changed entity.
    """
    lines = [f"Snapshot: {snapshot.get('generated_at')}", "Current states:"]
    for state in snapshot.get("states", []):
        attrs = state.get("attributes", {})
        name = attrs.get("friendly_name", state.get("entity_id"))
        unit = attrs.get("unit_of_measurement", "")
        value = f"{state.get('state')} {unit}".strip()
        lines.append(f"- {name} ({state.get('entity_id')}): {value}; last_changed={state.get('last_changed')}")
    lines.append("Recently changed entities:")
    for item in snapshot.get("history", []):
        transitions = ", ".join(
            f"{x.get('state')} @ {x.get('last_changed')}" for x in item.get("recent_states", [])[-8:]
        )
        lines.append(f"- {item.get('entity_id')}: {transitions}")
    return "\n".join(lines)


def build_daily_summary(snapshots: list[dict[str, Any]]) -> str:
    """Concatenate a short header and the text summary of every snapshot."""
    parts = [
        f"Daily Home Assistant bundle generated {datetime.now().isoformat(timespec='seconds')}",
        f"Contains {len(snapshots)} snapshots from roughly the last {ANALYZE_SNAPSHOT_HOURS} hours.",
    ]
    for snapshot in snapshots:
        parts.append("\n---\n" + summarize_snapshot(snapshot))
    return "\n".join(parts)


def read_extra_llm_instructions() -> str:
    """Return the stripped contents of PROMPT_FILE, or "" if it does not exist."""
    if not PROMPT_FILE.exists():
        return ""
    return PROMPT_FILE.read_text(encoding="utf-8").strip()


def load_recent_article_context(days: int) -> str:
    """Return the text of recent previous reports for use as LLM context.

    Reads ``daily-ai-analysis-*.md`` files in REPORT_DIR modified within the
    last *days* days, keeps only the part before the "## Data bundle"
    heading (capped at 8000 characters each), and joins the last 7 of them.
    Returns "" when *days* is not positive or REPORT_DIR is missing.
    """
    if days <= 0 or not REPORT_DIR.exists():
        return ""
    cutoff = datetime.now() - timedelta(days=days)
    articles: list[str] = []
    for path in sorted(REPORT_DIR.glob("daily-ai-analysis-*.md")):
        if datetime.fromtimestamp(path.stat().st_mtime) < cutoff:
            continue
        try:
            text = path.read_text(encoding="utf-8")
        except (OSError, ValueError) as exc:
            # ValueError covers invalid UTF-8 (UnicodeDecodeError).
            print(f"Skipping unreadable previous report {path}: {exc}", file=sys.stderr)
            continue
        conclusions = text.split("\n## Data bundle\n", 1)[0].strip()
        articles.append(f"PREVIOUS ARTICLE {path.name}:\n{conclusions[:8000]}")
    return "\n\n---\n\n".join(articles[-7:])


def analysis_prompt(input_summary: str, previous_articles: str = "") -> str:
    """Build the full LLM prompt for today's article.

    Args:
        input_summary: Text bundle of today's snapshots.
        previous_articles: Optional text of earlier articles, included only
            as trend context.

    Returns:
        The prompt, with the owner's extra instructions from PROMPT_FILE and
        the previous-articles section inserted when they are non-empty.
    """
    extra_instructions = read_extra_llm_instructions()
    extra_block = ""
    if extra_instructions:
        extra_block = f"""
ADDITIONAL OWNER INSTRUCTIONS FROM {PROMPT_FILE}:
{extra_instructions}
"""
    previous_block = ""
    if previous_articles:
        previous_block = f"""
PREVIOUS ARTICLES FROM THE LAST {ARTICLE_CONTEXT_DAYS} DAYS FOR CONTEXT:
Use these only for trend/context awareness. Do not claim something happened today unless today's data supports it.

{previous_articles}
"""
    return f"""You are writing today's Home Assistant smart-home blog article for the owner.

Write a funny but useful morning briefing in a blog/article style. Use light humor, emojis, and playful headings, but remain factual and privacy-aware.

Include:
- A short comedy headline for the day
- What seemed to happen at home today
- Behavioral patterns that can reasonably be inferred
- Notable trends compared with recent previous articles, if supported
- What a nosy raccoon/hacker could figure out about the resident
- Anomalies, risks, or privacy/security concerns
- Suggested Home Assistant automations or fixes

Distinguish strong evidence from guesses. Do not invent facts not supported by the data.
{extra_block}{previous_block}
TODAY'S DATA:
{input_summary}
"""


def call_ollama(prompt: str) -> str:
    """Send *prompt* to the Ollama ``/api/generate`` endpoint and return its reply text."""
    response = requests.post(
        f"{OLLAMA_URL}/api/generate",
        json={"model": OLLAMA_MODEL, "prompt": prompt, "stream": False},
        timeout=300,
    )
    response.raise_for_status()
    return response.json().get("response", "").strip()


def call_openai(prompt: str) -> str:
    """Send *prompt* to the OpenAI chat-completions endpoint and return the first choice."""
    response = requests.post(
        "https://api.openai.com/v1/chat/completions",
        headers={"Authorization": f"Bearer {OPENAI_API_KEY}", "Content-Type": "application/json"},
        json={
            "model": OPENAI_MODEL,
            "messages": [
                {"role": "system", "content": "You are a careful but funny smart-home analyst."},
                {"role": "user", "content": prompt},
            ],
            "temperature": 0.35,
        },
        timeout=300,
    )
    response.raise_for_status()
    return response.json()["choices"][0]["message"]["content"].strip()


def call_pi(prompt: str) -> str:
    """Run the ``pi`` CLI with *prompt* on stdin and return its stdout.

    Raises:
        RuntimeError: If the process exits non-zero; the message carries the
            last 1000 characters of stderr.
    """
    cmd = [PI_BIN, "--no-tools"]
    if PI_MODEL:
        cmd.extend(["--model", PI_MODEL])
    cmd.extend(["-p", "Analyze the Home Assistant data from stdin and write the requested briefing."])
    result = subprocess.run(
        cmd,
        input=prompt,
        text=True,
        capture_output=True,
        timeout=PI_TIMEOUT,
        check=False,
    )
    if result.returncode != 0:
        stderr = result.stderr.strip()
        raise RuntimeError(f"pi exited with status {result.returncode}: {stderr[-1000:]}")
    return result.stdout.strip()


def get_llm_conclusions(input_summary: str, previous_articles: str = "") -> str:
    """Return the article text from the backend selected by LLM_MODE.

    With LLM_MODE "none", or an unrecognised value, a short explanatory
    message is returned instead of calling any backend.
    """
    if LLM_MODE == "none":
        return "AI analysis disabled. Set LLM_MODE=pi, LLM_MODE=ollama, or LLM_MODE=openai in .env. The raccoon analyst is asleep. 🦝💤"
    prompt = analysis_prompt(input_summary, previous_articles)
    if LLM_MODE == "ollama":
        return call_ollama(prompt)
    if LLM_MODE == "openai":
        return call_openai(prompt)
    if LLM_MODE == "pi":
        return call_pi(prompt)
    return f"Unknown LLM_MODE={LLM_MODE!r}. Use none, pi, ollama, or openai."


# NOTE(review): markdownish_to_html is cut off in this view of the file -- its
# source stops in the middle of a string literal. The visible fragment is kept
# verbatim below (as a comment, since it cannot parse on its own) rather than
# guessing at the missing remainder:
#
#   def markdownish_to_html(text: str) -> str:
#       safe = html.escape(text)
#       safe = re.sub(r"^### (.*)$", r"
{raw}