"""
build_pression_iris.py  (v6)
============================
Computes Selling Pressure Score (SPS), Buying Pressure Score (BPS), and
Net Pressure Index (NPI) per IRIS for Nord (59).

v6 — coherent NPS design:
  1. Department-level centering: population-weighted mean NPI = 0 (exact, by
     construction).  NPI is a *relative* indicator — deviation from Nord's own
     equilibrium, not an absolute scale.
  2. Price momentum (BPS 10 %): CAGR of DVF prix_m2 over available years.
     Prevents high NPS where prices have not moved.
  3. Split elderly signal: very_elderly_ratio (pop 75+, 22 %) + elderly_owner
     (pop75+ × ownership_rate, 18 %).  Near-term supply vs medium-term.
  4. First-buyer age 25–44 replaces prime_age_ratio 25–59.  The 55–59 cohort
     is statistically more likely to sell than to buy.
  5. Environmental SPS: flood_risk (5 %) + noise_level (5 %) from existing
     SQLite tables (flood_zones, noise_zones).

NPI thresholds (same convention as v1–v5):
  > +0.30  buying pressure   → upward price pressure
  < -0.30  selling pressure  → downward price pressure
  ±0.30    balanced market   (Nord department mean is exactly 0 by construction)

--- SPS (Selling Pressure) -- 7 indicators, weights sum to 1.0 ---
  very_elderly_ratio  22 %  pop 75+ / pop totale  (RP2022)
                            Near-term supply pipeline: 75+ face assisted-living
                            transitions and inheritance sales within 5–10 years.
  elderly_owner       18 %  (pop 75+ / pop) × ownership_rate  (RP2022)
                            Interaction term: only IRIS with high elderly AND high
                            ownership get SPS signal (the actual supply risk).
                            Removes correlated double-count vs old senior_ownership.
  vacancy_rate        20 %  logements vacants / total logements  (RP2022)
  pop_decline         15 %  max(0, pop2018-pop2022)/pop2018  [exponential]
  poverty_rate        15 %  beneficiaires RSA / personnes couvertes  (CAF 2023)
  flood_risk           5 %  worst flood scenario at IRIS centroid  (SQLite flood_zones)
                            frequent=1.0, moyen=0.6, rare=0.2, none=0.0
  noise_level          5 %  proximity-weighted LDEN exposure  (SQLite noise_zones)
                            Environmental push factor: high noise → buyer deterrence.

--- BPS (Buying Pressure) -- 9 indicators, weights sum to 1.0 ---
  income_level        17 %  revenu disponible median annuel  (Filosofi 2021)
  first_buyer_ratio   18 %  pop 25-44 / pop totale  (RP2022)
                            Peak French homebuyer demographic (median first-purchase
                            age ≈32; move-up buyers peak 38–44).
  security_score      15 %  10 - danger_score  (iris_securite.json)
  employment_rate     15 %  actifs occupes / (actifs + DEFM)  (RP2022 + DEFM 2024)
  price_momentum      10 %  CAGR of prix_m2_median over DVF years  (prix_evolution_iris)
                            Market confirmation: rising prices signal real buying
                            demand; prevents high NPS where prices are flat.
  diploma_rate         9 %  (bac+2 + bac+3-4 + bac+5) / pop 15+  (RP2022)
  greenery_pct         9 %  park/forest/natural fraction of IRIS area  (OSM)
  amenity_score        4 %  weighted BPE equipment count  (BPE 2024)
  transaction_rate     3 %  avg annual DVF transactions  (prix_evolution_iris)

Centering algorithm:
  npi_raw      = bps - sps  (both ∈ [0, 1])
  dept_mean    = population-weighted mean of npi_raw across all Nord IRIS
  npi_centered = npi_raw - dept_mean
  dept_std     = population-weighted std dev of npi_centered
  npi_final    = clip(npi_centered / (3 × dept_std), -1, +1)
  → population-weighted mean of npi_final = 0 (exact, by construction)
  → ±3σ maps to ±1.0; thresholds ±0.30 ≈ ±0.9σ from dept neutral

Validation:
  After scoring, commune-level NPI is compared against observed price evolution.
  Incoherent communes (high NPI + flat prices, or low NPI + rising prices) are
  logged as warnings.  Diagnostic only — not a hard constraint.

Outputs:
  data/2026/iris_pression.json    -- JSON per-IRIS scores + details
  data/immobilier.db              -- table iris_pression (raw indicators + scores)

INSEE source files (downloaded once, cached in data/insee/):
  base-ic-evol-struct-pop-2022    (table 8647014)
  base-ic-evol-struct-pop-2018    (table 5650720)  -- reference year for delta
  base-ic-logement-2022           (table 8647012)
  base-ic-activite-residents-2022 (table 8647006)
  base-ic-diplomes-formation-2022 (table 8647010)
  BASE_TD_FILO_IRIS_2021_DEC      (table 8229323)  -- Filosofi 2021 income
  DEFM2024_Iris                   (table 8644875)  -- demandeurs d'emploi 2024
  beneficiaires_CAF_31-12-2023    (table 8569601)  -- RSA/pauvrete 2023
  ds_bpe_iris_2024_geo_2024       (table 8217527)  -- equipements 2024

Usage:
    python build_pression_iris.py
    python build_pression_iris.py --force-download
"""
from __future__ import annotations

import argparse
import io
import json
import logging
import os
import urllib.request
import zipfile
from datetime import datetime, timezone

import numpy as np
import pandas as pd

from domain.core.mysql_db import (
    get_connection,
    load_amenity_weights,
    load_indicator_weights,
    load_security_scores,
)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger("build_pression")

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# v6 canonical weights — used as fallback when MySQL indicator_weights is stale.
# Keep in sync with migrate_weights_mysql.py _SEED_INDICATOR_WEIGHTS.
_DEFAULT_SPS_WEIGHTS: dict[str, float] = {
    "very_elderly_ratio": 0.22,
    "elderly_owner":      0.18,
    "vacancy_rate":       0.20,
    "pop_decline":        0.15,
    "poverty_rate":       0.15,
    "flood_risk":         0.05,
    "noise_level":        0.05,
}
_DEFAULT_BPS_WEIGHTS: dict[str, float] = {
    "income_level":      0.17,
    "first_buyer_ratio": 0.18,
    "security_score":    0.15,
    "employment_rate":   0.15,
    "price_momentum":    0.10,
    "diploma_rate":      0.09,
    "greenery_pct":      0.09,
    "amenity_score":     0.04,
    "transaction_rate":  0.03,
}

_CACHE_DIR = os.path.join("data", "insee")
_OUTPUT    = os.path.join("data", "2026", "iris_pression.json")

_INSEE_FILES: dict[str, str] = {
    # RP 2022 (primary)
    "pop22":  "https://www.insee.fr/fr/statistiques/fichier/8647014/base-ic-evol-struct-pop-2022_csv.zip",
    # RP 2018 (reference year for population delta)
    "pop18":  "https://www.insee.fr/fr/statistiques/fichier/5650720/base-ic-evol-struct-pop-2018_csv.zip",
    "log22":  "https://www.insee.fr/fr/statistiques/fichier/8647012/base-ic-logement-2022_csv.zip",
    "act22":  "https://www.insee.fr/fr/statistiques/fichier/8647006/base-ic-activite-residents-2022_csv.zip",
    "dipl22": "https://www.insee.fr/fr/statistiques/fichier/8647010/base-ic-diplomes-formation-2022_csv.zip",
    # Filosofi 2021 -- DEC file contains median income column
    "filo21": "https://www.insee.fr/fr/statistiques/fichier/8229323/BASE_TD_FILO_IRIS_2021_DEC_CSV.zip",
    # DEFM 2024 -- demandeurs d'emploi fin de mois par IRIS
    "defm24": "https://www.insee.fr/fr/statistiques/fichier/8644875/DEFM2024_Iris_csv.zip",
    # CAF 2023 -- beneficiaires RSA par IRIS
    "caf23":  "https://www.insee.fr/fr/statistiques/fichier/8569601/beneficiaires_CAF_31-12-2023_Iris.zip",
    # BPE 2024 -- equipements par IRIS
    "bpe24":  "https://www.insee.fr/fr/statistiques/fichier/8217527/ds_bpe_iris_2024_geo_2024.zip",
}

# SPS_WEIGHTS, BPS_WEIGHTS, and _AMENITY_WEIGHTS are no longer hardcoded here.
# They are loaded from MySQL at runtime via domain.core.mysql_db.
# To seed or update weights: python migrate_weights_mysql.py

# ---------------------------------------------------------------------------
# Download + cache helper
# ---------------------------------------------------------------------------

def _download_csv(key: str, url: str, force: bool) -> pd.DataFrame:
    """Download an INSEE ZIP, extract the data CSV (case-insensitive), cache it."""
    cache_path = os.path.join(_CACHE_DIR, f"{key}.csv")

    if not force and os.path.exists(cache_path):
        log.info("Cache hit: %s", cache_path)
    else:
        log.info("Downloading %s ...", url)
        os.makedirs(_CACHE_DIR, exist_ok=True)
        req = urllib.request.Request(url, headers={"User-Agent": "agent-immobilier/1.0"})
        try:
            with urllib.request.urlopen(req, timeout=180) as resp:
                raw = resp.read()
        except Exception as exc:
            log.warning("Download failed for %s: %s", key, exc)
            return pd.DataFrame()

        with zipfile.ZipFile(io.BytesIO(raw)) as zf:
            all_names = zf.namelist()
            csv_names = [n for n in all_names if n.upper().endswith(".CSV")]
            data_csvs = [
                n for n in csv_names
                if not any(kw in n.lower() for kw in ("meta", "doc", "notice", "readme"))
            ]
            if not data_csvs:
                log.warning("No data CSV in %s. Contents: %s", url.split("/")[-1], all_names)
                return pd.DataFrame()
            log.info("  Extracting %s", data_csvs[0])
            content = zf.read(data_csvs[0])

        with open(cache_path, "wb") as fout:
            fout.write(content)
        log.info("  Cached -> %s  (%d bytes)", cache_path, len(content))

    for enc in ("utf-8", "latin-1"):
        try:
            df = pd.read_csv(cache_path, sep=";", encoding=enc, dtype=str, low_memory=False)
            log.info("  Parsed %d rows x %d cols [%s]", len(df), len(df.columns), enc)
            return df
        except UnicodeDecodeError:
            continue
    log.warning("Cannot decode %s", cache_path)
    return pd.DataFrame()


def _filter_nord(df: pd.DataFrame, iris_cols: tuple[str, ...] = ("IRIS", "CODGEO", "DCIRIS")) -> pd.DataFrame:
    """Rename IRIS identifier to code_iris, filter to Nord (59)."""
    id_col = next((c for c in df.columns if c in iris_cols), None)
    if id_col is None:
        log.warning("No IRIS identifier column found. Available: %s", list(df.columns[:15]))
        return pd.DataFrame()
    df = df.rename(columns={id_col: "code_iris"})
    df["code_iris"] = df["code_iris"].astype(str).str.zfill(9)
    return df[df["code_iris"].str.startswith("59")].copy()


def _require(df: pd.DataFrame, cols: list[str]) -> bool:
    missing = [c for c in cols if c not in df.columns]
    if missing:
        log.warning("Missing columns %s. First 40: %s", missing, list(df.columns[:40]))
    return len(missing) == 0


def _to_numeric(df: pd.DataFrame, cols: list[str]) -> None:
    for c in cols:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0.0)


# ---------------------------------------------------------------------------
# Individual loaders
# ---------------------------------------------------------------------------

def _load_population_2022(force: bool) -> pd.DataFrame:
    df = _download_csv("pop22", _INSEE_FILES["pop22"], force)
    if df.empty:
        return pd.DataFrame()
    df = _filter_nord(df)
    sps_cols = ["P22_POP", "P22_POP6074", "P22_POP75P"]
    bps_cols = ["P22_POP2539", "P22_POP4054", "P22_POP5564"]
    if not _require(df, sps_cols + bps_cols):
        return pd.DataFrame()
    pop = df[["code_iris"] + sps_cols + bps_cols].rename(columns={
        "P22_POP":     "pop22",
        "P22_POP6074": "pop6074",
        "P22_POP75P":  "pop75p",
        "P22_POP2539": "pop2539",
        "P22_POP4054": "pop4054",
        "P22_POP5564": "pop5564",
    })
    _to_numeric(pop, list(pop.columns[1:]))
    pop = pop[pop["pop22"] > 0].reset_index(drop=True)
    log.info("Population RP2022 (Nord, pop>0): %d IRIS", len(pop))
    return pop


def _load_population_2018(force: bool) -> pd.DataFrame:
    df = _download_csv("pop18", _INSEE_FILES["pop18"], force)
    if df.empty:
        return pd.DataFrame()
    df = _filter_nord(df)
    if not _require(df, ["P18_POP"]):
        return pd.DataFrame()
    pop = df[["code_iris", "P18_POP"]].rename(columns={"P18_POP": "pop18"})
    pop["pop18"] = pd.to_numeric(pop["pop18"], errors="coerce").fillna(0.0)
    log.info("Population RP2018 (Nord): %d IRIS", len(pop))
    return pop


def _load_logements(force: bool) -> pd.DataFrame:
    df = _download_csv("log22", _INSEE_FILES["log22"], force)
    if df.empty:
        return pd.DataFrame()
    df = _filter_nord(df)
    needed = ["P22_LOG", "P22_RP", "P22_LOGVAC", "P22_RP_PROP", "P22_RP_LOC"]
    if not _require(df, needed):
        return pd.DataFrame()
    log_df = df[["code_iris"] + needed].rename(columns={
        "P22_LOG":     "total_log",
        "P22_RP":      "rp",
        "P22_LOGVAC":  "vacants",
        "P22_RP_PROP": "prop",
        "P22_RP_LOC":  "rp_loc",
    })
    _to_numeric(log_df, list(log_df.columns[1:]))
    log_df = log_df[log_df["total_log"] > 0].reset_index(drop=True)
    log.info("Logements RP2022 (Nord, total_log>0): %d IRIS", len(log_df))
    return log_df


def _load_activite(force: bool) -> pd.DataFrame:
    df = _download_csv("act22", _INSEE_FILES["act22"], force)
    if df.empty:
        return pd.DataFrame()
    df = _filter_nord(df)
    needed = ["P22_ACTOCC1564", "P22_ACT1564"]
    if not _require(df, needed):
        return pd.DataFrame()
    act = df[["code_iris"] + needed].rename(columns={
        "P22_ACTOCC1564": "actocc",
        "P22_ACT1564":    "act",
    })
    _to_numeric(act, ["actocc", "act"])
    log.info("Activite RP2022 (Nord): %d IRIS", len(act))
    return act


def _load_diplomes(force: bool) -> pd.DataFrame:
    df = _download_csv("dipl22", _INSEE_FILES["dipl22"], force)
    if df.empty:
        return pd.DataFrame()
    df = _filter_nord(df)
    needed = ["P22_NSCOL15P", "P22_NSCOL15P_SUP2", "P22_NSCOL15P_SUP34", "P22_NSCOL15P_SUP5"]
    if not _require(df, needed):
        return pd.DataFrame()
    dipl = df[["code_iris"] + needed].rename(columns={
        "P22_NSCOL15P":       "nscol15p",
        "P22_NSCOL15P_SUP2":  "sup2",
        "P22_NSCOL15P_SUP34": "sup34",
        "P22_NSCOL15P_SUP5":  "sup5",
    })
    _to_numeric(dipl, list(dipl.columns[1:]))
    log.info("Diplomes RP2022 (Nord): %d IRIS", len(dipl))
    return dipl


def _load_filosofi(force: bool) -> pd.DataFrame:
    """
    Filosofi 2021 -- median disposable income per IRIS.
    Tries several candidate column names for the median.
    Covers communes >= 5000 hab; missing IRIS filled with Nord median later.
    """
    df = _download_csv("filo21", _INSEE_FILES["filo21"], force)
    if df.empty:
        return pd.DataFrame()

    # Filosofi uses IRIS column directly
    id_col = next((c for c in df.columns if c in ("IRIS", "CODGEO")), None)
    if id_col is None:
        log.warning("No IRIS column in Filosofi file. Columns: %s", list(df.columns[:15]))
        return pd.DataFrame()
    df = df.rename(columns={id_col: "code_iris"})
    df["code_iris"] = df["code_iris"].astype(str).str.zfill(9)
    df = df[df["code_iris"].str.startswith("59")].copy()

    # Probe for median income column (varies by Filosofi vintage/type)
    candidates = ["DISP_MED21", "MED21", "Q221", "DISP_MED20", "RFMED"]
    med_col = next((c for c in candidates if c in df.columns), None)
    if med_col is None:
        # Last resort: look for any column with MED in name
        med_col = next((c for c in df.columns if "MED" in c.upper()), None)
    if med_col is None:
        log.warning("Cannot find median income column in Filosofi. Columns: %s", list(df.columns[:20]))
        return pd.DataFrame()

    log.info("  Using Filosofi median column: %s", med_col)
    filo = df[["code_iris", med_col]].rename(columns={med_col: "income_median"})
    filo["income_median"] = pd.to_numeric(filo["income_median"], errors="coerce")
    n_valid = filo["income_median"].notna().sum()
    log.info(
        "Filosofi 2021 (Nord): %d IRIS with income data (median=%.0f EUR)",
        n_valid, filo["income_median"].median() if n_valid > 0 else 0,
    )
    return filo


def _load_defm(force: bool) -> pd.DataFrame:
    """
    DEFM 2024 -- demandeurs d'emploi fin de mois par IRIS.
    Returns {code_iris, defm_count}.  Empty DF if download fails.
    """
    df = _download_csv("defm24", _INSEE_FILES["defm24"], force)
    if df.empty:
        return pd.DataFrame()
    df = _filter_nord(df)
    if df.empty:
        return pd.DataFrame()

    # Probe for the count column (INSEE DEFM files vary)
    candidates = ["DEFM", "NB_DEFM", "A", "CAT_ABC", "CATEG_ABC", "NBDEFM"]
    count_col = next((c for c in candidates if c in df.columns), None)
    if count_col is None:
        # Pick first numeric-looking column that isn't code_iris
        for c in df.columns:
            if c == "code_iris":
                continue
            vals = pd.to_numeric(df[c], errors="coerce")
            if vals.notna().mean() > 0.5:
                count_col = c
                break
    if count_col is None:
        log.warning("Cannot identify DEFM count column. Columns: %s", list(df.columns[:20]))
        return pd.DataFrame()

    log.info("  Using DEFM count column: %s", count_col)
    defm = df[["code_iris", count_col]].rename(columns={count_col: "defm_count"})
    defm["defm_count"] = pd.to_numeric(defm["defm_count"], errors="coerce").fillna(0.0)

    # Sum in case there are multiple rows per IRIS (e.g. by category)
    defm = defm.groupby("code_iris", as_index=False)["defm_count"].sum()
    log.info("DEFM 2024 (Nord): %d IRIS with job seeker data", len(defm))
    return defm


def _load_caf(force: bool) -> pd.DataFrame:
    """
    CAF 2023 -- beneficiaires RSA par commune (CODGEO = 5-digit).
    Returns {code_commune (5-digit str), poverty_rate}.
    The caller joins via code_iris[:5].
    poverty_rate = ARSAS / PERCOU.
    """
    df = _download_csv("caf23", _INSEE_FILES["caf23"], force)
    if df.empty:
        return pd.DataFrame(columns=["code_commune", "poverty_rate"])

    # CAF uses 5-digit commune CODGEO (not 9-digit IRIS)
    id_col = next((c for c in df.columns if c in ("CODGEO", "IRIS", "COM")), None)
    if id_col is None:
        log.warning("No commune code column in CAF file. Available: %s", list(df.columns[:15]))
        return pd.DataFrame(columns=["code_commune", "poverty_rate"])
    df["code_commune"] = df[id_col].astype(str).str.zfill(5)
    df = df[df["code_commune"].str.startswith("59")].copy()

    rsa_candidates = ["ARSAS", "NB_RSA", "RSA", "BRSAA"]
    cov_candidates = ["PERCOU", "POP_COV", "NBPERCOU", "NBPERSCOUV", "A"]
    rsa_col = next((c for c in rsa_candidates if c in df.columns), None)
    cov_col = next((c for c in cov_candidates if c in df.columns), None)

    if rsa_col is None or cov_col is None:
        log.warning(
            "Cannot find RSA/coverage columns in CAF. rsa=%s cov=%s  Available: %s",
            rsa_col, cov_col, list(df.columns[:30]),
        )
        return pd.DataFrame(columns=["code_commune", "poverty_rate"])

    log.info("  Using CAF columns: rsa=%s  cov=%s", rsa_col, cov_col)
    caf = df[["code_commune", rsa_col, cov_col]].copy()
    caf[rsa_col] = pd.to_numeric(caf[rsa_col], errors="coerce").fillna(0.0)
    caf[cov_col] = pd.to_numeric(caf[cov_col], errors="coerce").fillna(0.0)
    caf["poverty_rate"] = np.where(
        caf[cov_col] > 0, caf[rsa_col] / caf[cov_col], 0.0
    )
    result = caf[["code_commune", "poverty_rate"]].drop_duplicates("code_commune")
    log.info(
        "CAF 2023 (Nord): %d communes  (poverty_rate mean=%.3f)",
        len(result), result["poverty_rate"].mean(),
    )
    return result


def _load_bpe(force: bool, amenity_weights: dict[str, float]) -> pd.DataFrame:
    """
    BPE 2024 -- equipements par IRIS.
    Returns {code_iris, amenity_score} where amenity_score is a weighted
    count of key infrastructure types, normalised later with _minmax().
    Handles both classic (DCIRIS/TYPEQU/NB_EQUIP) and SDMX (GEO/FACILITY_TYPE/OBS_VALUE) formats.
    """
    df = _download_csv("bpe24", _INSEE_FILES["bpe24"], force)
    if df.empty:
        return pd.DataFrame(columns=["code_iris", "amenity_score"])

    # IRIS identifier — try both classic and SDMX column names
    id_col = next(
        (c for c in df.columns if c in ("DCIRIS", "IRIS", "GEO", "CODGEO")), None
    )
    if id_col is None:
        log.warning("No IRIS column in BPE file. Available: %s", list(df.columns[:15]))
        return pd.DataFrame(columns=["code_iris", "amenity_score"])
    df = df.rename(columns={id_col: "code_iris"})
    df["code_iris"] = df["code_iris"].astype(str).str.zfill(9)
    df = df[df["code_iris"].str.startswith("59")].copy()

    # Equipment type column
    typequ_col = next(
        (c for c in df.columns if c in ("TYPEQU", "FACILITY_TYPE", "TYPE_EQUIP", "TYPEEQUIP")),
        None,
    )
    if typequ_col is None:
        log.warning("No TYPEQU column in BPE file. Available: %s", list(df.columns[:15]))
        return pd.DataFrame(columns=["code_iris", "amenity_score"])

    # Count column
    nb_col = next(
        (c for c in df.columns if c in ("NB_EQUIP", "OBS_VALUE", "NB", "NBEQUIP")), None
    )

    # Filter to GEO_OBJECT==IRIS rows if column present (SDMX format has commune rows too)
    if "GEO_OBJECT" in df.columns:
        df = df[df["GEO_OBJECT"].str.upper() == "IRIS"].copy()

    def _weight(typequ: str) -> float:
        prefix = str(typequ)[:2]
        return amenity_weights.get(prefix, 0.0)

    df["_weight"] = df[typequ_col].apply(_weight)
    df = df[df["_weight"] > 0].copy()

    if nb_col and nb_col in df.columns:
        df[nb_col] = pd.to_numeric(df[nb_col], errors="coerce").fillna(1.0)
        df["_contrib"] = df["_weight"] * df[nb_col]
    else:
        df["_contrib"] = df["_weight"]

    amenity = df.groupby("code_iris")["_contrib"].sum().reset_index()
    amenity.columns = ["code_iris", "amenity_score"]
    log.info(
        "BPE 2024 (Nord): %d IRIS with amenity data  (mean=%.1f)",
        len(amenity), amenity["amenity_score"].mean(),
    )
    return amenity


def _load_dvf_transaction_rate() -> pd.DataFrame:
    """
    DVF market activity level: average annual transaction count per IRIS.
    Uses prix_evolution_iris table already in MySQL.
    Returns {code_iris, transaction_rate} where transaction_rate >= 0.

    A higher value = more liquid market = stronger latent demand signal.
    Replaces construction_rate (volume growth 2021->latest), which is noisy
    for small IRIS and conflates one-off years with structural demand.
    """
    try:
        conn = get_connection()
    except RuntimeError as exc:
        log.warning("MySQL unavailable — transaction_rate will default to 0: %s", exc)
        return pd.DataFrame()

    try:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT code_iris, annee, nb_transactions "
                "FROM prix_evolution_iris "
                "WHERE code_iris LIKE %s",
                ("59%",),
            )
            rows = cur.fetchall()
    except Exception as exc:
        log.warning("Could not query prix_evolution_iris: %s", exc)
        return pd.DataFrame()
    finally:
        conn.close()

    if not rows:
        return pd.DataFrame()

    df = pd.DataFrame(rows)

    result = (
        df.groupby("code_iris")["nb_transactions"]
        .mean()
        .reset_index()
        .rename(columns={"nb_transactions": "transaction_rate"})
    )
    log.info(
        "DVF transaction rate (Nord, %d years): %d IRIS  (mean=%.1f tx/yr)",
        df["annee"].nunique(), len(result), result["transaction_rate"].mean(),
    )
    return result


def _load_nature_greenery() -> pd.DataFrame:
    """
    Load per-IRIS nature coverage from iris_nature.json built by build_nature_zones.py.
    Returns {code_iris, greenery_pct} where greenery_pct ∈ [0, 1].
    IRIS not in the file (outside the query bbox) get 0 by default.
    """
    path = os.path.join("data", "2026", "iris_nature.json")
    if not os.path.exists(path):
        log.warning(
            "iris_nature.json not found at %s — run build_nature_zones.py first. "
            "greenery_pct will default to 0 for all IRIS.",
            path,
        )
        return pd.DataFrame(columns=["code_iris", "greenery_pct"])

    with open(path, encoding="utf-8") as f:
        raw = json.load(f)

    rows = [
        {"code_iris": str(code).zfill(9), "greenery_pct": float(v["nature_pct"]) / 100.0}
        for code, v in raw.items()
    ]
    df = pd.DataFrame(rows)
    covered = (df["greenery_pct"] > 0).sum()
    log.info(
        "Nature greenery (Nord): %d IRIS | %d with coverage > 0 (mean=%.1f%%)",
        len(df), covered, df["greenery_pct"].mean() * 100,
    )
    return df


def _load_security() -> pd.DataFrame:
    """
    Load per-IRIS security scores from iris_securite.json (Lille métropole only).
    Inverts danger_score (1–10) to security_score = 10 - danger_score.
    IRIS not covered here are filled by _load_security_commune_fallback() in
    _compute_scores() before falling back to the Nord mean.
    """
    path = os.path.join("data", "2026", "iris_securite.json")
    if not os.path.exists(path):
        log.warning("iris_securite.json not found — run build_iris_data.py. Falling back to commune level.")
        return pd.DataFrame(columns=["code_iris", "security_score"])

    with open(path, encoding="utf-8") as f:
        raw = json.load(f)

    rows = []
    for code, v in raw.items():
        danger = float(v) if isinstance(v, (int, float)) else float(v.get("danger_score", 5))
        rows.append({"code_iris": str(code).zfill(9), "security_score": 10.0 - danger})

    df = pd.DataFrame(rows)
    df = df[df["code_iris"].str.startswith("59")].copy()
    log.info(
        "iris_securite.json: %d IRIS (mean security_score=%.2f)",
        len(df), df["security_score"].mean(),
    )
    return df


def _load_security_commune_fallback() -> dict[str, float]:
    """
    Load commune-level danger scores from MySQL security_nord table.
    Returns {code_commune_5_digits: security_score} (= 10 - danger_score).
    Used in _compute_scores() to cover the ~1,000 Nord IRIS outside Lille
    métropole that iris_securite.json does not include.
    """
    try:
        scores = load_security_scores()
    except RuntimeError as exc:
        log.warning(
            "security_nord table unavailable — IRIS without iris_securite.json "
            "coverage will use Nord mean. Run: python build_securite_nord.py. (%s)",
            exc,
        )
        return {}
    result = {code: 10.0 - danger for code, danger in scores.items()}
    log.info("security_nord (MySQL): %d communes loaded as security fallback", len(result))
    return result


# ---------------------------------------------------------------------------
# New v6 loaders: centroids, flood risk, noise level, price momentum
# ---------------------------------------------------------------------------

def _load_iris_centroids() -> dict[str, tuple[float, float]]:
    """Load IRIS centroids from iris_nord.geojson. Returns {code_iris: (lon, lat)}."""
    path = os.path.join("data", "2026", "iris_nord.geojson")
    if not os.path.exists(path):
        log.warning("iris_nord.geojson not found — flood_risk and noise_level will be 0.")
        return {}

    from shapely.geometry import shape as _shape

    with open(path, encoding="utf-8") as f:
        gj = json.load(f)

    centroids: dict[str, tuple[float, float]] = {}
    for feat in gj["features"]:
        code = str(feat["properties"].get("code_iris", "")).zfill(9)
        if not code.startswith("59"):
            continue
        try:
            c = _shape(feat["geometry"]).centroid
            centroids[code] = (c.x, c.y)
        except Exception:
            pass

    log.info("IRIS centroids loaded from iris_nord.geojson: %d", len(centroids))
    return centroids


def _load_flood_risk(centroids: dict[str, tuple[float, float]]) -> pd.DataFrame:
    """
    Point-in-polygon flood risk per IRIS centroid using SQLite flood_zones.
    Returns {code_iris, flood_risk} ∈ [0, 1]:
      frequent → 1.0 (highest buyer deterrence)
      moyen    → 0.6
      rare     → 0.2
      none     → 0.0
    Uses shapely STRtree for efficient lookup.
    """
    if not centroids:
        return pd.DataFrame(columns=["code_iris", "flood_risk"])

    from shapely.geometry import Point, Polygon, MultiPolygon
    from shapely import STRtree

    SCENARIO_SCORES = {"frequent": 1.0, "moyen": 0.6, "rare": 0.2}

    # Nord bbox pre-filter (lng 2.0–3.9, lat 50.0–51.1)
    conn = get_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT scenario, geom_type, coordinates, "
                "       bbox_min_lng, bbox_min_lat, bbox_max_lng, bbox_max_lat "
                "FROM flood_zones "
                "WHERE bbox_max_lng >= 2.0 AND bbox_min_lng <= 3.9 "
                "  AND bbox_max_lat >= 50.0 AND bbox_min_lat <= 51.1"
            )
            rows = cur.fetchall()
    finally:
        conn.close()

    geom_list: list = []
    score_list: list[float] = []
    for row in rows:
        scenario   = row["scenario"]
        geom_type  = row["geom_type"]
        coords_json = row["coordinates"]
        score = SCENARIO_SCORES.get(scenario, 0.0)
        try:
            coords = json.loads(coords_json)
            if geom_type == "Polygon":
                geom = Polygon(coords[0])
            elif geom_type == "MultiPolygon":
                geom = MultiPolygon([Polygon(ring[0]) for ring in coords])
            else:
                continue
            if geom.is_valid:
                geom_list.append(geom)
                score_list.append(score)
        except Exception:
            pass

    log.info("Flood zones (Nord bbox): %d valid polygons for IRIS matching", len(geom_list))

    tree = STRtree(geom_list)
    result = []
    for code, (lon, lat) in centroids.items():
        pt = Point(lon, lat)
        best = 0.0
        for idx in tree.query(pt):
            if geom_list[idx].contains(pt):
                best = max(best, score_list[idx])
                if best == 1.0:
                    break
        result.append({"code_iris": code, "flood_risk": best})

    df = pd.DataFrame(result)
    n_risk = int((df["flood_risk"] > 0).sum())
    log.info(
        "Flood risk per IRIS: %d total, %d at risk (%.1f%%)",
        len(df), n_risk, 100.0 * n_risk / max(len(df), 1),
    )
    return df


def _load_noise_level(centroids: dict[str, tuple[float, float]]) -> pd.DataFrame:
    """
    Per-IRIS noise exposure score from proximity to OSM road/rail segments.
    Returns {code_iris, noise_level} ∈ [0, 1] where 1 = strong exposure
    (motorway adjacent). Uses 500 m buffer around each IRIS centroid.
    """
    if not centroids:
        return pd.DataFrame(columns=["code_iris", "noise_level"])

    from shapely.geometry import Point, LineString
    from shapely import STRtree

    conn = get_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT lden_min, geom_type, coordinates "
                "FROM noise_zones"
            )
            rows = cur.fetchall()
    finally:
        conn.close()

    line_geoms: list = []
    lden_list: list[int] = []
    for row in rows:
        lden       = row["lden_min"]
        geom_type  = row["geom_type"]
        coords_json = row["coordinates"]
        if geom_type != "LineString":
            continue
        try:
            coords = json.loads(coords_json)
            if len(coords) >= 2:
                line_geoms.append(LineString(coords))
                lden_list.append(int(lden))
        except Exception:
            pass

    log.info("Noise segments loaded: %d LineStrings", len(line_geoms))

    tree = STRtree(line_geoms)
    # 500 m ≈ 0.0045° lat at 50.5°N; use 0.006° to include diagonal paths
    BUFFER_DEG = 0.006
    MAX_DIST_M = 500.0

    def _lden_weight(lden: int) -> float:
        # Map 55–75 dB → 0.20–1.0 linearly
        return max(0.0, min(1.0, (lden - 55.0) / 20.0)) * 0.8 + 0.2

    result = []
    for code, (lon, lat) in centroids.items():
        pt = Point(lon, lat)
        buf = pt.buffer(BUFFER_DEG)
        best = 0.0
        for idx in tree.query(buf):
            dist_deg = pt.distance(line_geoms[idx])
            dist_m = dist_deg * 111_000.0
            if dist_m > MAX_DIST_M:
                continue
            proximity = max(0.0, 1.0 - dist_m / MAX_DIST_M)
            exposure = _lden_weight(lden_list[idx]) * proximity
            if exposure > best:
                best = exposure
        result.append({"code_iris": code, "noise_level": best})

    df = pd.DataFrame(result)
    log.info(
        "Noise level per IRIS: mean=%.3f  p90=%.3f",
        df["noise_level"].mean(), df["noise_level"].quantile(0.9),
    )
    return df


def _load_price_momentum() -> pd.DataFrame:
    """
    Per-IRIS price momentum: CAGR of prix_m2_median over the available DVF years.
    Returns {code_iris, price_momentum} where price_momentum is the annual growth
    rate (e.g. 0.04 = +4 %/yr).  Capped at ±15 % to handle outliers from sparse data.
    IRIS with < 2 distinct years of data get NaN (filled with Nord median in _compute_scores).
    """
    try:
        conn = get_connection()
    except RuntimeError as exc:
        log.warning("MySQL unavailable — price_momentum will default to 0: %s", exc)
        return pd.DataFrame()

    try:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT code_iris, annee, prix_m2_median "
                "FROM prix_evolution_iris "
                "WHERE code_iris LIKE %s AND prix_m2_median IS NOT NULL",
                ("59%",),
            )
            rows = cur.fetchall()
    except Exception as exc:
        log.warning("Could not query prix_evolution_iris for momentum: %s", exc)
        return pd.DataFrame()
    finally:
        conn.close()

    if not rows:
        return pd.DataFrame()

    df = pd.DataFrame(rows)
    df["annee"] = pd.to_numeric(df["annee"], errors="coerce")
    df["prix_m2_median"] = pd.to_numeric(df["prix_m2_median"], errors="coerce")
    df = df.dropna()

    def _cagr(sub: pd.DataFrame) -> float:
        sub = sub.sort_values("annee")
        n_years = sub["annee"].iloc[-1] - sub["annee"].iloc[0]
        if n_years < 1:
            return float("nan")
        p_old = float(sub["prix_m2_median"].iloc[0])
        p_new = float(sub["prix_m2_median"].iloc[-1])
        if p_old <= 0:
            return float("nan")
        return (p_new / p_old) ** (1.0 / n_years) - 1.0

    result = df.groupby("code_iris").apply(_cagr)
    result.name = "price_momentum"
    result = result.reset_index()
    result.columns = ["code_iris", "price_momentum"]
    result["price_momentum"] = result["price_momentum"].clip(-0.15, 0.15)

    n_valid = int(result["price_momentum"].notna().sum())
    log.info(
        "Price momentum (Nord DVF): %d IRIS  (median CAGR=%.2f%%/yr)",
        n_valid,
        result["price_momentum"].median() * 100 if n_valid > 0 else 0.0,
    )
    return result


def _validate_price_consistency(df: pd.DataFrame) -> None:
    """
    Log communes where NPI and observed price evolution are incoherent:
      high NPI  + flat  prices  → structural demand not yet manifest
      low  NPI  + rising prices → supply catching up or data lag
    Diagnostic only — does not modify df.
    """
    try:
        conn = get_connection()
        with conn.cursor() as cur:
            cur.execute(
                "SELECT code_iris, evolution_m2_pct "
                "FROM prix_evolution_iris "
                "WHERE code_iris LIKE %s AND annee >= 2022 "
                "  AND evolution_m2_pct IS NOT NULL",
                ("59%",),
            )
            evo_rows = cur.fetchall()
        conn.close()
    except Exception as exc:
        log.warning("Price consistency check skipped: %s", exc)
        return

    if not evo_rows:
        return

    evo_df = pd.DataFrame(evo_rows)
    evo_df["evolution_m2_pct"] = pd.to_numeric(evo_df["evolution_m2_pct"], errors="coerce")
    iris_evo = (
        evo_df.groupby("code_iris")["evolution_m2_pct"]
        .mean()
        .reset_index()
        .rename(columns={"evolution_m2_pct": "mean_evo_pct"})
    )

    check = df[["code_iris", "pop22", "npi"]].merge(iris_evo, on="code_iris", how="inner")
    check["code_commune"] = check["code_iris"].str[:5]

    def _pw(sub: pd.DataFrame) -> pd.Series:
        total_pop = sub["pop22"].sum()
        if total_pop == 0:
            return pd.Series({"commune_npi": 0.0, "commune_evo": 0.0})
        w = sub["pop22"] / total_pop
        return pd.Series({
            "commune_npi": float((sub["npi"] * w).sum()),
            "commune_evo": float((sub["mean_evo_pct"] * w).sum()),
        })

    communes = check.groupby("code_commune").apply(_pw).reset_index()

    incoherent = communes[
        ((communes["commune_npi"] > 0.25) & (communes["commune_evo"] < 1.0)) |
        ((communes["commune_npi"] < -0.25) & (communes["commune_evo"] > 5.0))
    ]
    if incoherent.empty:
        log.info("Price/NPS consistency check: OK (no incoherent communes)")
    else:
        log.warning(
            "Price/NPS incoherence in %d communes (high NPI + flat prices or vice versa):",
            len(incoherent),
        )
        for _, row in incoherent.iterrows():
            log.warning(
                "  commune %s: npi=%.2f  price_evo=%.1f%%/yr",
                row["code_commune"], row["commune_npi"], row["commune_evo"],
            )


# ---------------------------------------------------------------------------
# Normalisation
# ---------------------------------------------------------------------------

def _minmax(series: pd.Series) -> pd.Series:
    lo, hi = series.min(), series.max()
    if hi == lo:
        return pd.Series(0.5, index=series.index, dtype=float)
    return (series - lo) / (hi - lo)


# ---------------------------------------------------------------------------
# Score computation
# ---------------------------------------------------------------------------

def _safe_merge(
    left: pd.DataFrame,
    right: pd.DataFrame,
    on: str,
    how: str = "left",
) -> pd.DataFrame:
    """Merge but skip silently if right is empty or missing the key column."""
    if right.empty or on not in right.columns:
        return left
    return left.merge(right, on=on, how=how)


def _compute_scores(
    pop22:        pd.DataFrame,
    pop18:        pd.DataFrame,
    log_df:       pd.DataFrame,
    act_df:       pd.DataFrame,
    dipl:         pd.DataFrame,
    filo:         pd.DataFrame,
    defm:         pd.DataFrame,
    caf:          pd.DataFrame,      # {code_commune, poverty_rate} — commune level
    amenity:      pd.DataFrame,
    dvf_data:     pd.DataFrame,
    greenery_df:  pd.DataFrame,      # {code_iris, greenery_pct ∈ [0,1]}
    security_df:  pd.DataFrame,      # {code_iris, security_score = 10 - danger_score}
    flood_df:     pd.DataFrame,      # {code_iris, flood_risk ∈ [0,1]}
    noise_df:     pd.DataFrame,      # {code_iris, noise_level ∈ [0,1]}
    momentum_df:  pd.DataFrame,      # {code_iris, price_momentum (CAGR)}
    sps_weights:  dict[str, float],  # loaded from MySQL
    bps_weights:  dict[str, float],  # loaded from MySQL
) -> pd.DataFrame:
    """
    Merge all sources, compute raw indicators, normalise, score.
    Returns a DataFrame with all raw + normalised columns + SPS/BPS/NPI.
    """
    df = pop22.merge(pop18,  on="code_iris", how="left")
    df = df.merge(log_df,    on="code_iris", how="inner")
    df = _safe_merge(df, act_df,      on="code_iris")
    df = _safe_merge(df, dipl,        on="code_iris")
    df = _safe_merge(df, filo,        on="code_iris")
    df = _safe_merge(df, defm,        on="code_iris")
    df = _safe_merge(df, amenity,     on="code_iris")
    df = _safe_merge(df, dvf_data,    on="code_iris")
    df = _safe_merge(df, greenery_df, on="code_iris")
    df = _safe_merge(df, security_df, on="code_iris")
    df = _safe_merge(df, flood_df,    on="code_iris")
    df = _safe_merge(df, noise_df,    on="code_iris")
    df = _safe_merge(df, momentum_df, on="code_iris")

    # CAF is at commune level — join via first 5 digits of code_iris
    if not caf.empty and "code_commune" in caf.columns:
        df["code_commune"] = df["code_iris"].str[:5]
        df = df.merge(caf, on="code_commune", how="left")
        df = df.drop(columns=["code_commune"])

    # Fill missing reference data with neutral values
    # Fill optional columns that may be absent when their source was unavailable
    for col, default in [
        ("pop18",            0.0),
        ("actocc",           0.0),
        ("act",              0.0),
        ("nscol15p",         0.0),
        ("sup2",             0.0),
        ("sup34",            0.0),
        ("sup5",             0.0),
        ("defm_count",       0.0),
        ("amenity_score",    0.0),
        ("transaction_rate", 0.0),
        ("greenery_pct",     0.0),
        ("flood_risk",       0.0),   # no flood zone = no risk
        ("noise_level",      0.0),   # no nearby noise source
        ("poverty_rate",     float("nan")),   # NaN so Nord mean fill-in triggers
        ("security_score",   float("nan")),   # NaN so Nord mean fill-in triggers
        ("price_momentum",   float("nan")),   # NaN so Nord median fill-in triggers
    ]:
        if col not in df.columns:
            df[col] = default
        else:
            df[col] = df[col].fillna(default)

    # Filosofi: fill missing with Nord median (IRIS < 5000 hab not covered)
    if "income_median" not in df.columns:
        df["income_median"] = float("nan")
    nord_income_median = df["income_median"].median()
    if pd.isna(nord_income_median):
        nord_income_median = 19750.0  # Nord historical median fallback
    missing_filo = df["income_median"].isna().sum()
    df["income_level"] = df["income_median"].fillna(nord_income_median)
    log.info(
        "Merged: %d IRIS | %d without Filosofi -> filled with Nord median (%.0f EUR)",
        len(df), missing_filo, nord_income_median,
    )

    # CAF poverty_rate: fill missing with Nord mean (not available for small communes)
    nord_pov_mean = df["poverty_rate"].mean()
    if pd.isna(nord_pov_mean):
        nord_pov_mean = 0.08
    missing_caf = df["poverty_rate"].isna().sum()
    df["poverty_rate"] = df["poverty_rate"].fillna(nord_pov_mean)
    log.info(
        "%d IRIS without CAF poverty data -> filled with Nord mean (%.3f)",
        missing_caf, nord_pov_mean,
    )

    # security_score: two-stage fill.
    # Stage 1 — commune-level fallback: covers the ~1,000 Nord IRIS outside Lille
    #   métropole that iris_securite.json does not include.
    # Stage 2 — Nord mean: last resort for any IRIS whose commune is also missing.
    missing_before = int(df["security_score"].isna().sum())
    if missing_before > 0:
        commune_sec = _load_security_commune_fallback()
        if commune_sec:
            mask = df["security_score"].isna()
            df.loc[mask, "security_score"] = df.loc[mask, "code_iris"].str[:5].map(commune_sec)
            filled_by_commune = missing_before - int(df["security_score"].isna().sum())
            log.info("Security commune fallback: filled %d / %d uncovered IRIS", filled_by_commune, missing_before)

    nord_sec_mean = df["security_score"].mean()
    if pd.isna(nord_sec_mean):
        nord_sec_mean = 5.0
    still_missing = int(df["security_score"].isna().sum())
    df["security_score"] = df["security_score"].fillna(nord_sec_mean)
    if still_missing:
        log.warning("%d IRIS still without security data after commune fallback -> Nord mean (%.2f)", still_missing, nord_sec_mean)

    # price_momentum: fill missing with Nord median (≈0 for flat-market IRIS)
    nord_momentum_median = float(df["price_momentum"].median())
    if np.isnan(nord_momentum_median):
        nord_momentum_median = 0.0
    missing_momentum = int(df["price_momentum"].isna().sum())
    df["price_momentum"] = df["price_momentum"].fillna(nord_momentum_median)
    log.info(
        "%d IRIS without price momentum -> filled with Nord median (%.2f%%/yr)",
        missing_momentum, nord_momentum_median * 100,
    )

    # ------------------------------------------------------------------
    # SPS raw indicators
    # ------------------------------------------------------------------

    df["ownership_rate"] = np.where(df["rp"] > 0, df["prop"] / df["rp"], 0.0)

    # very_elderly_ratio: pop 75+ / pop total — near-term sellers (EHPAD/inheritance)
    df["very_elderly_ratio"] = np.where(df["pop22"] > 0, df["pop75p"] / df["pop22"], 0.0)

    # elderly_owner: (pop75+ / pop) × ownership_rate — actual supply risk
    # Only IRIS with both high elderly AND high ownership get elevated SPS.
    df["elderly_owner"] = df["very_elderly_ratio"] * df["ownership_rate"]

    df["vacancy_rate"] = np.where(
        df["total_log"] > 0, df["vacants"] / df["total_log"], 0.0
    )

    # pop_decline: RP2018 → RP2022 (4-year delta), exponential amplification.
    # (1 + rate)^3 - 1 so that a 5 % decline → ~15.8 % signal (vs 5 % linear).
    _raw_decline = np.where(
        df["pop18"] > 0,
        np.maximum(0.0, (df["pop18"] - df["pop22"]) / df["pop18"]),
        0.0,
    )
    df["pop_decline"] = np.power(1.0 + _raw_decline, 3.0) - 1.0

    # poverty_rate, flood_risk, noise_level already merged and filled above

    # ------------------------------------------------------------------
    # BPS raw indicators
    # ------------------------------------------------------------------

    # first_buyer_ratio: pop 25-44 / pop total — peak French homebuyer demographic
    # pop2539 = 25-39 (full 15-year bin); pop 40-44 ≈ pop4054 × (5/15) = pop4054/3
    pop_2544 = df["pop2539"] + df["pop4054"] / 3.0
    df["first_buyer_ratio"] = np.where(df["pop22"] > 0, pop_2544 / df["pop22"], 0.0)

    # income_level already filled

    # pop_growth: exponential amplification matching pop_decline (symmetric treatment).
    # (1 + rate)^3 - 1: 1 % growth → ~3 % signal, 5 % → ~15.8 %, 10 % → ~33 %.
    _raw_growth = np.where(
        df["pop18"] > 0,
        np.maximum(0.0, (df["pop22"] - df["pop18"]) / df["pop18"]),
        0.0,
    )
    df["pop_growth"] = np.power(1.0 + _raw_growth, 3.0) - 1.0

    # employment_rate: use DEFM to improve RP employment estimate where possible
    df["employment_rate"] = np.where(
        (df["act"] > 0) & (df["defm_count"] == 0),
        df["actocc"] / df["act"],
        np.where(
            (df["actocc"] + df["defm_count"]) > 0,
            df["actocc"] / (df["actocc"] + df["defm_count"]),
            0.0,
        ),
    )

    df["diploma_rate"] = np.where(
        df["nscol15p"] > 0,
        (df["sup2"] + df["sup34"] + df["sup5"]) / df["nscol15p"],
        0.0,
    )

    # amenity_score, transaction_rate, greenery_pct, security_score, price_momentum
    # already merged and filled above

    sps_inds = list(sps_weights.keys())
    bps_inds = list(bps_weights.keys())
    all_inds = sps_inds + bps_inds

    log.info("Raw indicator statistics (Nord IRIS):")
    for label, inds in [("SPS", sps_inds), ("BPS", bps_inds)]:
        log.info("  -- %s indicators --", label)
        for ind in inds:
            s = df[ind]
            log.info(
                "  %-22s  mean=%6.3f  p10=%6.3f  p90=%6.3f  max=%7.3f",
                ind, s.mean(), s.quantile(0.10), s.quantile(0.90), s.max(),
            )

    # ------------------------------------------------------------------
    # Normalise
    # ------------------------------------------------------------------
    for ind in all_inds:
        df[f"{ind}_norm"] = _minmax(df[ind])

    # ------------------------------------------------------------------
    # Weighted scores
    # ------------------------------------------------------------------
    df["sps"] = sum(sps_weights[i] * df[f"{i}_norm"] for i in sps_inds)
    df["bps"] = sum(bps_weights[i] * df[f"{i}_norm"] for i in bps_inds)

    # NPI: population-weighted centering so department mean = 0 by construction.
    # npi_raw = bps - sps  (both ∈ [0, 1])
    # dept_mean subtracted → centered; divided by 3σ → ±3σ maps to ±1.0.
    # Thresholds ±0.30 ≈ ±0.9σ from the department's own neutral point.
    npi_raw = df["bps"] - df["sps"]
    df["npi_raw"] = npi_raw

    pop_w = df["pop22"] / max(float(df["pop22"].sum()), 1.0)
    dept_mean = float((npi_raw * pop_w).sum())
    npi_centered = npi_raw - dept_mean
    dept_std = float(np.sqrt(((npi_centered ** 2) * pop_w).sum()))
    if dept_std < 1e-6:
        dept_std = 1.0
    df["npi"] = (npi_centered / (3.0 * dept_std)).clip(-1.0, 1.0)

    pop_weighted_mean = float((df["npi"] * pop_w).sum())
    log.info(
        "NPI -- dept_mean_removed=%.4f  dept_std=%.4f  "
        "npi range=[%.3f, %.3f]  pop-weighted mean after centering=%.6f",
        dept_mean, dept_std, df["npi"].min(), df["npi"].max(), pop_weighted_mean,
    )

    for score in ("sps", "bps", "npi"):
        s = df[score]
        log.info(
            "%s -- mean=%.3f  p10=%.3f  p90=%.3f  min=%.3f  max=%.3f",
            score.upper(), s.mean(), s.quantile(0.10), s.quantile(0.90),
            s.min(), s.max(),
        )

    _validate_price_consistency(df)
    return df


# ---------------------------------------------------------------------------
# MySQL persistence
# ---------------------------------------------------------------------------

_DDL_IRIS_PRESSION = """
CREATE TABLE IF NOT EXISTS iris_pression (
    code_iris                 VARCHAR(9) PRIMARY KEY,
    -- SPS raw indicators (v6)
    very_elderly_ratio        REAL,
    elderly_owner             REAL,
    vacancy_rate              REAL,
    pop_decline               REAL,
    poverty_rate              REAL,
    flood_risk                REAL,
    noise_level               REAL,
    -- BPS raw indicators (v6)
    income_level              REAL,
    first_buyer_ratio         REAL,
    security_score            REAL,
    employment_rate           REAL,
    price_momentum            REAL,
    diploma_rate              REAL,
    greenery_pct              REAL,
    pop_growth                REAL,
    amenity_score             REAL,
    transaction_rate          REAL,
    -- SPS normalised
    very_elderly_ratio_norm   REAL,
    elderly_owner_norm        REAL,
    vacancy_rate_norm         REAL,
    pop_decline_norm          REAL,
    poverty_rate_norm         REAL,
    flood_risk_norm           REAL,
    noise_level_norm          REAL,
    -- BPS normalised
    income_level_norm         REAL,
    first_buyer_ratio_norm    REAL,
    security_score_norm       REAL,
    employment_rate_norm      REAL,
    price_momentum_norm       REAL,
    diploma_rate_norm         REAL,
    greenery_pct_norm         REAL,
    pop_growth_norm           REAL,
    amenity_score_norm        REAL,
    transaction_rate_norm     REAL,
    -- Scores
    sps                       REAL,
    bps                       REAL,
    npi_raw                   REAL,
    npi                       REAL,
    built_at                  TEXT
)
"""

def _save_to_db(
    df: pd.DataFrame,
    sps_weights: dict[str, float],
    bps_weights: dict[str, float],
) -> None:
    """Write all raw indicators + scores to iris_pression table in MySQL."""
    conn = get_connection()

    sps_inds = list(sps_weights.keys())
    bps_inds = list(bps_weights.keys())
    all_inds = sps_inds + bps_inds
    built_at = datetime.now(timezone.utc).isoformat()

    rows = []
    for _, row in df.iterrows():
        code = str(row["code_iris"]).zfill(9)
        vals = [code]
        for ind in all_inds:
            v = row.get(ind, 0.0)
            vals.append(float(v) if pd.notna(v) else 0.0)
        for ind in all_inds:
            v = row.get(f"{ind}_norm", 0.0)
            vals.append(float(v) if pd.notna(v) else 0.0)
        for score in ("sps", "bps", "npi_raw", "npi"):
            vals.append(float(row[score]))
        vals.append(built_at)
        rows.append(tuple(vals))

    norm_cols = [f"{ind}_norm" for ind in all_inds]
    cols = (
        "code_iris, "
        + ", ".join(all_inds)
        + ", "
        + ", ".join(norm_cols)
        + ", sps, bps, npi_raw, npi, built_at"
    )
    placeholders = ", ".join(["%s"] * len(rows[0]))
    try:
        with conn.cursor() as cur:
            cur.execute("DROP TABLE IF EXISTS iris_pression")
            cur.execute(_DDL_IRIS_PRESSION)
            cur.executemany(
                f"REPLACE INTO iris_pression ({cols}) VALUES ({placeholders})",
                rows,
            )
        conn.commit()
    finally:
        conn.close()
    log.info("Saved %d rows -> iris_pression table in MySQL", len(rows))


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main() -> None:
    parser = argparse.ArgumentParser(
        description="Calcule SPS, BPS et NPI par IRIS pour le Nord (59)"
    )
    parser.add_argument(
        "--force-download", action="store_true",
        help="Retelecherge les fichiers INSEE meme s'ils sont deja en cache",
    )
    args = parser.parse_args()
    force = args.force_download

    try:
        sps_weights, bps_weights = load_indicator_weights()
        amenity_weights = load_amenity_weights()
        # Validate MySQL indicators match current v6 column names
        expected = set(_DEFAULT_SPS_WEIGHTS) | set(_DEFAULT_BPS_WEIGHTS)
        stale = (set(sps_weights) | set(bps_weights)) - expected
        if stale:
            raise RuntimeError(
                f"Stale indicator names in MySQL indicator_weights: {stale}.\n"
                f"Fix: python migrate_weights_mysql.py"
            )
    except RuntimeError as exc:
        log.warning("MySQL weights unavailable or stale — using built-in v6 defaults. (%s)", exc)
        sps_weights = _DEFAULT_SPS_WEIGHTS
        bps_weights = _DEFAULT_BPS_WEIGHTS
        amenity_weights = {}

    pop22        = _load_population_2022(force)
    pop18        = _load_population_2018(force)
    log_df       = _load_logements(force)
    act_df       = _load_activite(force)
    dipl         = _load_diplomes(force)
    filo         = _load_filosofi(force)
    defm         = _load_defm(force)
    caf          = _load_caf(force)
    amenity      = _load_bpe(force, amenity_weights)
    dvf_data     = _load_dvf_transaction_rate()
    greenery_df  = _load_nature_greenery()
    security_df  = _load_security()
    centroids    = _load_iris_centroids()
    flood_df     = _load_flood_risk(centroids)
    noise_df     = _load_noise_level(centroids)
    momentum_df  = _load_price_momentum()

    if pop22.empty or log_df.empty:
        log.error("Cannot proceed without RP2022 population and logement data.")
        return

    df = _compute_scores(
        pop22, pop18, log_df, act_df, dipl, filo,
        defm, caf, amenity, dvf_data, greenery_df, security_df,
        flood_df, noise_df, momentum_df,
        sps_weights, bps_weights,
    )

    # ------------------------------------------------------------------
    # JSON output
    # ------------------------------------------------------------------
    sps_inds = list(sps_weights.keys())
    bps_inds = list(bps_weights.keys())
    all_inds = sps_inds + bps_inds

    result: dict[str, dict] = {}
    for _, row in df.iterrows():
        code = str(row["code_iris"]).zfill(9)
        result[code] = {
            "sps":     round(float(row["sps"]), 4),
            "bps":     round(float(row["bps"]), 4),
            "npi_raw": round(float(row["npi_raw"]), 4),
            "npi":     round(float(row["npi"]), 4),
            "details": {ind: round(float(row[ind]), 4) for ind in all_inds},
            "sps_details": {
                ind: round(float(row[f"{ind}_norm"]), 4) for ind in sps_inds
            },
            "bps_details": {
                ind: round(float(row[f"{ind}_norm"]), 4) for ind in bps_inds
            },
        }

    os.makedirs(os.path.dirname(_OUTPUT), exist_ok=True)
    with open(_OUTPUT, "w", encoding="utf-8") as f:
        json.dump(result, f, ensure_ascii=False)
    log.info("Saved -> %s  (%d entries)", _OUTPUT, len(result))

    # ------------------------------------------------------------------
    # SQLite
    # ------------------------------------------------------------------
    _save_to_db(df, sps_weights, bps_weights)

    # ------------------------------------------------------------------
    # Summary table
    # ------------------------------------------------------------------
    sps_vals = [v["sps"] for v in result.values()]
    bps_vals = [v["bps"] for v in result.values()]
    npi_vals = [v["npi"] for v in result.values()]
    n = len(sps_vals)

    def _stats(vals: list[float]) -> tuple:
        s = sorted(vals)
        return (
            sum(vals) / n,
            min(vals), max(vals),
            s[n // 10], s[9 * n // 10],
        )

    sm, slo, shi, sp10, sp90 = _stats(sps_vals)
    bm, blo, bhi, bp10, bp90 = _stats(bps_vals)
    nm, nlo, nhi, np10, np90 = _stats(npi_vals)

    buyers   = sum(1 for v in npi_vals if v >  0.30)
    sellers  = sum(1 for v in npi_vals if v < -0.30)
    balanced = n - buyers - sellers

    print()
    print("=" * 66)
    print(f"  IRIS traites    : {n:,}")
    print(f"  JSON            : {_OUTPUT}")
    print(f"  MySQL           : iris_pression (see MYSQL_DATABASE env var)")
    print()
    print(f"  {'Score':<6}  {'mean':>6}  {'min':>6}  {'max':>6}  {'p10':>6}  {'p90':>6}")
    print(f"  {'-'*6}  {'-'*6}  {'-'*6}  {'-'*6}  {'-'*6}  {'-'*6}")
    print(f"  {'SPS':<6}  {sm:6.3f}  {slo:6.3f}  {shi:6.3f}  {sp10:6.3f}  {sp90:6.3f}")
    print(f"  {'BPS':<6}  {bm:6.3f}  {blo:6.3f}  {bhi:6.3f}  {bp10:6.3f}  {bp90:6.3f}")
    print(f"  {'NPI':<6}  {nm:6.3f}  {nlo:6.3f}  {nhi:6.3f}  {np10:6.3f}  {np90:6.3f}")
    print()
    print(f"  NPI > +0.30  (buying pressure)   : {buyers:,} IRIS  ({100*buyers/n:.1f} %)")
    print(f"  NPI   +/-0.30  (balanced)         : {balanced:,} IRIS  ({100*balanced/n:.1f} %)")
    print(f"  NPI < -0.30  (selling pressure)   : {sellers:,} IRIS  ({100*sellers/n:.1f} %)")
    print()

    # ------------------------------------------------------------------
    # Explanation  (v6)
    # ------------------------------------------------------------------
    print("=" * 66)
    print("  METHODOLOGIE ET INTERPRETATION  (v6)")
    print("=" * 66)
    print()
    print("  CONCEPT")
    print("  -------")
    print("  NPI est un indicateur RELATIF : il mesure l'ecart par rapport")
    print("  a l'equilibre propre au departement Nord. La moyenne ponderee")
    print("  par population est exactement 0 (par construction mathematique).")
    print("  Un NPI de +0.40 signifie 'pression achat nettement superieure")
    print("  a la moyenne du Nord', pas une valeur absolue universelle.")
    print()
    print("  SPS (Selling Pressure Score) mesure la propension a vendre.")
    print("  BPS (Buying Pressure Score) mesure la propension a acheter.")
    print("  NPI = BPS - SPS, centre, divise par 3*sigma_pondere.")
    print("  > +0.30  pression acheteuse (≈ +0.9 sigma au-dessus du Nord)")
    print("  < -0.30  pression vendeuse  (≈ -0.9 sigma en-dessous du Nord)")
    print("  +/-0.30  marche equilibre")
    print()
    print("  INDICATEURS SPS (poids total = 100 %)")
    print("  ----------------------------------------")
    print(f"  very_elderly_ratio ({int(sps_weights['very_elderly_ratio']*100)} %)")
    print("    Pop 75+ / pop totale. Pipeline d'offre imminent : les 75+")
    print("    font face aux transitions EHPAD et successions dans 5-10 ans.")
    print("    Source : RP2022 (P22_POP75P).")
    print()
    print(f"  elderly_owner      ({int(sps_weights['elderly_owner']*100)} %)")
    print("    (pop75+ / pop) x taux de propriete. Terme d'interaction :")
    print("    seuls les IRIS a la fois tres ages ET tres proprietaires")
    print("    ont un signal SPS eleve. Supprime le double-comptage")
    print("    de l'ancien indicateur senior_ownership (base 70+).")
    print("    Source : RP2022 (calcul derive).")
    print()
    print(f"  vacancy_rate       ({int(sps_weights['vacancy_rate']*100)} %)")
    print("    Logements vacants / total logements.")
    print("    Source : RP2022 (base-ic-logement-2022).")
    print()
    print(f"  pop_decline        ({int(sps_weights['pop_decline']*100)} %)")
    print("    (1 + max(0, pop2018-pop2022)/pop2018)^3 - 1  [exponentiel].")
    print("    1 % de declin -> ~3 % signal ; 5 % -> ~15.8 % signal.")
    print("    Sources : RP2018 + RP2022.")
    print()
    print(f"  poverty_rate       ({int(sps_weights['poverty_rate']*100)} %)")
    print("    Allocataires RSA actifs / personnes couvertes CAF.")
    print("    Proxy de detresse economique -> pression a la revente.")
    print("    Source : CAF 2023. IRIS non couverts : moyenne Nord.")
    print()
    print(f"  flood_risk         ({int(sps_weights['flood_risk']*100)} %)")
    print("    Score de risque inondation au centroide IRIS :")
    print("    zone frequente=1.0, moyenne=0.6, rare=0.2, hors zone=0.")
    print("    Source : SQLite flood_zones (GEORISQUES WFS TRI).")
    print()
    print(f"  noise_level        ({int(sps_weights['noise_level']*100)} %)")
    print("    Exposition au bruit : LDEN pondere par la proximite")
    print("    a la route/voie ferree la plus proche (buffer 500 m).")
    print("    Source : SQLite noise_zones (OSM via Overpass API).")
    print()
    print("  INDICATEURS BPS (poids total = 100 %)")
    print("  ----------------------------------------")
    print(f"  income_level       ({int(bps_weights['income_level']*100)} %)")
    print("    Revenu disponible median annuel (EUR).")
    print("    Source : Filosofi 2021. Fallback : mediane Nord.")
    print()
    print(f"  first_buyer_ratio  ({int(bps_weights['first_buyer_ratio']*100)} %)")
    print("    Pop 25-44 / pop totale. Tranche d'age d'achat en France")
    print("    (age median 1er achat ~32 ans ; monte en gamme 38-44 ans).")
    print("    Remplace prime_age_ratio (25-59) qui incluait les 55-59")
    print("    susceptibles de revendre plutot qu'acheter.")
    print("    Source : RP2022 (pop2539 + pop4054/3).")
    print()
    print(f"  security_score     ({int(bps_weights['security_score']*100)} %)")
    print("    10 - danger_score. Un IRIS sur attire plus d'acheteurs.")
    print("    Source : iris_securite.json + fallback commune Nord.")
    print()
    print(f"  employment_rate    ({int(bps_weights['employment_rate']*100)} %)")
    print("    Actifs occupes / (actifs occupes + DEFM) si DEFM dispo,")
    print("    sinon actocc / act (RP2022).")
    print("    Sources : RP2022 + DEFM 2024.")
    print()
    print(f"  price_momentum     ({int(bps_weights['price_momentum']*100)} %)")
    print("    CAGR du prix_m2_median DVF sur les annees disponibles.")
    print("    Confirmation marche : un NPS eleve sans hausse de prix")
    print("    se voit penalise. IRIS sans donnee : mediane Nord (≈0).")
    print("    Source : table prix_evolution_iris (DVF Nord).")
    print()
    print(f"  diploma_rate       ({int(bps_weights['diploma_rate']*100)} %)")
    print("    (Bac+2 + Bac+3-4 + Bac+5+) / pop 15+ non scolarisee.")
    print("    Source : RP2022.")
    print()
    print(f"  greenery_pct       ({int(bps_weights['greenery_pct']*100)} %)")
    print("    Fraction IRIS couverte par parcs/forets/zones naturelles.")
    print("    Source : OSM via Overpass API (build_nature_zones.py).")
    print()
    print(f"  amenity_score      ({int(bps_weights['amenity_score']*100)} %)")
    print("    Score d'equipement pondere (sante x2-3, ecoles x2,")
    print("    transports x3, commerces x1-1.5).")
    print("    Source : BPE 2024.")
    print()
    print(f"  transaction_rate   ({int(bps_weights['transaction_rate']*100)} %)")
    print("    Nombre moyen de transactions DVF par an par IRIS.")
    print("    Source : prix_evolution_iris (DVF Nord 2021-2025).")
    print()
    print("  CENTRAGE (v6)")
    print("  -------------")
    print("  npi_raw = bps - sps")
    print("  dept_mean = moyenne ponderee par pop sur tous les IRIS Nord")
    print("  npi_centree = npi_raw - dept_mean")
    print("  npi_final = clip(npi_centree / (3 * sigma_pondere), -1, +1)")
    print("  -> moyenne ponderee Nord = 0 (exacte, par construction)")
    print("  -> npi_raw stocke pour debug")
    print()
    print("  NORMALISATION")
    print("  -------------")
    print("  Chaque indicateur est normalise min-max sur l'ensemble des IRIS")
    print("  du Nord (59) avant ponderation. Un IRIS sans variation recoit 0.5.")
    print()
    print("  CHANGEMENTS v6 vs v5")
    print("  ---------------------")
    print("  SPS : senior_ratio + senior_ownership -> very_elderly_ratio + elderly_owner")
    print("        (focus 75+ ; suppression double-comptage 70+/ownership)")
    print("        + flood_risk (5 %) + noise_level (5 %)")
    print("  BPS : prime_age_ratio (25-59) -> first_buyer_ratio (25-44)")
    print("        + price_momentum (10 %)")
    print("  NPI : centrage pondere par population (mean = 0 par construction)")
    print()
    print("  LIMITES")
    print("  -------")
    print("  - RP2022 : donnees collectees ~2 ans avant publication")
    print("  - Filosofi ne couvre pas les communes < 5 000 hab")
    print("  - flood_risk/noise_level : point centroide IRIS, pas surface entiere")
    print("  - price_momentum : CAGR sur 2-4 ans DVF, sensible aux petits volumes")
    print("  - CAF/DEFM : couverture IRIS parfois partielle")
    print("=" * 66)


if __name__ == "__main__":
    main()
