Files
upbeatBytes/goodnews/auth.py
T
thejayman77 15728c3bcb User avatar (Google picture), avatar in mobile You tab, /account page
- Capture the Google profile picture (picture claim) into users.avatar_url; an
  Avatar component shows it, falling back to the initial. Used in the desktop
  header and the mobile "You" tab (which now shows the user when signed in).
- Move account/settings to its own route /account (robust + scrolls to top),
  reached by the desktop avatar and the mobile You tab; drop the inline "You"
  sheet. AccountPanel gains a Sign out action; the page links to Saved/History/
  Boundaries via home intent params (?view= / ?open=).
- db: users.avatar_url (schema + idempotent migration). 118 tests pass.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-03 14:41:43 +00:00

157 lines
5.5 KiB
Python

"""Account auth core: users, identities, magic-link tokens, sessions.
Stdlib-only helpers over a sqlite3 connection. The caller (the API) supplies the
connection and commits. Self-hosted, minimal-PII. Secrets are random and stored
only as SHA-256 hashes — the raw token is shown exactly once (in the magic link
or returned to the client as a session token).
Identity model: a user can have several `identities` (email magic-link, google,
later apple). They resolve to ONE user by verified email, so signing in with
Google and with a magic link to the same address lands on the same account.
"""
from __future__ import annotations
import hashlib
import secrets
import sqlite3
from datetime import datetime, timedelta, timezone
LOGIN_TOKEN_TTL = timedelta(minutes=15)
SESSION_TTL = timedelta(days=60)
def _now() -> datetime:
return datetime.now(timezone.utc)
def _iso(dt: datetime) -> str:
return dt.replace(microsecond=0).isoformat()
def _parse(value: str) -> datetime:
dt = datetime.fromisoformat(value)
return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
def _hash(token: str) -> str:
return hashlib.sha256(token.encode("utf-8")).hexdigest()
def normalize_email(email: str) -> str:
return (email or "").strip().lower()
# ---- users & identities ------------------------------------------------------
def get_user(conn: sqlite3.Connection, user_id: int) -> sqlite3.Row | None:
return conn.execute(
"SELECT id, email, display_name, avatar_url, created_at FROM users WHERE id = ?", (user_id,)
).fetchone()
def find_or_create_user(
conn: sqlite3.Connection,
email: str,
provider: str,
provider_subject: str,
display_name: str | None = None,
avatar_url: str | None = None,
) -> int:
"""Resolve (or create) the user for a verified sign-in, linking the identity.
Order: an existing identity wins; else an existing user with the same verified
email is linked to the new identity; else a fresh user is created.
"""
email = normalize_email(email)
existing = conn.execute(
"SELECT user_id FROM identities WHERE provider = ? AND provider_subject = ?",
(provider, provider_subject),
).fetchone()
if existing:
return existing["user_id"]
user = conn.execute("SELECT id FROM users WHERE email = ?", (email,)).fetchone()
if user:
user_id = user["id"]
# Fill display name if missing; refresh avatar whenever the provider gives one.
conn.execute(
"UPDATE users SET display_name = COALESCE(display_name, ?), "
"avatar_url = COALESCE(?, avatar_url), updated_at = CURRENT_TIMESTAMP WHERE id = ?",
(display_name, avatar_url, user_id),
)
else:
user_id = conn.execute(
"INSERT INTO users (email, display_name, avatar_url) VALUES (?, ?, ?)",
(email, display_name, avatar_url),
).lastrowid
conn.execute(
"INSERT OR IGNORE INTO identities (user_id, provider, provider_subject) VALUES (?, ?, ?)",
(user_id, provider, provider_subject),
)
return user_id
# ---- magic-link tokens -------------------------------------------------------
def create_login_token(conn: sqlite3.Connection, email: str) -> str:
"""Create a single-use sign-in token; return the raw token (goes in the link)."""
raw = secrets.token_urlsafe(32)
conn.execute(
"INSERT INTO login_tokens (email, token_hash, expires_at) VALUES (?, ?, ?)",
(normalize_email(email), _hash(raw), _iso(_now() + LOGIN_TOKEN_TTL)),
)
return raw
def consume_login_token(conn: sqlite3.Connection, raw: str) -> str | None:
"""Return the email for a valid, unused, unexpired token, marking it used.
Returns None (and changes nothing) if the token is unknown, already used, or
expired. Single-use even under concurrent taps (the consumed_at guard).
"""
if not raw:
return None
row = conn.execute(
"SELECT id, email, expires_at, consumed_at FROM login_tokens WHERE token_hash = ?",
(_hash(raw),),
).fetchone()
if not row or row["consumed_at"] is not None or _parse(row["expires_at"]) < _now():
return None
consumed = conn.execute(
"UPDATE login_tokens SET consumed_at = ? WHERE id = ? AND consumed_at IS NULL",
(_iso(_now()), row["id"]),
).rowcount
return row["email"] if consumed else None
# ---- sessions ----------------------------------------------------------------
def create_session(conn: sqlite3.Connection, user_id: int, user_agent: str | None = None) -> str:
"""Create a session; return the raw token (cookie value / bearer token)."""
raw = secrets.token_urlsafe(32)
conn.execute(
"INSERT INTO sessions (user_id, token_hash, expires_at, user_agent) VALUES (?, ?, ?, ?)",
(user_id, _hash(raw), _iso(_now() + SESSION_TTL), (user_agent or "")[:300]),
)
return raw
def resolve_session(conn: sqlite3.Connection, raw: str | None) -> sqlite3.Row | None:
"""Return the user for a valid session token (and refresh last_seen), else None."""
if not raw:
return None
row = conn.execute(
"SELECT id, user_id, expires_at FROM sessions WHERE token_hash = ?", (_hash(raw),)
).fetchone()
if not row or _parse(row["expires_at"]) < _now():
return None
conn.execute("UPDATE sessions SET last_seen_at = ? WHERE id = ?", (_iso(_now()), row["id"]))
return get_user(conn, row["user_id"])
def revoke_session(conn: sqlite3.Connection, raw: str | None) -> None:
if raw:
conn.execute("DELETE FROM sessions WHERE token_hash = ?", (_hash(raw),))