#!/usr/bin/env python3
"""
Home Assistant observer
Modes:
collect - run every 30 minutes; stores a compact JSON snapshot locally
analyze - run at 05:00; sends the last snapshots to AI and publishes a funny local web page
Configuration is via environment variables. See .env.example.
"""
from __future__ import annotations
import argparse
import html
import json
import os
import re
import subprocess
import sys
import tempfile
from datetime import datetime, timedelta, timezone
from email.utils import format_datetime
from pathlib import Path
from typing import Any
from zoneinfo import ZoneInfo
import requests
# ---------------------------------------------------------------------------
# Configuration, read from environment variables once at import time.
# Every int(...) below raises ValueError at import if the variable is set to
# something that is not an integer.
# ---------------------------------------------------------------------------
# Home Assistant base URL (trailing slashes removed) and bearer token used by
# ha_get(); both must be non-empty for require_config() to pass.
HA_URL = os.environ.get("HA_URL", "").rstrip("/")
HA_TOKEN = os.environ.get("HA_TOKEN", "")
# Directory where snapshot-*.json files are written and read back.
DATA_DIR = Path(os.environ.get("DATA_DIR", "./data"))
# Directory scanned for previous daily-ai-analysis-*.md reports.
REPORT_DIR = Path(os.environ.get("REPORT_DIR", "./reports"))
# NOTE(review): WEB_DIR, SITE_BASE_PATH and SITE_URL are not referenced in the
# part of the file visible here - presumably used by the web page publishing
# code; confirm there.
WEB_DIR = Path(os.environ.get("WEB_DIR", "./web"))
SITE_BASE_PATH = os.environ.get("SITE_BASE_PATH", "/").strip() or "/"
SITE_URL = os.environ.get("SITE_URL", "http://localhost").rstrip("/")
# Optional file with extra owner instructions appended to the LLM prompt.
PROMPT_FILE = Path(os.environ.get("PROMPT_FILE", "./llm_instructions.md"))
# How many hours of state history each snapshot requests, and how many of the
# most recent history entries are kept per entity.
HISTORY_HOURS = int(os.environ.get("HISTORY_HOURS", "24"))
MAX_HISTORY_PER_ENTITY = int(os.environ.get("MAX_HISTORY_PER_ENTITY", "20"))
# Calendar window (days ahead of "now") and per-calendar event cap; a
# lookahead of 0 or less disables calendar collection.
CALENDAR_LOOKAHEAD_DAYS = int(os.environ.get("CALENDAR_LOOKAHEAD_DAYS", "7"))
MAX_CALENDAR_EVENTS_PER_CALENDAR = int(os.environ.get("MAX_CALENDAR_EVENTS_PER_CALENDAR", "8"))
# Analysis inputs: snapshot age window, previous-article age window, and the
# approximate character budget of the bundle sent to the LLM.
ANALYZE_SNAPSHOT_HOURS = int(os.environ.get("ANALYZE_SNAPSHOT_HOURS", "24"))
ARTICLE_CONTEXT_DAYS = int(os.environ.get("ARTICLE_CONTEXT_DAYS", "7"))
MAX_ANALYZE_CHARS = int(os.environ.get("MAX_ANALYZE_CHARS", "80000"))
# IANA time zone name used for every timestamp shown to the LLM.
DISPLAY_TIMEZONE = os.environ.get("DISPLAY_TIMEZONE", "Europe/Copenhagen")
# Snapshots whose file modification time is older than this many days are
# deleted by cleanup_old_snapshots().
KEEP_SNAPSHOT_DAYS = int(os.environ.get("KEEP_SNAPSHOT_DAYS", "14"))
# LLM_MODE selects the analysis backend: none | pi | ollama | openai
# (lower-cased here, so the environment value is case-insensitive).
LLM_MODE = os.environ.get("LLM_MODE", "none").lower()
# Ollama backend: server base URL and model name.
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434").rstrip("/")
OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "llama3.1")
# OpenAI backend: API key (required when LLM_MODE=openai) and model name.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
OPENAI_MODEL = os.environ.get("OPENAI_MODEL", "gpt-4o-mini")
# pi CLI backend: executable, optional --model value, and subprocess timeout
# in seconds.
PI_BIN = os.environ.get("PI_BIN", "pi")
PI_MODEL = os.environ.get("PI_MODEL", "")
PI_TIMEOUT = int(os.environ.get("PI_TIMEOUT", "600"))
# Entity domains (the part of an entity id before the first ".") that are
# collected. Comma-separated in the environment; blanks are ignored.
RELEVANT_DOMAINS = {
    domain.strip()
    for domain in os.environ.get(
        "RELEVANT_DOMAINS",
        "sensor,binary_sensor,person,device_tracker,climate,light,switch,lock,cover,alarm_control_panel,media_player,calendar,weather",
    ).split(",")
    if domain.strip()
}
# Full entity ids that are never collected, even if their domain is relevant.
EXCLUDED_ENTITIES = {
    entity.strip()
    for entity in os.environ.get("EXCLUDED_ENTITIES", "").split(",")
    if entity.strip()
}
# Allow-list of Home Assistant attribute keys that compact_attributes() keeps;
# every other attribute is dropped before a state is stored in a snapshot.
ALLOWED_ATTRIBUTES = {
"friendly_name",
"unit_of_measurement",
"device_class",
"state_class",
"current_temperature",
"temperature",
"humidity",
"battery_level",
"brightness",
"gps_accuracy",
"source_type",
"assumed_state",
}
# Keyword -> points table used by entity_importance(). Each keyword is matched
# as a plain substring of the lower-cased "entity_id friendly_name
# device_class" text, and the points of every matching keyword are added up
# (so several keywords can contribute to one entity).
IMPORTANT_ENTITY_KEYWORDS = {
"alarm": 100,
"smoke": 100,
"co_": 100,
"carbon_monoxide": 100,
"leak": 95,
"water": 80,
"door": 85,
"window": 80,
"lock": 85,
"motion": 70,
"presence": 70,
"occupancy": 70,
"person": 75,
"device_tracker": 75,
"phone": 70,
"laptop": 60,
"battery": 65,
"humidity": 60,
"temperature": 55,
"climate": 55,
"heating": 55,
"dehumidifier": 70,
"backup": 70,
"internet": 65,
"speedtest": 65,
"router": 60,
"light": 45,
"switch": 35,
"sonos": 45,
"media": 40,
"tv": 40,
"megane": 50,
"fjr": 50,
"plant": 45,
"smb_": 60,
}
class ConfigError(RuntimeError):
    """Raised when a required configuration value is missing."""
def require_config(for_ai: bool = False) -> None:
    """Validate that the mandatory settings are present.

    Args:
        for_ai: When true, also require ``OPENAI_API_KEY`` if
            ``LLM_MODE`` is ``"openai"``.

    Raises:
        ConfigError: If ``HA_URL`` or ``HA_TOKEN`` is empty, or the OpenAI
            key is needed but empty.
    """
    # Checked in this order so the first missing value is the one reported.
    for name, value in (("HA_URL", HA_URL), ("HA_TOKEN", HA_TOKEN)):
        if not value:
            raise ConfigError(f"{name} is not set")
    openai_selected = for_ai and LLM_MODE == "openai"
    if openai_selected and not OPENAI_API_KEY:
        raise ConfigError("LLM_MODE=openai but OPENAI_API_KEY is not set")
def ha_get(path: str, params: dict[str, str] | None = None) -> Any:
    """GET ``HA_URL + path`` with the bearer token and return the decoded JSON.

    Args:
        path: API path appended verbatim to ``HA_URL``.
        params: Optional query parameters.

    Raises:
        requests.HTTPError: On a non-success status; the message carries the
            original error plus the first 500 characters of the response body.
    """
    url = f"{HA_URL}{path}"
    auth_headers = {"Authorization": f"Bearer {HA_TOKEN}", "Content-Type": "application/json"}
    reply = requests.get(url, headers=auth_headers, params=params, timeout=60)
    try:
        reply.raise_for_status()
    except requests.HTTPError as exc:
        # Re-raise with a body excerpt so the failure reason is visible in logs.
        body_excerpt = reply.text.strip()[:500]
        raise requests.HTTPError(f"{exc}; response={body_excerpt}", response=reply) from exc
    return reply.json()
def is_relevant_entity(entity_id: str) -> bool:
    """Return True if the entity is not excluded and its domain is relevant.

    The domain is the text before the first ``.`` of ``entity_id``.
    """
    if entity_id in EXCLUDED_ENTITIES:
        return False
    domain, _, _ = entity_id.partition(".")
    return domain in RELEVANT_DOMAINS
def compact_attributes(attrs: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of ``attrs`` restricted to keys in ``ALLOWED_ATTRIBUTES``."""
    kept: dict[str, Any] = {}
    for key, value in attrs.items():
        if key in ALLOWED_ATTRIBUTES:
            kept[key] = value
    return kept
def get_states() -> list[dict[str, Any]]:
    """Fetch ``/api/states`` and return compact records for relevant entities.

    Entities that fail ``is_relevant_entity`` or whose state is ``unknown``,
    ``unavailable`` or missing are skipped. The result is sorted by entity id.
    """
    ignored_states = {"unknown", "unavailable", None}
    selected: list[dict[str, Any]] = []
    for raw in ha_get("/api/states"):
        entity_id = raw.get("entity_id", "")
        current = raw.get("state")
        if is_relevant_entity(entity_id) and current not in ignored_states:
            selected.append(
                {
                    "entity_id": entity_id,
                    "state": current,
                    "attributes": compact_attributes(raw.get("attributes", {})),
                    "last_changed": raw.get("last_changed"),
                    "last_updated": raw.get("last_updated"),
                }
            )
    selected.sort(key=lambda record: record["entity_id"])
    return selected
def clean_text(value: Any, max_len: int = 300) -> str:
    """Turn an arbitrary value into short single-line plain text.

    Falsy values yield ``""``. Otherwise the value is stringified, anything
    that looks like an ``<...>`` tag is replaced by a space, HTML entities
    are decoded, whitespace runs are collapsed to single spaces, the result
    is stripped and finally cut to ``max_len`` characters.
    """
    if not value:
        return ""
    tagless = re.sub(r"<[^>]+>", " ", str(value))
    decoded = html.unescape(tagless)
    squeezed = re.sub(r"\s+", " ", decoded).strip()
    return squeezed[:max_len]
def human_date_label(dt: datetime, include_time: bool) -> str:
    """Describe ``dt`` relative to today in ``DISPLAY_TIMEZONE``.

    The day part is ``today``, ``tomorrow``, ``upcoming <weekday>`` (2-7 days
    ahead) or ``last <weekday>`` (1-7 days back). Dates further away than a
    week in either direction are labelled ``<weekday> YYYY-MM-DD``: a bare
    weekday name there would be indistinguishable from a day in the current
    week.

    Args:
        dt: The moment to describe; its own calendar date (``dt.date()``) is
            compared with today's date, so callers should pass a value already
            expressed in the display time zone.
        include_time: Append ``at HH:MM`` when true.

    Returns:
        The human-readable label.
    """
    today = datetime.now(ZoneInfo(DISPLAY_TIMEZONE)).date()
    delta_days = (dt.date() - today).days
    weekday = dt.strftime("%A")
    if delta_days == 0:
        day = "today"
    elif delta_days == 1:
        day = "tomorrow"
    elif 1 < delta_days <= 7:
        day = f"upcoming {weekday}"
    elif -7 <= delta_days < 0:
        day = f"last {weekday}"
    else:
        # More than a week away: add the ISO date so the label is unambiguous.
        day = f"{weekday} {dt.strftime('%Y-%m-%d')}"
    if include_time:
        return f"{day} at {dt.strftime('%H:%M')}"
    return day
def event_time(value: dict[str, str] | None) -> str:
    """Render a calendar event start/end mapping as a human label.

    A ``dateTime`` entry is parsed as ISO 8601 (``Z`` accepted, naive values
    treated as UTC), converted to ``DISPLAY_TIMEZONE`` and labelled with a
    time. A ``date`` entry is labelled without a time. If parsing fails the
    raw value is returned (via ``display_time`` for ``dateTime``). Missing
    input or a mapping with neither key yields ``""``.
    """
    if not value:
        return ""
    if "dateTime" in value:
        try:
            moment = datetime.fromisoformat(value["dateTime"].replace("Z", "+00:00"))
            if moment.tzinfo is None:
                moment = moment.replace(tzinfo=timezone.utc)
            localized = moment.astimezone(ZoneInfo(DISPLAY_TIMEZONE))
            return human_date_label(localized, include_time=True)
        except Exception:
            # Best effort: fall back to the generic timestamp formatter.
            return display_time(value.get("dateTime"))
    if "date" not in value:
        return ""
    try:
        all_day = datetime.fromisoformat(value["date"])
        all_day = all_day.replace(tzinfo=ZoneInfo(DISPLAY_TIMEZONE))
        return human_date_label(all_day, include_time=False)
    except Exception:
        return value.get("date", "")
def get_calendar_events(calendar_entity_ids: list[str]) -> list[dict[str, Any]]:
    """Fetch upcoming events for each calendar entity.

    The window runs from now (UTC) to ``CALENDAR_LOOKAHEAD_DAYS`` days ahead.
    A calendar whose request fails is reported on stderr and skipped; a
    calendar with no events is omitted. At most
    ``MAX_CALENDAR_EVENTS_PER_CALENDAR`` events are kept per calendar.

    Returns:
        A list of ``{"entity_id": ..., "events": [...]}`` dicts, or ``[]``
        when there are no calendars or the lookahead is not positive.
    """
    if CALENDAR_LOOKAHEAD_DAYS <= 0 or not calendar_entity_ids:
        return []
    window_start = datetime.now(timezone.utc)
    window_end = window_start + timedelta(days=CALENDAR_LOOKAHEAD_DAYS)
    collected: list[dict[str, Any]] = []
    for entity_id in calendar_entity_ids:
        try:
            raw_events = ha_get(
                f"/api/calendars/{entity_id}",
                params={"start": window_start.isoformat(), "end": window_end.isoformat()},
            )
        except Exception as exc:
            print(f"Skipping calendar events for {entity_id}: {exc}", file=sys.stderr)
            continue
        trimmed = [
            {
                "summary": clean_text(raw.get("summary"), 160),
                "start": event_time(raw.get("start")),
                "end": event_time(raw.get("end")),
                "location": clean_text(raw.get("location"), 180),
                "description": clean_text(raw.get("description"), 260),
            }
            for raw in raw_events[:MAX_CALENDAR_EVENTS_PER_CALENDAR]
        ]
        if trimmed:
            collected.append({"entity_id": entity_id, "events": trimmed})
    return collected
def get_history(hours: int, entity_ids: list[str]) -> list[dict[str, Any]]:
    """Return recent state changes for the given entities.

    History is requested from ``hours`` ago (UTC) in chunks of 50 entity ids.
    For each relevant entity the last ``MAX_HISTORY_PER_ENTITY`` entries are
    considered, ``unknown``/``unavailable``/missing states are dropped, and
    the entity is reported only if more than one distinct state remains.

    Returns:
        ``{"entity_id": ..., "recent_states": [...]}`` dicts sorted by entity id.
    """
    since = datetime.now(timezone.utc) - timedelta(hours=hours)
    ignored_states = {"unknown", "unavailable", None}
    changed: list[dict[str, Any]] = []
    # Recent Home Assistant versions/configurations require filter_entity_id for
    # the history endpoint. Query in chunks to avoid an overlong URL.
    chunk_size = 50
    for offset in range(0, len(entity_ids), chunk_size):
        batch = entity_ids[offset : offset + chunk_size]
        per_entity = ha_get(
            f"/api/history/period/{since.isoformat(timespec='seconds')}",
            params={"filter_entity_id": ",".join(batch), "minimal_response": ""},
        )
        for series in per_entity:
            if not series:
                continue
            # The entity id is read from the first entry of each series.
            entity_id = series[0].get("entity_id", "")
            if not is_relevant_entity(entity_id):
                continue
            recent = [
                {"state": entry.get("state"), "last_changed": entry.get("last_changed")}
                for entry in series[-MAX_HISTORY_PER_ENTITY:]
                if entry.get("state") not in ignored_states
            ]
            distinct_states = {entry["state"] for entry in recent}
            if len(distinct_states) > 1:
                changed.append({"entity_id": entity_id, "recent_states": recent})
    changed.sort(key=lambda record: record["entity_id"])
    return changed
def make_snapshot() -> dict[str, Any]:
    """Collect current states, recent history and calendar events.

    Returns:
        A JSON-serialisable dict with ``generated_at``, ``history_hours``,
        ``calendar_lookahead_days``, ``states``, ``history`` and
        ``calendar_events``.
    """
    states = get_states()
    entity_ids = [state["entity_id"] for state in states]
    calendar_entity_ids = [entity_id for entity_id in entity_ids if entity_id.startswith("calendar.")]
    # Record an offset-aware local timestamp. A naive value would later be
    # interpreted as UTC by display_time(), shifting the snapshot time by the
    # host's UTC offset whenever the host is not running in UTC.
    generated_at = datetime.now().astimezone().isoformat(timespec="seconds")
    return {
        "generated_at": generated_at,
        "history_hours": HISTORY_HOURS,
        "calendar_lookahead_days": CALENDAR_LOOKAHEAD_DAYS,
        "states": states,
        "history": get_history(HISTORY_HOURS, entity_ids),
        "calendar_events": get_calendar_events(calendar_entity_ids),
    }
def save_snapshot(snapshot: dict[str, Any]) -> Path:
    """Write ``snapshot`` as indented UTF-8 JSON into ``DATA_DIR``.

    The directory is created if needed. The file is named
    ``snapshot-YYYY-MM-DD_HH-MM-SS.json`` from the current local time.

    Returns:
        The path of the written file.
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    filename = f"snapshot-{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.json"
    target = DATA_DIR / filename
    payload = json.dumps(snapshot, indent=2, ensure_ascii=False)
    target.write_text(payload, encoding="utf-8")
    return target
def cleanup_old_snapshots() -> None:
    """Delete snapshot files last modified more than ``KEEP_SNAPSHOT_DAYS`` days ago."""
    oldest_allowed = datetime.now() - timedelta(days=KEEP_SNAPSHOT_DAYS)
    for candidate in DATA_DIR.glob("snapshot-*.json"):
        modified = datetime.fromtimestamp(candidate.stat().st_mtime)
        if modified >= oldest_allowed:
            continue
        candidate.unlink(missing_ok=True)
def load_recent_snapshots(hours: int) -> list[dict[str, Any]]:
    """Load snapshots whose file was modified within the last ``hours`` hours.

    Files are visited in sorted path order. A file that cannot be read or
    parsed is reported on stderr and skipped.
    """
    oldest_allowed = datetime.now() - timedelta(hours=hours)
    loaded = []
    for candidate in sorted(DATA_DIR.glob("snapshot-*.json")):
        modified = datetime.fromtimestamp(candidate.stat().st_mtime)
        if modified >= oldest_allowed:
            try:
                raw = candidate.read_text(encoding="utf-8")
                loaded.append(json.loads(raw))
            except Exception as exc:
                print(f"Skipping unreadable snapshot {candidate}: {exc}", file=sys.stderr)
    return loaded
def display_time(value: str | None) -> str:
    """Format an ISO 8601 timestamp in ``DISPLAY_TIMEZONE``.

    A trailing ``Z`` is accepted and naive timestamps are treated as UTC.
    Falsy input yields ``""``; input that cannot be converted is returned
    unchanged.
    """
    if not value:
        return ""
    try:
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
        aware = parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc)
        return aware.astimezone(ZoneInfo(DISPLAY_TIMEZONE)).strftime("%Y-%m-%d %H:%M:%S %Z")
    except Exception:
        return value
def entity_importance(entity_id: str, attrs: dict[str, Any] | None = None) -> int:
    """Score how important an entity is for the LLM summary (higher = earlier).

    The score is the sum of a per-domain base value (10 for unlisted
    domains), the points of every ``IMPORTANT_ENTITY_KEYWORDS`` key found as
    a substring of the lower-cased id / friendly name / device class, a
    location adjustment, and +15 when ``attrs["state"]`` is an "active"
    value.

    Args:
        entity_id: Full entity id such as ``"sensor.kitchen_temperature"``.
        attrs: Optional attribute mapping. Note that the state bonus is read
            from the ``"state"`` key of this mapping, so it only applies when
            the caller puts the state there.
    """
    details = attrs or {}
    domain = entity_id.split(".", 1)[0]
    haystack = f"{entity_id} {details.get('friendly_name', '')} {details.get('device_class', '')}".lower()
    domain_scores = {
        "alarm_control_panel": 100,
        "lock": 90,
        "person": 80,
        "device_tracker": 75,
        "binary_sensor": 60,
        "climate": 55,
        "cover": 50,
        "sensor": 45,
        "light": 35,
        "switch": 30,
        "media_player": 25,
    }
    total = domain_scores.get(domain, 10)
    total += sum(points for keyword, points in IMPORTANT_ENTITY_KEYWORDS.items() if keyword in haystack)
    # Sønderborg/Denmark home is the primary residence and absolute priority.
    # Samobor/Croatia entities use the smb_ prefix and are still included, but
    # they should lose ties when the LLM input has to be size-limited.
    total += -40 if "smb_" in entity_id.lower() else 120
    active_states = {"on", "open", "unlocked", "detected", "home"}
    if str(details.get("state", "")).lower() in active_states:
        total += 15
    return total
def summarize_snapshot(snapshot: dict[str, Any]) -> str:
    """Render one snapshot as plain text for the LLM, most important first.

    Sections, in order: current states, upcoming calendar events, recently
    changed entities. States and history entries are sorted by descending
    ``entity_importance`` and then by entity id.

    Args:
        snapshot: A snapshot dict with ``generated_at``, ``states``,
            ``calendar_events`` and ``history`` keys; missing keys are
            treated as empty.

    Returns:
        The newline-joined summary text.
    """
    lines = [
        f"Snapshot: {display_time(snapshot.get('generated_at'))}",
        "Priority current states first; lower-priority entities follow only if the LLM size limit allows.",
        "Current states:",
    ]

    # Score each state exactly once. entity_importance() looks for the state
    # under the "state" key of the mapping it is given, while the stored
    # attribute dict does not carry the state, so merge it in here; otherwise
    # the bonus for active states (on/open/unlocked/...) would never apply.
    scored_states = []
    for state in snapshot.get("states", []):
        attrs = state.get("attributes") or {}
        score = entity_importance(state.get("entity_id", ""), {**attrs, "state": state.get("state")})
        scored_states.append((score, state, attrs))
    scored_states.sort(key=lambda entry: (-entry[0], entry[1].get("entity_id", "")))

    for score, state, attrs in scored_states:
        name = attrs.get("friendly_name", state.get("entity_id"))
        unit = attrs.get("unit_of_measurement", "")
        value = f"{state.get('state')} {unit}".strip()
        lines.append(f"- importance={score} {name} ({state.get('entity_id')}): {value}; last_changed={display_time(state.get('last_changed'))}")

    lines.append("Upcoming calendar events:")
    for calendar in snapshot.get("calendar_events", []):
        lines.append(f"- {calendar.get('entity_id')}:")
        for event in calendar.get("events", []):
            details = []
            if event.get("location"):
                details.append(f"location={event.get('location')}")
            if event.get("description"):
                details.append(f"description={event.get('description')}")
            detail_text = f"; {'; '.join(details)}" if details else ""
            lines.append(f" - {event.get('start')} to {event.get('end')}: {event.get('summary')}{detail_text}")

    lines.append("Recently changed entities:")
    # History entries carry no attributes, so they are scored by id alone.
    scored_history = [
        (entity_importance(item.get("entity_id", "")), item)
        for item in snapshot.get("history", [])
    ]
    scored_history.sort(key=lambda entry: (-entry[0], entry[1].get("entity_id", "")))
    for score, item in scored_history:
        # Only the last eight transitions are shown per entity.
        transitions = ", ".join(f"{x.get('state')} @ {display_time(x.get('last_changed'))}" for x in item.get("recent_states", [])[-8:])
        lines.append(f"- importance={score} {item.get('entity_id')}: {transitions}")
    return "\n".join(lines)
def build_daily_summary(snapshots: list[dict[str, Any]]) -> str:
    """Build the text bundle sent to the LLM from the given snapshots.

    Snapshots are added newest-first (the input is walked in reverse) until
    adding another would exceed ``MAX_ANALYZE_CHARS``. The first snapshot is
    always included; a single snapshot longer than the limit is cut to the
    limit and marked as truncated. A header line with the number of included
    snapshots is inserted as the third header line.
    """
    header = [
        f"Daily Home Assistant bundle generated {datetime.now(ZoneInfo(DISPLAY_TIMEZONE)).isoformat(timespec='seconds')}",
        f"Contains {len(snapshots)} snapshots from roughly the last {ANALYZE_SNAPSHOT_HOURS} hours.",
        f"Input capped at roughly {MAX_ANALYZE_CHARS} characters for the LLM.",
        f"All times in this bundle are converted to {DISPLAY_TIMEZONE} local time.",
    ]
    sections: list[str] = []
    used_chars = len("\n".join(header))
    for snapshot in reversed(snapshots):
        section = "\n---\n" + summarize_snapshot(snapshot)
        # Stop once the budget is exhausted, but never before one snapshot is in.
        if sections and used_chars + len(section) > MAX_ANALYZE_CHARS:
            break
        if len(section) > MAX_ANALYZE_CHARS:
            section = section[:MAX_ANALYZE_CHARS] + "\n[Snapshot truncated for LLM size limit]"
        sections.append(section)
        used_chars += len(section)
    header.insert(2, f"Included {len(sections)} most recent snapshots after size limiting.")
    return "\n".join(header + sections)
def read_extra_llm_instructions() -> str:
    """Return the stripped contents of ``PROMPT_FILE``, or ``""`` if it does not exist."""
    if PROMPT_FILE.exists():
        return PROMPT_FILE.read_text(encoding="utf-8").strip()
    return ""
def load_recent_article_context(days: int) -> str:
    """Collect the conclusions of recent daily reports as LLM context.

    Reports are ``daily-ai-analysis-*.md`` files in ``REPORT_DIR`` modified
    within the last ``days`` days, visited in sorted path order. Only the
    text before the ``## Data bundle`` heading is used, capped at 8000
    characters per report, and only the last seven reports are kept.
    Unreadable files are reported on stderr and skipped.

    Returns:
        The joined context, or ``""`` when ``days`` is not positive or the
        report directory does not exist.
    """
    if not REPORT_DIR.exists() or days <= 0:
        return ""
    oldest_allowed = datetime.now() - timedelta(days=days)
    excerpts: list[str] = []
    for report in sorted(REPORT_DIR.glob("daily-ai-analysis-*.md")):
        modified = datetime.fromtimestamp(report.stat().st_mtime)
        if modified < oldest_allowed:
            continue
        try:
            body = report.read_text(encoding="utf-8")
        except Exception as exc:
            print(f"Skipping unreadable previous report {report}: {exc}", file=sys.stderr)
            continue
        head, _, _ = body.partition("\n## Data bundle\n")
        conclusions = head.strip()
        excerpts.append(f"PREVIOUS ARTICLE {report.name}:\n{conclusions[:8000]}")
    return "\n\n---\n\n".join(excerpts[-7:])
def analysis_prompt(input_summary: str, previous_articles: str = "") -> str:
    """Assemble the full LLM prompt for today's article.

    The fixed writing brief is followed by the optional owner instructions
    from ``PROMPT_FILE``, the optional previous-articles section, and finally
    today's data.

    Args:
        input_summary: Text bundle describing today's snapshots.
        previous_articles: Optional context from earlier reports; the section
            is omitted when empty.
    """
    owner_notes = read_extra_llm_instructions()
    if owner_notes:
        owner_section = f"""
ADDITIONAL OWNER INSTRUCTIONS FROM {PROMPT_FILE}:
{owner_notes}
"""
    else:
        owner_section = ""
    if previous_articles:
        history_section = f"""
PREVIOUS ARTICLES FROM THE LAST {ARTICLE_CONTEXT_DAYS} DAYS FOR CONTEXT:
Use these only for trend/context awareness. Do not claim something happened today unless today's data supports it.
{previous_articles}
"""
    else:
        history_section = ""
    return f"""You are writing today's Home Assistant smart-home blog article for the owner.
Write a funny but useful morning briefing in a clean blog/article style. Use light humor,
but keep emojis/smileys rare: at most one in the whole article. Prefer clear headings,
short paragraphs, and readable bullet lists. Remain factual and privacy-aware. Include:
- A short comedy headline for the day
- What seemed to happen at home today
- Behavioral patterns that can reasonably be inferred
- Notable trends compared with recent previous articles, if supported
- What a nosy raccoon/hacker could figure out about the resident
- Anomalies, risks, or privacy/security concerns
- Suggested Home Assistant automations or fixes
Distinguish strong evidence from guesses. Do not invent facts not supported by the data.
{owner_section}{history_section}
TODAY'S DATA:
{input_summary}
"""
def call_ollama(prompt: str) -> str:
    """Send ``prompt`` to the Ollama ``/api/generate`` endpoint (non-streaming).

    Returns:
        The stripped ``response`` field of the reply, or ``""`` if absent.

    Raises:
        requests.HTTPError: On a non-success HTTP status.
    """
    request_body = {"model": OLLAMA_MODEL, "prompt": prompt, "stream": False}
    reply = requests.post(f"{OLLAMA_URL}/api/generate", json=request_body, timeout=300)
    reply.raise_for_status()
    generated = reply.json().get("response", "")
    return generated.strip()
def call_openai(prompt: str) -> str:
    """Send ``prompt`` to the OpenAI chat completions endpoint.

    Returns:
        The stripped content of the first choice's message.

    Raises:
        requests.HTTPError: On a non-success HTTP status.
    """
    auth_headers = {"Authorization": f"Bearer {OPENAI_API_KEY}", "Content-Type": "application/json"}
    conversation = [
        {"role": "system", "content": "You are a careful but funny smart-home analyst."},
        {"role": "user", "content": prompt},
    ]
    request_body = {
        "model": OPENAI_MODEL,
        "messages": conversation,
        "temperature": 0.35,
    }
    reply = requests.post(
        "https://api.openai.com/v1/chat/completions",
        headers=auth_headers,
        json=request_body,
        timeout=300,
    )
    reply.raise_for_status()
    first_choice = reply.json()["choices"][0]
    return first_choice["message"]["content"].strip()
def call_pi(prompt: str) -> str:
    """Run the ``pi`` CLI on ``prompt`` and return its stdout.

    The prompt is written to a temporary ``.md`` file that is passed as an
    ``@file`` argument; the file is removed afterwards whether or not the
    command succeeds.

    Raises:
        RuntimeError: If ``pi`` exits non-zero (message carries the last 1000
            characters of stderr) or prints nothing.
    """
    # Avoid piping the prompt on stdin here. In pi print mode, piped stdin can be
    # treated as the primary output/input stream in surprising ways. Passing the
    # prompt as an @file gives reliable non-interactive cron behavior.
    with tempfile.NamedTemporaryFile("w", encoding="utf-8", suffix=".md", delete=False) as handle:
        handle.write(prompt)
        prompt_path = handle.name
    try:
        model_args = ["--model", PI_MODEL] if PI_MODEL else []
        command = [PI_BIN, "--no-tools", *model_args, "-p", f"@{prompt_path}"]
        completed = subprocess.run(
            command,
            text=True,
            capture_output=True,
            timeout=PI_TIMEOUT,
            check=False,
        )
    finally:
        Path(prompt_path).unlink(missing_ok=True)
    if completed.returncode != 0:
        error_tail = completed.stderr.strip()[-1000:]
        raise RuntimeError(f"pi exited with status {completed.returncode}: {error_tail}")
    analysis = completed.stdout.strip()
    if not analysis:
        raise RuntimeError("pi returned an empty analysis")
    return analysis
def get_llm_conclusions(input_summary: str, previous_articles: str = "") -> str:
    """Produce the article text using the backend selected by ``LLM_MODE``.

    With ``LLM_MODE == "none"`` a fixed "disabled" notice is returned and no
    prompt is built. An unrecognised mode yields an explanatory message
    instead of raising.
    """
    if LLM_MODE == "none":
        return "AI analysis disabled. Set LLM_MODE=pi, LLM_MODE=ollama, or LLM_MODE=openai in .env. The raccoon analyst is asleep. 🦝💤"
    prompt = analysis_prompt(input_summary, previous_articles)
    backends = {
        "ollama": call_ollama,
        "openai": call_openai,
        "pi": call_pi,
    }
    backend = backends.get(LLM_MODE)
    if backend is None:
        return f"Unknown LLM_MODE={LLM_MODE!r}. Use none, pi, ollama, or openai."
    return backend(prompt)
def remove_most_emoji(text: str) -> str:
    """Delete runs of characters in three emoji/symbol code point ranges.

    Covered ranges: U+1F300-U+1FAFF, U+2700-U+27BF and U+2600-U+26FF.
    Everything else, including surrounding whitespace, is left untouched.
    """
    # Keep the writing readable on the blog page even if the model gets a bit too festive.
    emoji_run = re.compile(r"[\U0001F300-\U0001FAFF\U00002700-\U000027BF\U00002600-\U000026FF]+")
    return emoji_run.sub("", text)
def inline_markdown(text: str) -> str:
    """Return HTML-escaped text with emoji removed and inline markers dropped.

    ``**bold**`` and `` `code` `` spans are replaced by their inner text.
    NOTE(review): the replacements emit the captured text only, without any
    wrapping HTML tags - confirm that this is intended.
    """
    escaped = html.escape(remove_most_emoji(text).strip())
    # Bold markers first, then backtick code spans.
    for marker_pattern in (r"\*\*(.*?)\*\*", r"`([^`]+)`"):
        escaped = re.sub(marker_pattern, r"\1", escaped)
    return escaped
def move_bottom_line_before_serious(blocks: list[str]) -> list[str]:
serious_start = None
bottom_start = None
bottom_end = None
for i, block in enumerate(blocks):
heading = re.match(r"{current_summary}
\n{content}\n
{inline_markdown(' '.join(paragraph))}
") paragraph = [] def flush_list() -> None: nonlocal list_items if list_items: blocks.append("{raw}