Raw data bundle shown to the AI goblin
{raw}
#!/usr/bin/env python3 """ Home Assistant observer Modes: collect - run every 30 minutes; stores a compact JSON snapshot locally analyze - run at 05:00; sends the last snapshots to AI and publishes a funny local web page Configuration is via environment variables. See .env.example. """ from __future__ import annotations import argparse import html import json import os import re import subprocess import sys from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any import requests HA_URL = os.environ.get("HA_URL", "").rstrip("/") HA_TOKEN = os.environ.get("HA_TOKEN", "") DATA_DIR = Path(os.environ.get("DATA_DIR", "./data")) REPORT_DIR = Path(os.environ.get("REPORT_DIR", "./reports")) WEB_DIR = Path(os.environ.get("WEB_DIR", "./web")) PROMPT_FILE = Path(os.environ.get("PROMPT_FILE", "./llm_instructions.md")) HISTORY_HOURS = int(os.environ.get("HISTORY_HOURS", "24")) MAX_HISTORY_PER_ENTITY = int(os.environ.get("MAX_HISTORY_PER_ENTITY", "20")) ANALYZE_SNAPSHOT_HOURS = int(os.environ.get("ANALYZE_SNAPSHOT_HOURS", "24")) ARTICLE_CONTEXT_DAYS = int(os.environ.get("ARTICLE_CONTEXT_DAYS", "7")) KEEP_SNAPSHOT_DAYS = int(os.environ.get("KEEP_SNAPSHOT_DAYS", "14")) # LLM_MODE: none | pi | ollama | openai LLM_MODE = os.environ.get("LLM_MODE", "none").lower() OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434").rstrip("/") OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "llama3.1") OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "") OPENAI_MODEL = os.environ.get("OPENAI_MODEL", "gpt-4o-mini") PI_BIN = os.environ.get("PI_BIN", "pi") PI_MODEL = os.environ.get("PI_MODEL", "") PI_TIMEOUT = int(os.environ.get("PI_TIMEOUT", "600")) RELEVANT_DOMAINS = set( x.strip() for x in os.environ.get( "RELEVANT_DOMAINS", "sensor,binary_sensor,person,device_tracker,climate,light,switch,lock,cover,alarm_control_panel,media_player,calendar,weather", ).split(",") if x.strip() ) EXCLUDED_ENTITIES = set(x.strip() for x in os.environ.get("EXCLUDED_ENTITIES", "").split(",") if x.strip()) ALLOWED_ATTRIBUTES = { "friendly_name", "unit_of_measurement", "device_class", "state_class", "current_temperature", "temperature", "humidity", "battery_level", "brightness", "gps_accuracy", "source_type", "assumed_state", } class ConfigError(RuntimeError): pass def require_config(for_ai: bool = False) -> None: if not HA_URL: raise ConfigError("HA_URL is not set") if not HA_TOKEN: raise ConfigError("HA_TOKEN is not set") if for_ai and LLM_MODE == "openai" and not OPENAI_API_KEY: raise ConfigError("LLM_MODE=openai but OPENAI_API_KEY is not set") def ha_get(path: str, params: dict[str, str] | None = None) -> Any: headers = {"Authorization": f"Bearer {HA_TOKEN}", "Content-Type": "application/json"} response = requests.get(f"{HA_URL}{path}", headers=headers, params=params, timeout=60) try: response.raise_for_status() except requests.HTTPError as exc: detail = response.text.strip() raise requests.HTTPError(f"{exc}; response={detail[:500]}", response=response) from exc return response.json() def is_relevant_entity(entity_id: str) -> bool: return entity_id not in EXCLUDED_ENTITIES and entity_id.split(".", 1)[0] in RELEVANT_DOMAINS def compact_attributes(attrs: dict[str, Any]) -> dict[str, Any]: return {k: v for k, v in attrs.items() if k in ALLOWED_ATTRIBUTES} def get_states() -> list[dict[str, Any]]: useful: list[dict[str, Any]] = [] for item in ha_get("/api/states"): entity_id = item.get("entity_id", "") state = item.get("state") if not is_relevant_entity(entity_id) or state in {"unknown", "unavailable", None}: continue useful.append( { "entity_id": entity_id, "state": state, "attributes": compact_attributes(item.get("attributes", {})), "last_changed": item.get("last_changed"), "last_updated": item.get("last_updated"), } ) return sorted(useful, key=lambda x: x["entity_id"]) def get_history(hours: int, entity_ids: list[str]) -> list[dict[str, Any]]: start = datetime.now(timezone.utc) - timedelta(hours=hours) changes: list[dict[str, Any]] = [] # Recent Home Assistant versions/configurations require filter_entity_id for # the history endpoint. Query in chunks to avoid an overlong URL. chunk_size = 50 for i in range(0, len(entity_ids), chunk_size): chunk = entity_ids[i : i + chunk_size] data = ha_get( f"/api/history/period/{start.isoformat(timespec='seconds')}", params={"filter_entity_id": ",".join(chunk), "minimal_response": ""}, ) for entity_history in data: if not entity_history: continue entity_id = entity_history[0].get("entity_id", "") if not is_relevant_entity(entity_id): continue compact = [] for item in entity_history[-MAX_HISTORY_PER_ENTITY:]: state = item.get("state") if state in {"unknown", "unavailable", None}: continue compact.append({"state": state, "last_changed": item.get("last_changed")}) if len(set(x["state"] for x in compact)) > 1: changes.append({"entity_id": entity_id, "recent_states": compact}) return sorted(changes, key=lambda x: x["entity_id"]) def make_snapshot() -> dict[str, Any]: states = get_states() entity_ids = [state["entity_id"] for state in states] return { "generated_at": datetime.now().isoformat(timespec="seconds"), "history_hours": HISTORY_HOURS, "states": states, "history": get_history(HISTORY_HOURS, entity_ids), } def save_snapshot(snapshot: dict[str, Any]) -> Path: DATA_DIR.mkdir(parents=True, exist_ok=True) stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") path = DATA_DIR / f"snapshot-{stamp}.json" path.write_text(json.dumps(snapshot, indent=2, ensure_ascii=False), encoding="utf-8") return path def cleanup_old_snapshots() -> None: cutoff = datetime.now() - timedelta(days=KEEP_SNAPSHOT_DAYS) for path in DATA_DIR.glob("snapshot-*.json"): if datetime.fromtimestamp(path.stat().st_mtime) < cutoff: path.unlink(missing_ok=True) def load_recent_snapshots(hours: int) -> list[dict[str, Any]]: cutoff = datetime.now() - timedelta(hours=hours) snapshots = [] for path in sorted(DATA_DIR.glob("snapshot-*.json")): if datetime.fromtimestamp(path.stat().st_mtime) < cutoff: continue try: snapshots.append(json.loads(path.read_text(encoding="utf-8"))) except Exception as exc: print(f"Skipping unreadable snapshot {path}: {exc}", file=sys.stderr) return snapshots def summarize_snapshot(snapshot: dict[str, Any]) -> str: lines = [f"Snapshot: {snapshot.get('generated_at')}", "Current states:"] for state in snapshot.get("states", []): attrs = state.get("attributes", {}) name = attrs.get("friendly_name", state.get("entity_id")) unit = attrs.get("unit_of_measurement", "") value = f"{state.get('state')} {unit}".strip() lines.append(f"- {name} ({state.get('entity_id')}): {value}; last_changed={state.get('last_changed')}") lines.append("Recently changed entities:") for item in snapshot.get("history", []): transitions = ", ".join(f"{x.get('state')} @ {x.get('last_changed')}" for x in item.get("recent_states", [])[-8:]) lines.append(f"- {item.get('entity_id')}: {transitions}") return "\n".join(lines) def build_daily_summary(snapshots: list[dict[str, Any]]) -> str: parts = [ f"Daily Home Assistant bundle generated {datetime.now().isoformat(timespec='seconds')}", f"Contains {len(snapshots)} snapshots from roughly the last {ANALYZE_SNAPSHOT_HOURS} hours.", ] for snapshot in snapshots: parts.append("\n---\n" + summarize_snapshot(snapshot)) return "\n".join(parts) def read_extra_llm_instructions() -> str: if not PROMPT_FILE.exists(): return "" return PROMPT_FILE.read_text(encoding="utf-8").strip() def load_recent_article_context(days: int) -> str: if days <= 0 or not REPORT_DIR.exists(): return "" cutoff = datetime.now() - timedelta(days=days) articles: list[str] = [] for path in sorted(REPORT_DIR.glob("daily-ai-analysis-*.md")): if datetime.fromtimestamp(path.stat().st_mtime) < cutoff: continue try: text = path.read_text(encoding="utf-8") except Exception as exc: print(f"Skipping unreadable previous report {path}: {exc}", file=sys.stderr) continue conclusions = text.split("\n## Data bundle\n", 1)[0].strip() articles.append(f"PREVIOUS ARTICLE {path.name}:\n{conclusions[:8000]}") return "\n\n---\n\n".join(articles[-7:]) def analysis_prompt(input_summary: str, previous_articles: str = "") -> str: extra_instructions = read_extra_llm_instructions() extra_block = "" if extra_instructions: extra_block = f""" ADDITIONAL OWNER INSTRUCTIONS FROM {PROMPT_FILE}: {extra_instructions} """ previous_block = "" if previous_articles: previous_block = f""" PREVIOUS ARTICLES FROM THE LAST {ARTICLE_CONTEXT_DAYS} DAYS FOR CONTEXT: Use these only for trend/context awareness. Do not claim something happened today unless today's data supports it. {previous_articles} """ return f"""You are writing today's Home Assistant smart-home blog article for the owner. Write a funny but useful morning briefing in a blog/article style. Use light humor, emojis, and playful headings, but remain factual and privacy-aware. Include: - A short comedy headline for the day - What seemed to happen at home today - Behavioral patterns that can reasonably be inferred - Notable trends compared with recent previous articles, if supported - What a nosy raccoon/hacker could figure out about the resident - Anomalies, risks, or privacy/security concerns - Suggested Home Assistant automations or fixes Distinguish strong evidence from guesses. Do not invent facts not supported by the data. {extra_block}{previous_block} TODAY'S DATA: {input_summary} """ def call_ollama(prompt: str) -> str: response = requests.post(f"{OLLAMA_URL}/api/generate", json={"model": OLLAMA_MODEL, "prompt": prompt, "stream": False}, timeout=300) response.raise_for_status() return response.json().get("response", "").strip() def call_openai(prompt: str) -> str: response = requests.post( "https://api.openai.com/v1/chat/completions", headers={"Authorization": f"Bearer {OPENAI_API_KEY}", "Content-Type": "application/json"}, json={ "model": OPENAI_MODEL, "messages": [ {"role": "system", "content": "You are a careful but funny smart-home analyst."}, {"role": "user", "content": prompt}, ], "temperature": 0.35, }, timeout=300, ) response.raise_for_status() return response.json()["choices"][0]["message"]["content"].strip() def call_pi(prompt: str) -> str: cmd = [PI_BIN, "--no-tools"] if PI_MODEL: cmd.extend(["--model", PI_MODEL]) cmd.extend(["-p", "Analyze the Home Assistant data from stdin and write the requested briefing."]) result = subprocess.run( cmd, input=prompt, text=True, capture_output=True, timeout=PI_TIMEOUT, check=False, ) if result.returncode != 0: stderr = result.stderr.strip() raise RuntimeError(f"pi exited with status {result.returncode}: {stderr[-1000:]}") return result.stdout.strip() def get_llm_conclusions(input_summary: str, previous_articles: str = "") -> str: if LLM_MODE == "none": return "AI analysis disabled. Set LLM_MODE=pi, LLM_MODE=ollama, or LLM_MODE=openai in .env. The raccoon analyst is asleep. 🦝💤" prompt = analysis_prompt(input_summary, previous_articles) if LLM_MODE == "ollama": return call_ollama(prompt) if LLM_MODE == "openai": return call_openai(prompt) if LLM_MODE == "pi": return call_pi(prompt) return f"Unknown LLM_MODE={LLM_MODE!r}. Use none, pi, ollama, or openai." def markdownish_to_html(text: str) -> str: safe = html.escape(text) safe = re.sub(r"^### (.*)$", r"
{raw}