"""Account auth core: users, identities, magic-link tokens, sessions. Stdlib-only helpers over a sqlite3 connection. The caller (the API) supplies the connection and commits. Self-hosted, minimal-PII. Secrets are random and stored only as SHA-256 hashes — the raw token is shown exactly once (in the magic link or returned to the client as a session token). Identity model: a user can have several `identities` (email magic-link, google, later apple). They resolve to ONE user by verified email, so signing in with Google and with a magic link to the same address lands on the same account. """ from __future__ import annotations import hashlib import secrets import sqlite3 from datetime import datetime, timedelta, timezone LOGIN_TOKEN_TTL = timedelta(minutes=15) SESSION_TTL = timedelta(days=60) def _now() -> datetime: return datetime.now(timezone.utc) def _iso(dt: datetime) -> str: return dt.replace(microsecond=0).isoformat() def _parse(value: str) -> datetime: dt = datetime.fromisoformat(value) return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc) def _hash(token: str) -> str: return hashlib.sha256(token.encode("utf-8")).hexdigest() def normalize_email(email: str) -> str: return (email or "").strip().lower() # ---- users & identities ------------------------------------------------------ def get_user(conn: sqlite3.Connection, user_id: int) -> sqlite3.Row | None: return conn.execute( "SELECT id, email, display_name, avatar_url, is_admin, digest_enabled, digest_unsub_token, created_at " "FROM users WHERE id = ?", (user_id,), ).fetchone() def find_or_create_user( conn: sqlite3.Connection, email: str, provider: str, provider_subject: str, display_name: str | None = None, avatar_url: str | None = None, ) -> int: """Resolve (or create) the user for a verified sign-in, linking the identity. Order: an existing identity wins; else an existing user with the same verified email is linked to the new identity; else a fresh user is created. """ email = normalize_email(email) existing = conn.execute( "SELECT user_id FROM identities WHERE provider = ? AND provider_subject = ?", (provider, provider_subject), ).fetchone() if existing: user_id = existing["user_id"] else: user = conn.execute("SELECT id FROM users WHERE email = ?", (email,)).fetchone() if user: user_id = user["id"] else: user_id = conn.execute( "INSERT INTO users (email, display_name, avatar_url) VALUES (?, ?, ?)", (email, display_name, avatar_url), ).lastrowid conn.execute( "INSERT OR IGNORE INTO identities (user_id, provider, provider_subject) VALUES (?, ?, ?)", (user_id, provider, provider_subject), ) # Always refresh provider-supplied profile bits (even for a returning identity): # fill the name if missing, and keep the avatar current when the provider sends one. if display_name or avatar_url: conn.execute( "UPDATE users SET display_name = COALESCE(display_name, ?), " "avatar_url = COALESCE(?, avatar_url), updated_at = CURRENT_TIMESTAMP WHERE id = ?", (display_name, avatar_url, user_id), ) return user_id # ---- magic-link tokens ------------------------------------------------------- def create_login_token(conn: sqlite3.Connection, email: str) -> str: """Create a single-use sign-in token; return the raw token (goes in the link).""" raw = secrets.token_urlsafe(32) conn.execute( "INSERT INTO login_tokens (email, token_hash, expires_at) VALUES (?, ?, ?)", (normalize_email(email), _hash(raw), _iso(_now() + LOGIN_TOKEN_TTL)), ) return raw def consume_login_token(conn: sqlite3.Connection, raw: str) -> str | None: """Return the email for a valid, unused, unexpired token, marking it used. Returns None (and changes nothing) if the token is unknown, already used, or expired. Single-use even under concurrent taps (the consumed_at guard). """ if not raw: return None row = conn.execute( "SELECT id, email, expires_at, consumed_at FROM login_tokens WHERE token_hash = ?", (_hash(raw),), ).fetchone() if not row or row["consumed_at"] is not None or _parse(row["expires_at"]) < _now(): return None consumed = conn.execute( "UPDATE login_tokens SET consumed_at = ? WHERE id = ? AND consumed_at IS NULL", (_iso(_now()), row["id"]), ).rowcount return row["email"] if consumed else None # ---- sessions ---------------------------------------------------------------- def create_session(conn: sqlite3.Connection, user_id: int, user_agent: str | None = None) -> str: """Create a session; return the raw token (cookie value / bearer token).""" raw = secrets.token_urlsafe(32) conn.execute( "INSERT INTO sessions (user_id, token_hash, expires_at, user_agent) VALUES (?, ?, ?, ?)", (user_id, _hash(raw), _iso(_now() + SESSION_TTL), (user_agent or "")[:300]), ) return raw def resolve_session(conn: sqlite3.Connection, raw: str | None) -> sqlite3.Row | None: """Return the user for a valid session token (and refresh last_seen), else None.""" if not raw: return None row = conn.execute( "SELECT id, user_id, expires_at FROM sessions WHERE token_hash = ?", (_hash(raw),) ).fetchone() if not row or _parse(row["expires_at"]) < _now(): return None conn.execute("UPDATE sessions SET last_seen_at = ? WHERE id = ?", (_iso(_now()), row["id"])) return get_user(conn, row["user_id"]) def revoke_session(conn: sqlite3.Connection, raw: str | None) -> None: if raw: conn.execute("DELETE FROM sessions WHERE token_hash = ?", (_hash(raw),))