haobserver/ha_observer.py
2026-05-16 08:02:01 +00:00

373 lines
13 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Home Assistant observer
Modes:
collect - run every 30 minutes; stores a compact JSON snapshot locally
analyze - run at 05:00; sends the last snapshots to AI and publishes a funny local web page
Configuration is via environment variables. See .env.example.
"""
from __future__ import annotations
import argparse
import html
import json
import os
import re
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
import requests
# --- Home Assistant connection -------------------------------------------
HA_URL = os.environ.get("HA_URL", "").rstrip("/")
HA_TOKEN = os.environ.get("HA_TOKEN", "")

# --- Local directories and files -----------------------------------------
DATA_DIR = Path(os.environ.get("DATA_DIR", "./data"))
REPORT_DIR = Path(os.environ.get("REPORT_DIR", "./reports"))
WEB_DIR = Path(os.environ.get("WEB_DIR", "./web"))
PROMPT_FILE = Path(os.environ.get("PROMPT_FILE", "./llm_instructions.md"))

# --- Time windows and retention (integers parsed at import time) ---------
HISTORY_HOURS = int(os.environ.get("HISTORY_HOURS", "24"))
MAX_HISTORY_PER_ENTITY = int(os.environ.get("MAX_HISTORY_PER_ENTITY", "20"))
ANALYZE_SNAPSHOT_HOURS = int(os.environ.get("ANALYZE_SNAPSHOT_HOURS", "24"))
KEEP_SNAPSHOT_DAYS = int(os.environ.get("KEEP_SNAPSHOT_DAYS", "14"))

# --- LLM backend ----------------------------------------------------------
# LLM_MODE: none | ollama | openai
LLM_MODE = os.environ.get("LLM_MODE", "none").lower()
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434").rstrip("/")
OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "llama3.1")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
OPENAI_MODEL = os.environ.get("OPENAI_MODEL", "gpt-4o-mini")

# --- Entity filtering -----------------------------------------------------
# Comma-separated lists; surrounding whitespace is trimmed and empty items
# are dropped.
RELEVANT_DOMAINS = set(
    filter(
        None,
        map(
            str.strip,
            os.environ.get(
                "RELEVANT_DOMAINS",
                "sensor,binary_sensor,person,device_tracker,climate,light,switch,lock,cover,alarm_control_panel,media_player,calendar,weather",
            ).split(","),
        ),
    )
)
EXCLUDED_ENTITIES = set(
    filter(None, map(str.strip, os.environ.get("EXCLUDED_ENTITIES", "").split(",")))
)

# Only these attribute keys are copied into snapshots (see compact_attributes).
ALLOWED_ATTRIBUTES = {
    "friendly_name",
    "unit_of_measurement",
    "device_class",
    "state_class",
    "current_temperature",
    "temperature",
    "humidity",
    "battery_level",
    "brightness",
    "gps_accuracy",
    "source_type",
    "assumed_state",
}
class ConfigError(RuntimeError):
    """Signals that a required configuration value is missing."""
def require_config(for_ai: bool = False) -> None:
    """Check that the mandatory settings are present.

    Args:
        for_ai: When true, additionally require OPENAI_API_KEY if
            LLM_MODE is "openai".

    Raises:
        ConfigError: For the first missing setting, checked in the order
            HA_URL, HA_TOKEN, OPENAI_API_KEY.
    """
    checks = (
        (not HA_URL, "HA_URL is not set"),
        (not HA_TOKEN, "HA_TOKEN is not set"),
        (
            for_ai and LLM_MODE == "openai" and not OPENAI_API_KEY,
            "LLM_MODE=openai but OPENAI_API_KEY is not set",
        ),
    )
    for is_missing, message in checks:
        if is_missing:
            raise ConfigError(message)
def ha_get(path: str) -> Any:
    """GET ``HA_URL + path`` with the bearer token and return the parsed JSON.

    The request uses a 60 second timeout; ``raise_for_status()`` is called on
    the response before the body is decoded.
    """
    url = f"{HA_URL}{path}"
    auth_headers = {
        "Authorization": f"Bearer {HA_TOKEN}",
        "Content-Type": "application/json",
    }
    reply = requests.get(url, headers=auth_headers, timeout=60)
    reply.raise_for_status()
    return reply.json()
def is_relevant_entity(entity_id: str) -> bool:
    """Return True when the entity is not excluded and its domain is tracked.

    The domain is the part of ``entity_id`` before the first ".".
    """
    if entity_id in EXCLUDED_ENTITIES:
        return False
    domain, _, _ = entity_id.partition(".")
    return domain in RELEVANT_DOMAINS
def compact_attributes(attrs: dict[str, Any]) -> dict[str, Any]:
    """Return a new dict holding only the keys listed in ALLOWED_ATTRIBUTES."""
    kept: dict[str, Any] = {}
    for key, value in attrs.items():
        if key in ALLOWED_ATTRIBUTES:
            kept[key] = value
    return kept
def get_states() -> list[dict[str, Any]]:
    """Fetch ``/api/states`` and return the compacted, relevant entries.

    Entities that fail ``is_relevant_entity`` or whose state is "unknown",
    "unavailable" or missing are dropped. The result is sorted by entity_id.
    """
    ignored_states = {"unknown", "unavailable", None}
    rows: list[dict[str, Any]] = []
    for raw in ha_get("/api/states"):
        entity_id = raw.get("entity_id", "")
        value = raw.get("state")
        if is_relevant_entity(entity_id) and value not in ignored_states:
            rows.append(
                {
                    "entity_id": entity_id,
                    "state": value,
                    "attributes": compact_attributes(raw.get("attributes", {})),
                    "last_changed": raw.get("last_changed"),
                    "last_updated": raw.get("last_updated"),
                }
            )
    rows.sort(key=lambda row: row["entity_id"])
    return rows
def get_history(hours: int, entity_ids: list[str] | None = None) -> list[dict[str, Any]]:
    """Return recent state changes for relevant entities over the last ``hours``.

    Args:
        hours: Size of the look-back window, ending now.
        entity_ids: Entities to query. When omitted, the ids are read from
            ``/api/states``. Either way only ids accepted by
            ``is_relevant_entity`` are requested.

    Returns:
        A list sorted by entity_id of ``{"entity_id", "recent_states"}``
        dicts. Only entities that showed more than one distinct usable state
        are included; each keeps at most MAX_HISTORY_PER_ENTITY trailing
        entries, minus "unknown"/"unavailable"/missing states.

    Notes:
        * The Home Assistant REST API documents ``filter_entity_id`` as a
          required parameter of the history endpoint, so the ids are always
          sent explicitly.
        * ``end_time`` defaults to one day after the start timestamp; it is
          set to "now" so windows longer than 24 hours are not cut short.
        * Ids are sent in batches to keep each request URL reasonably short.
          NOTE(review): the batch size of 50 is a conservative guess, not a
          documented limit - tune if needed.
    """
    from urllib.parse import urlencode

    now = datetime.now(timezone.utc)
    start = now - timedelta(hours=hours)

    if entity_ids is None:
        entity_ids = [item.get("entity_id", "") for item in ha_get("/api/states")]
    wanted = sorted({entity_id for entity_id in entity_ids if is_relevant_entity(entity_id)})

    batch_size = 50
    ignored_states = {"unknown", "unavailable", None}
    changes: list[dict[str, Any]] = []
    for offset in range(0, len(wanted), batch_size):
        batch = wanted[offset:offset + batch_size]
        # urlencode percent-encodes the "+" of the UTC offset, which would
        # otherwise be decoded as a space inside a query string.
        query = urlencode({"filter_entity_id": ",".join(batch), "end_time": now.isoformat()})
        data = ha_get(f"/api/history/period/{start.isoformat()}?{query}&minimal_response")
        for entity_history in data:
            if not entity_history:
                continue
            entity_id = entity_history[0].get("entity_id", "")
            if not is_relevant_entity(entity_id):
                continue
            compact = [
                {"state": item.get("state"), "last_changed": item.get("last_changed")}
                for item in entity_history[-MAX_HISTORY_PER_ENTITY:]
                if item.get("state") not in ignored_states
            ]
            # Keep only entities that actually changed within the window.
            if len({entry["state"] for entry in compact}) > 1:
                changes.append({"entity_id": entity_id, "recent_states": compact})
    return sorted(changes, key=lambda x: x["entity_id"])
def make_snapshot() -> dict[str, Any]:
    """Build one snapshot: a timestamp, current states, and recent history.

    ``generated_at`` is local time carrying its UTC offset. The states and
    history stored next to it hold offset-aware timestamps taken from Home
    Assistant, so a naive value here would be ambiguous when the two are
    compared.
    """
    return {
        "generated_at": datetime.now().astimezone().isoformat(timespec="seconds"),
        "history_hours": HISTORY_HOURS,
        "states": get_states(),
        "history": get_history(HISTORY_HOURS),
    }
def save_snapshot(snapshot: dict[str, Any]) -> Path:
    """Write ``snapshot`` as JSON to DATA_DIR and return the file's path.

    The file is named ``snapshot-<local timestamp>.json``. The content is
    first written to a sibling ``.tmp`` file and then renamed into place, so
    a concurrent reader globbing ``snapshot-*.json`` never sees a partially
    written snapshot (the ``.tmp`` name does not match that pattern).
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    path = DATA_DIR / f"snapshot-{stamp}.json"
    payload = json.dumps(snapshot, indent=2, ensure_ascii=False)

    tmp_path = path.with_name(path.name + ".tmp")
    try:
        tmp_path.write_text(payload, encoding="utf-8")
        tmp_path.replace(path)
    finally:
        # No-op after a successful rename; removes the leftover on failure.
        tmp_path.unlink(missing_ok=True)
    return path
def cleanup_old_snapshots() -> None:
    """Delete snapshot files whose mtime is older than KEEP_SNAPSHOT_DAYS."""
    expiry = datetime.now() - timedelta(days=KEEP_SNAPSHOT_DAYS)
    for candidate in DATA_DIR.glob("snapshot-*.json"):
        modified = datetime.fromtimestamp(candidate.stat().st_mtime)
        if modified >= expiry:
            continue
        candidate.unlink(missing_ok=True)
def load_recent_snapshots(hours: int) -> list[dict[str, Any]]:
    """Load snapshots from DATA_DIR modified within the last ``hours``.

    Files are visited in filename order. A file that cannot be stat'ed, read
    or decoded (for example because it was removed concurrently, or holds
    invalid JSON) is reported on stderr and skipped, as is a file whose JSON
    is not an object; neither aborts the run.

    Returns:
        The decoded snapshot dicts, in filename order.
    """
    cutoff = datetime.now() - timedelta(hours=hours)
    snapshots: list[dict[str, Any]] = []
    for path in sorted(DATA_DIR.glob("snapshot-*.json")):
        try:
            if datetime.fromtimestamp(path.stat().st_mtime) < cutoff:
                continue
            payload = json.loads(path.read_text(encoding="utf-8"))
        except (OSError, ValueError) as exc:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            print(f"Skipping unreadable snapshot {path}: {exc}", file=sys.stderr)
            continue
        if not isinstance(payload, dict):
            print(f"Skipping unreadable snapshot {path}: expected a JSON object", file=sys.stderr)
            continue
        snapshots.append(payload)
    return snapshots
def summarize_snapshot(snapshot: dict[str, Any]) -> str:
    """Render one snapshot as plain text.

    The output has a header line, one bullet per current state (friendly
    name, entity id, value with unit, last_changed) and one bullet per
    changed entity listing up to its last eight recorded transitions.
    """
    out = [f"Snapshot: {snapshot.get('generated_at')}", "Current states:"]

    for entry in snapshot.get("states", []):
        attributes = entry.get("attributes", {})
        entity_id = entry.get("entity_id")
        label = attributes.get("friendly_name", entity_id)
        unit = attributes.get("unit_of_measurement", "")
        reading = f"{entry.get('state')} {unit}".strip()
        out.append(f"- {label} ({entity_id}): {reading}; last_changed={entry.get('last_changed')}")

    out.append("Recently changed entities:")
    for changed in snapshot.get("history", []):
        tail = changed.get("recent_states", [])[-8:]
        steps = [f"{step.get('state')} @ {step.get('last_changed')}" for step in tail]
        out.append(f"- {changed.get('entity_id')}: {', '.join(steps)}")

    return "\n".join(out)
def build_daily_summary(snapshots: list[dict[str, Any]]) -> str:
    """Concatenate a two-line header and every snapshot summary into one text.

    Each snapshot section is introduced by a ``---`` separator line.
    """
    header = [
        f"Daily Home Assistant bundle generated {datetime.now().isoformat(timespec='seconds')}",
        f"Contains {len(snapshots)} snapshots from roughly the last {ANALYZE_SNAPSHOT_HOURS} hours.",
    ]
    sections = ["\n---\n" + summarize_snapshot(one) for one in snapshots]
    return "\n".join(header + sections)
def read_extra_llm_instructions() -> str:
    """Return the stripped contents of PROMPT_FILE, or "" if it does not exist."""
    if PROMPT_FILE.exists():
        return PROMPT_FILE.read_text(encoding="utf-8").strip()
    return ""
def analysis_prompt(input_summary: str) -> str:
    """Assemble the LLM prompt around ``input_summary``.

    The fixed briefing instructions come first, then (only when PROMPT_FILE
    yields text) an "ADDITIONAL OWNER INSTRUCTIONS" section, then the data.
    """
    owner_notes = read_extra_llm_instructions()
    if owner_notes:
        extra_block = f"""
ADDITIONAL OWNER INSTRUCTIONS FROM {PROMPT_FILE}:
{owner_notes}
"""
    else:
        extra_block = ""

    prompt = f"""You are analyzing a day of Home Assistant smart-home data for the owner.
Write a funny but useful morning briefing. Use light humor, emojis, and playful headings,
but remain factual and privacy-aware. Include:
- A short comedy headline for the day
- What seemed to happen at home
- Behavioral patterns that can reasonably be inferred
- What a nosy raccoon/hacker could figure out about the resident
- Anomalies, risks, or privacy/security concerns
- Suggested Home Assistant automations or fixes
Distinguish strong evidence from guesses. Do not invent facts not supported by the data.
{extra_block}
DATA:
{input_summary}
"""
    return prompt
def call_ollama(prompt: str) -> str:
    """POST ``prompt`` to ``OLLAMA_URL/api/generate`` and return the reply text.

    Streaming is disabled and the timeout is 300 seconds. The "response"
    field of the JSON body is returned stripped ("" when the field is absent).
    """
    request_body = {"model": OLLAMA_MODEL, "prompt": prompt, "stream": False}
    reply = requests.post(f"{OLLAMA_URL}/api/generate", json=request_body, timeout=300)
    reply.raise_for_status()
    generated = reply.json().get("response", "")
    return generated.strip()
def call_openai(prompt: str) -> str:
    """Send ``prompt`` to the OpenAI chat completions endpoint.

    Returns the stripped content of the first choice's message. The request
    uses OPENAI_MODEL, a fixed system message, temperature 0.35 and a
    300 second timeout.
    """
    auth_headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
        "Content-Type": "application/json",
    }
    conversation = [
        {"role": "system", "content": "You are a careful but funny smart-home analyst."},
        {"role": "user", "content": prompt},
    ]
    request_body = {
        "model": OPENAI_MODEL,
        "messages": conversation,
        "temperature": 0.35,
    }
    reply = requests.post(
        "https://api.openai.com/v1/chat/completions",
        headers=auth_headers,
        json=request_body,
        timeout=300,
    )
    reply.raise_for_status()
    first_choice = reply.json()["choices"][0]
    return first_choice["message"]["content"].strip()
def get_llm_conclusions(input_summary: str) -> str:
    """Return the analysis text for ``input_summary`` according to LLM_MODE.

    "none" yields a fixed placeholder without building a prompt; "ollama" and
    "openai" delegate to the matching backend; any other value yields an
    explanatory message instead of raising.
    """
    if LLM_MODE == "none":
        return "AI analysis disabled. Set LLM_MODE=ollama or LLM_MODE=openai in .env. The raccoon analyst is asleep. 🦝💤"

    prompt = analysis_prompt(input_summary)
    backend = {"ollama": call_ollama, "openai": call_openai}.get(LLM_MODE)
    if backend is None:
        return f"Unknown LLM_MODE={LLM_MODE!r}. Use none, ollama, or openai."
    return backend(prompt)
def markdownish_to_html(text: str) -> str:
    """Convert a small Markdown subset to HTML, escaping everything else.

    Supported constructs (after HTML-escaping the whole input):
      * ``# ``, ``## ``, ``### `` at line start -> ``<h1>``..``<h3>``
      * consecutive lines starting with ``- `` or ``* `` -> one ``<ul>`` of
        ``<li>`` items
      * ``**text**`` within a line -> ``<strong>text</strong>``
      * any other line (including a blank one) -> the line followed by ``<br>``

    List items are wrapped in a real ``<ul>`` and no ``<br>`` is emitted after
    block-level elements, so the result is valid markup without doubled
    vertical gaps.

    Args:
        text: Untrusted plain text / Markdown-ish text (e.g. LLM output).

    Returns:
        The HTML fragment, lines joined by "\\n"; "" for empty input.
    """
    escaped = html.escape(text)
    # Escaping happens first, so only the tags generated here are live markup.
    escaped = re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", escaped)

    rendered: list[str] = []
    in_list = False
    for line in escaped.splitlines():
        bullet = re.match(r"[-*] (.*)$", line)
        if bullet:
            if not in_list:
                rendered.append("<ul>")
                in_list = True
            rendered.append(f"<li>{bullet.group(1)}</li>")
            continue

        # Any non-bullet line terminates the current list.
        if in_list:
            rendered.append("</ul>")
            in_list = False

        heading = re.match(r"(#{1,3}) (.*)$", line)
        if heading:
            level = len(heading.group(1))
            rendered.append(f"<h{level}>{heading.group(2)}</h{level}>")
        else:
            rendered.append(f"{line}<br>")

    if in_list:
        rendered.append("</ul>")
    return "\n".join(rendered)
def publish_webpage(conclusions: str, raw_summary: str) -> Path:
    """Render the briefing as ``index.html`` inside WEB_DIR and return its path.

    ``conclusions`` is passed through ``markdownish_to_html``; ``raw_summary``
    is cut to its first 60000 characters, HTML-escaped and placed in a
    collapsible ``<details>`` section. The page carries a meta refresh of
    1800 seconds.
    """
    WEB_DIR.mkdir(parents=True, exist_ok=True)
    generated = datetime.now().strftime("%Y-%m-%d %H:%M")
    article = markdownish_to_html(conclusions)
    bundle = html.escape(raw_summary[:60000])

    # Doubled braces are literal CSS braces inside the f-string.
    document = f"""<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta http-equiv="refresh" content="1800">
<title>Smart Home Gossip Gazette</title>
<style>
body {{ margin:0; font-family: system-ui, sans-serif; background: #151522; color:#f7f7fb; }}
header {{ padding: 2rem; background: linear-gradient(135deg,#7c3aed,#db2777,#f59e0b); color:white; }}
main {{ max-width: 950px; margin: 0 auto; padding: 2rem; }}
.card {{ background:#222238; border:1px solid #39395b; border-radius:20px; padding:1.4rem; box-shadow:0 12px 30px #0006; }}
h1 {{ margin:0; font-size: clamp(2rem, 5vw, 4rem); }}
h2,h3 {{ color:#fde68a; }}
li {{ margin:.35rem 0; }}
.mascot {{ font-size:3rem; float:right; animation: wiggle 2s infinite; }}
details {{ margin-top: 2rem; }}
pre {{ white-space: pre-wrap; background:#0f0f18; color:#d1d5db; padding:1rem; border-radius:12px; overflow:auto; }}
a {{ color:#93c5fd; }}
@keyframes wiggle {{ 0%,100% {{ transform: rotate(-3deg); }} 50% {{ transform: rotate(3deg); }} }}
</style>
</head>
<body>
<header>
<div class="mascot">🦝🏠</div>
<h1>Smart Home Gossip Gazette</h1>
<p>Fresh 5AM nonsense-powered intelligence briefing · Generated {html.escape(generated)}</p>
</header>
<main>
<section class="card">{article}</section>
<details>
<summary>Raw data bundle shown to the AI goblin</summary>
<pre>{bundle}</pre>
</details>
</main>
</body>
</html>
"""
    target = WEB_DIR / "index.html"
    target.write_text(document, encoding="utf-8")
    return target
def write_markdown_report(summary: str, conclusions: str) -> Path:
    """Write a timestamped Markdown report to REPORT_DIR and return its path.

    The report holds the conclusions followed by the data bundle inside a
    fenced ``text`` block.
    """
    REPORT_DIR.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    target = REPORT_DIR / f"daily-ai-analysis-{stamp}.md"
    document = f"# Daily Home Assistant AI Analysis\n\n{conclusions}\n\n## Data bundle\n\n```text\n{summary}\n```\n"
    target.write_text(document, encoding="utf-8")
    return target
def cmd_collect() -> int:
    """Run the ``collect`` mode: snapshot, save, prune old files.

    Returns:
        0 on success; failures propagate as exceptions.
    """
    require_config(for_ai=False)
    saved_to = save_snapshot(make_snapshot())
    cleanup_old_snapshots()
    print(f"Collected snapshot: {saved_to}")
    return 0
def cmd_analyze() -> int:
    """Run the ``analyze`` mode: summarize recent snapshots and publish results.

    Returns:
        0 on success.

    Raises:
        RuntimeError: When no recent snapshots are available.
    """
    require_config(for_ai=True)

    recent = load_recent_snapshots(ANALYZE_SNAPSHOT_HOURS)
    if len(recent) == 0:
        raise RuntimeError(f"No snapshots found in {DATA_DIR}; run collect first")

    bundle = build_daily_summary(recent)
    verdict = get_llm_conclusions(bundle)

    report_file = write_markdown_report(bundle, verdict)
    page_file = publish_webpage(verdict, bundle)
    print(f"Wrote report: {report_file}")
    print(f"Published webpage: {page_file}")
    return 0
def main() -> int:
    """Parse the command line and run the selected mode.

    Returns:
        The mode's exit status, or 1 after printing the error to stderr when
        the mode raised an exception.
    """
    cli = argparse.ArgumentParser(description="Home Assistant observer")
    cli.add_argument(
        "mode",
        nargs="?",
        default="collect",
        choices=["collect", "analyze"],
        help="collect snapshots or analyze/publish them",
    )
    selected = cli.parse_args().mode
    handlers = {"collect": cmd_collect, "analyze": cmd_analyze}
    try:
        return handlers[selected]()
    except Exception as exc:
        # Top-level boundary: report the failure and signal it via exit status.
        print(f"ERROR: {exc}", file=sys.stderr)
        return 1


if __name__ == "__main__":
    raise SystemExit(main())