---
title: "Step 6: Property Value Interpolation & EIF Census Enrichment"
author: "Russell Blessing"
format:
  html:
    toc: true
    toc-depth: 3
    code-fold: true
    code-summary: "Show / hide code"
execute:
  echo: true
  warning: false
  message: false
---

## Overview

This notebook does two related things:

**Property value interpolation.** For parcels matched to a mitigation or
application record but carrying an unreliable `TOTAL.VALUE.CALCULATED`
(vacant land, government-owned, anomalously low values), estimate a
market-comparable value by averaging the recorded values of the nearest
1–4 family residential donor parcels. Three estimates per recipient
(k = 5, 10, 50 nearest neighbors).

**EIF census enrichment.** For each mit/app point, attach Census Bureau
Gridded EIF attributes from two release years (1999 and 2020) at the
0.01° grid cell containing that point. Four EIF files are joined:

- Population × age × race × sex — 1999 and 2020
  (prefixes `ars_1999_`, `ars_2020_`)
- Population × race × income decile — 1999 and 2020
  (prefixes `ri_1999_`, `ri_2020_`)

The 1999 and 2020 baselines bracket the project's main disaster history
window, letting downstream analysis pull contextual demographics from a
year close to each mit/app's program record (e.g., a 1999 Hurricane Floyd
buyout record can use `ars_1999_*` columns; later records can use the
2020 baseline).

Both stages consume the **alt Step 5 resolved outputs** as the single source of
mit/app data. Each record carries an `assigned_parcel_index` (resolved
to one parcel per record, no fan-out) and a point geometry. The
interpolation uses `assigned_parcel_index` to count mits/apps per
parcel; the EIF enrichment uses the point geometry to snap each record
to its grid cell.

**Inputs:**

- `parcels_pri.gpkg` — parcels with priority scoring (canonical Step 5)
- `alt_mits_resolved.gpkg` — point-geometry mits, one row per record,
  resolved to one assigned parcel each (alt Step 5)
- `alt_apps_resolved.gpkg` — point-geometry apps, same structure (alt Step 5)
- Four Gridded EIF parquet files (downloaded on first run)

**Outputs (written to `out_dir`):**

- `interpolate_to.gpkg`, `interpolate_from.gpkg` — recipient and donor pools
- `interpolated_vals.gpkg`, `interpolated_parcels_final.csv` — parcels with interpolated values
- `mits_with_eif.gpkg`, `apps_with_eif.gpkg` — alt resolved outputs with EIF cell attributes
- `mits_with_eif.csv`, `apps_with_eif.csv` — same, no geometry

## Methodology

**Recipients (parcels with unreliable recorded property values)**: parcels
that have at least one mit or app assigned to them by alt Step 5 AND
whose recorded `TOTAL.VALUE.CALCULATED` is treated as unreliable.
A value is treated as unreliable if the parcel is:

- vacant land (`vacant == 1`) — no structure, recorded value reflects land only
- government land use (`gov_lu == 1`) — typically tax-exempt with $0 assessed
- government-owned by owner name (`gov_owned == 1`) — same
- low recorded value (`TOTAL.VALUE.CALCULATED < $25,000`) — anomalously low,
  likely indicates undervaluation, recent transfer, or data quality issue

**Donors (parcels providing value estimates)**: 1–4 family residential,
NOT government-owned, with non-missing recorded total value above
$25,000. 

**Mit/app per-parcel counts** are computed from `assigned_parcel_index`
in the alt resolved outputs — each unique mit/app contributes to
exactly one parcel. This avoids the fan-out double-counting that would
happen if we counted from canonical `mits_pcls.gpkg` (where a mit
hitting N parcels appears in N rows). Records with `assignment_method
== "unassigned"` are excluded from counts.

**Government-ownership detection** uses a regex on `OWNER.1.FULL.NAME`
matching common government-entity keywords (`CITY`, `STATE`, `TOWN`,
`VILLAGE`, `COUNTY`, `METROPOLITAN`, plus abbreviations) excluding
`INC` and `LLC`.

**EIF cell join** uses cell-membership rather than spatial intersection —
each mit/app point's lat/lon is snapped to its 0.01° grid cell centroid
via `floor(coord * 100) / 100 + 0.005` (cell centers are at `.005` offsets
per the EIF grid topology), then joined to EIF data by the snapped
coordinates as a character key. Equivalent result, much faster than `st_join`.



```{r libraries}
library(sf)
library(dplyr)
library(readr)
library(stringr)
library(tidyr)
library(purrr)
library(RANN)
library(arrow)
library(knitr)
```

```{r setup}
out_dir         <- "/proj/mhinolab/users/rbless/data/Obstacles_Output"
parcels_path    <- file.path(out_dir, "parcels_pri.gpkg")
mits_alt_path   <- file.path(out_dir, "alt_mits_resolved.gpkg")
apps_alt_path   <- file.path(out_dir, "alt_apps_resolved.gpkg")

# Tunables — interpolation (unchanged)
LOW_VALUE_THRESHOLD <- 25000
MIN_DONOR_VALUE     <- 25000
K_VALUES            <- c(5, 10, 50)

# Tunables — government ownership regex (unchanged)
GOV_KEYWORDS     <- "\\bCITY\\b|\\bSTATE\\b|\\bTOWN\\b|\\bVILLAGE\\b|\\bCOUNTY\\b|\\bMETROPOLITAN\\b|\\bCTY\\b|\\bST\\b|\\bTWN\\b|\\bVILL\\b|\\bVIL\\b|\\bCNTY\\b|\\bMETRO\\b"
EXCLUDE_KEYWORDS <- "\\bINC\\b|\\bLLC\\b"

# EIF source files — multi-year for ageracesex and raceincome,
# 2024-only for hu_age_homeval (no historical version exists for that file).
EIF_BASE_URL  <- "https://www2.census.gov/ces/gridded_eif"
EIF_POP_YEARS <- c(1999, 2020)

build_eif_files <- function() {
  files <- list()

  # Population × age × race × sex — multiple years
  for (yr in EIF_POP_YEARS) {
    name <- paste0("ageracesex_", yr)
    files[[name]] <- list(
      url    = file.path(EIF_BASE_URL, sprintf("gridded_eif_pop_ageracesex_%d.parquet", yr)),
      path   = file.path(out_dir,      sprintf("gridded_eif_pop_ageracesex_%d.parquet", yr)),
      prefix = paste0("ars_", yr)
    )
  }

  # Population × race × income decile — multiple years
  for (yr in EIF_POP_YEARS) {
    name <- paste0("raceincome_", yr)
    files[[name]] <- list(
      url    = file.path(EIF_BASE_URL, sprintf("gridded_eif_pop_raceincome_%d.parquet", yr)),
      path   = file.path(out_dir,      sprintf("gridded_eif_pop_raceincome_%d.parquet", yr)),
      prefix = paste0("ri_", yr)
    )
  }

  files
}

eif_files <- build_eif_files()

# NC bounding box (WGS84) with small buffer for cells crossing the boundary
NC_BBOX <- list(lat_min = 33.7, lat_max = 36.7,
                lon_min = -84.5, lon_max = -75.4)
```

## 1  Load parcels and alt mits/apps

```{r load-parcels}
parcels <- st_read(parcels_path, quiet = TRUE) |>
  st_transform(32119)

cat("Parcels loaded:", nrow(parcels), "rows\n")
```

```{r load-mits-apps}
mits_resolved <- st_read(mits_alt_path, quiet = TRUE)
apps_resolved <- st_read(apps_alt_path, quiet = TRUE)

cat("Mits (alt resolved):", nrow(mits_resolved), "rows\n")
cat("Apps (alt resolved):", nrow(apps_resolved), "rows\n")

# Quick distribution of how each side was assigned
cat("\nMit assignment_method distribution:\n")
mits_resolved |> st_drop_geometry() |> count(assignment_method, sort = TRUE) |> print()
cat("\nApp assignment_method distribution:\n")
apps_resolved |> st_drop_geometry() |> count(assignment_method, sort = TRUE) |> print()
```

## 2  Aggregate mit/app counts per parcel

One mit/app contributes to exactly one parcel — counted by
`assigned_parcel_index` from the alt resolved outputs. Records with
`assignment_method == "unassigned"` (no parcel within reach) are
excluded. Parcels with no matched mits/apps get a zero count.

```{r mit-app-counts}
mit_counts <- mits_resolved |>
  st_drop_geometry() |>
  filter(!is.na(assigned_parcel_index)) |>
  count(assigned_parcel_index, name = "mit_count") |>
  rename(parcel_index = assigned_parcel_index)

app_counts <- apps_resolved |>
  st_drop_geometry() |>
  filter(!is.na(assigned_parcel_index)) |>
  count(assigned_parcel_index, name = "app_count") |>
  rename(parcel_index = assigned_parcel_index)

parcels <- parcels |>
  left_join(mit_counts, by = "parcel_index") |>
  left_join(app_counts, by = "parcel_index") |>
  mutate(
    mit_count = replace_na(mit_count, 0L),
    app_count = replace_na(app_count, 0L)
  )

cat("Parcels with at least one assigned mit:", sum(parcels$mit_count > 0), "\n")
cat("Parcels with at least one assigned app:", sum(parcels$app_count > 0), "\n")
cat("Parcels with both:                     ",
    sum(parcels$mit_count > 0 & parcels$app_count > 0), "\n")
```

## 3  Identify government-owned parcels

```{r gov-owned}
parcels <- parcels |>
  mutate(
    owner_name_clean = replace_na(`OWNER.1.FULL.NAME`, ""),
    gov_owned = as.integer(
      str_detect(owner_name_clean, regex(GOV_KEYWORDS,    ignore_case = TRUE)) &
      !str_detect(owner_name_clean, regex(EXCLUDE_KEYWORDS, ignore_case = TRUE))
    )
  ) |>
  select(-owner_name_clean)

cat("Government-owned parcels (by owner name):", sum(parcels$gov_owned), "\n")
cat("Overlap with gov_lu (by land use code):  ",
    sum(parcels$gov_owned == 1 & parcels$gov_lu == 1), "\n")
```

## 4  Type conversions

`TOTAL.VALUE.CALCULATED` and `IMPROVEMENT.VALUE.CALCULATED` came in as
character. Coerce to numeric for the threshold filters and KNN means.

```{r type-conversions}
parcels <- parcels |>
  mutate(
    `TOTAL.VALUE.CALCULATED`       = suppressWarnings(as.numeric(`TOTAL.VALUE.CALCULATED`)),
    `IMPROVEMENT.VALUE.CALCULATED` = suppressWarnings(as.numeric(`IMPROVEMENT.VALUE.CALCULATED`)),
    fr_1_4 = as.integer(fr_1_4)
  )
```

## 5  Define recipient and donor pools

```{r build-pools}
to_interpolate <- parcels |>
  filter(
    (mit_count > 0 | app_count > 0),
    (
      vacant == 1 |
      gov_lu == 1 |
      gov_owned == 1 |
      (!is.na(`TOTAL.VALUE.CALCULATED`) & `TOTAL.VALUE.CALCULATED` < LOW_VALUE_THRESHOLD)
    )
  )

interpolate_from <- parcels |>
  filter(
    fr_1_4 == 1,
    gov_owned == 0,
    !is.na(`TOTAL.VALUE.CALCULATED`),
    `TOTAL.VALUE.CALCULATED` > MIN_DONOR_VALUE
  )

cat("Recipients (to_interpolate):", nrow(to_interpolate), "\n")
cat("Donors (interpolate_from):  ", nrow(interpolate_from), "\n")

st_write(to_interpolate,   file.path(out_dir, "interpolate_to.gpkg"),
         delete_dsn = TRUE, quiet = TRUE)
st_write(interpolate_from, file.path(out_dir, "interpolate_from.gpkg"),
         delete_dsn = TRUE, quiet = TRUE)
```


## 6  K-nearest-neighbor interpolation

For each recipient parcel, find the K nearest donor parcels by centroid
distance, then average their `TOTAL.VALUE.CALCULATED`. Three values of
K (5, 10, 50) computed in parallel.

```{r knn-interpolation}
from_coords <- st_coordinates(st_centroid(interpolate_from))
to_coords   <- st_coordinates(st_centroid(to_interpolate))

donor_values <- interpolate_from$`TOTAL.VALUE.CALCULATED`

nn_results <- map(K_VALUES, ~ RANN::nn2(from_coords, to_coords, k = .x))
names(nn_results) <- paste0("k", K_VALUES)

prop_values <- map2_dfc(K_VALUES, nn_results, function(k, nn) {
  vals <- matrix(donor_values[nn$nn.idx], ncol = k)
  tibble(!!paste0("prop_value_", k) := rowMeans(vals))
})

to_interpolate <- bind_cols(to_interpolate, prop_values)

cat("Recipient row count:", nrow(to_interpolate), "\n")
cat("All prop_value_5 non-NA:",  all(!is.na(to_interpolate$prop_value_5)),  "\n")
cat("All prop_value_10 non-NA:", all(!is.na(to_interpolate$prop_value_10)), "\n")
cat("All prop_value_50 non-NA:", all(!is.na(to_interpolate$prop_value_50)), "\n")
```

## 7  Join interpolated values back to all parcels

```{r join-back}
to_interp_attrs <- to_interpolate |>
  st_drop_geometry() |>
  select(parcel_index, prop_value_5, prop_value_10, prop_value_50)

joined <- parcels |>
  left_join(to_interp_attrs, by = "parcel_index") |>
  mutate(
    interpolate = as.integer(parcel_index %in% to_interpolate$parcel_index),
    prop_value_5  = coalesce(prop_value_5,  `TOTAL.VALUE.CALCULATED`),
    prop_value_10 = coalesce(prop_value_10, `TOTAL.VALUE.CALCULATED`),
    prop_value_50 = coalesce(prop_value_50, `TOTAL.VALUE.CALCULATED`)
  )

cat("Total parcels in joined output:", nrow(joined), "\n")
cat("Parcels marked as interpolated:", sum(joined$interpolate == 1), "\n")
```

## 8  Write interpolated parcels output

```{r write-interpolation-outputs}
st_write(joined, file.path(out_dir, "interpolated_vals.gpkg"),
         delete_dsn = TRUE, quiet = TRUE)

joined |>
  st_drop_geometry() |>
  write_csv(file.path(out_dir, "interpolated_parcels_final.csv"))
```

## 9  Gridded EIF Census enrichment

The Census Bureau Gridded EIF provides privacy-protected counts on a
fixed 0.01° grid (~1.2 km² per cell). Four files across two years
are joined to each mit/app point:

| File | Cross-tab | Prefix | Years |
|---|---|---|---|
| `gridded_eif_pop_ageracesex_{year}.parquet` | population × age × race × sex | `ars_{year}_` | 1999, 2020 |
| `gridded_eif_pop_raceincome_{year}.parquet` | population × race × income decile | `ri_{year}_` | 1999, 2020 |

Cell-membership join via `floor(coord * 100) / 100 + 0.005` — equivalent to
spatial intersection but faster.

### 9.1  Download EIF files (one-time)

```{r eif-download}
for (name in names(eif_files)) {
  f <- eif_files[[name]]
  if (!file.exists(f$path)) {
    message("Downloading ", name, " -> ", basename(f$path))
    tryCatch(
      download.file(f$url, f$path, mode = "wb"),
      error = function(e) {
        stop("EIF download failed for ", name, ": ", conditionMessage(e),
             "\nIf HPC egress is restricted, download manually via wget ",
             "from a login node and place at ", f$path)
      }
    )
  } else {
    message("Already present: ", basename(f$path))
  }
}
```

### 9.2  Inspect EIF schemas

```{r eif-inspect-schemas}
for (name in names(eif_files)) {
  f <- eif_files[[name]]
  cat("\n=== ", name, " ===\n")
  print(arrow::open_dataset(f$path)$schema)
}
```

### 9.3  Load EIF grid topology

The topology file is the authoritative cell registry — it defines all valid
land cells on the 0.01° grid, including cells with zero population (absent
from the parquets). Joining mits/apps to the topology lets us distinguish
**why** a record has NA EIF columns:

- `valid_cell == TRUE` & EIF columns non-NA → populated cell, matched ✓  
- `valid_cell == TRUE` & EIF columns NA → valid land cell, zero/suppressed population  
- `valid_cell == FALSE` → water, outside-US, or geocoding error  

```{r eif-load-topology}
topo_path <- file.path(out_dir, "eif_grid_topology.rda")

if (!file.exists(topo_path)) {
  message("Downloading EIF grid topology...")
  download.file(
    "https://www2.census.gov/ces/gridded_eif/eif_grid_topology.rda",
    topo_path, mode = "wb"
  )
}

local({
  e <- new.env()
  load(topo_path, envir = e)
  topo_obj <- get(ls(e)[1], envir = e)
  # SpatialGridDataFrame -> data frame of cell-center coordinates
  topo_df <<- as.data.frame(topo_obj) |>
    dplyr::select(grid_lon, grid_lat) |>
    dplyr::mutate(
      eif_lat = round(as.numeric(grid_lat), 3),
      eif_lon = round(as.numeric(grid_lon), 3)
    ) |>
    dplyr::select(eif_lat, eif_lon) |>
    dplyr::distinct()
})

# Restrict to NC bounding box to keep the join fast
topo_nc <- topo_df |>
  dplyr::filter(
    eif_lat >= NC_BBOX$lat_min, eif_lat <= NC_BBOX$lat_max,
    eif_lon >= NC_BBOX$lon_min, eif_lon <= NC_BBOX$lon_max
  ) |>
  dplyr::mutate(valid_cell = TRUE)

cat("NC topology cells:", nrow(topo_nc), "\n")
```

### 9.4  Snap mit/app points to EIF cells

```{r eif-snap-cells}
add_eif_cell <- function(sf_obj) {
  coords <- sf_obj |> st_transform(4326) |> st_coordinates()
  sf_obj |>
    mutate(
      eif_lon = round(floor(coords[, "X"] * 100) / 100 + 0.005, 3),
      eif_lat = round(floor(coords[, "Y"] * 100) / 100 + 0.005, 3)
    )
}

mits_resolved <- add_eif_cell(mits_resolved)
apps_resolved <- add_eif_cell(apps_resolved)

# Join topology to flag valid land cells (TRUE) vs water/invalid (FALSE/NA)
mits_resolved <- mits_resolved |>
  left_join(topo_nc, by = c("eif_lat", "eif_lon")) |>
  mutate(valid_cell = replace_na(valid_cell, FALSE))

apps_resolved <- apps_resolved |>
  left_join(topo_nc, by = c("eif_lat", "eif_lon")) |>
  mutate(valid_cell = replace_na(valid_cell, FALSE))

cat("Mits — valid topology cell:   ", sum(mits_resolved$valid_cell), "/", nrow(mits_resolved), "\n")
cat("Mits — invalid/water cell:    ", sum(!mits_resolved$valid_cell), "\n")
cat("Apps — valid topology cell:   ", sum(apps_resolved$valid_cell), "/", nrow(apps_resolved), "\n")
cat("Apps — invalid/water cell:    ", sum(!apps_resolved$valid_cell), "\n")
```

### 9.5  Snap unmatched records to nearest populated EIF cell (per year)

Records that land on a valid topology cell but have no population data in a
given year's parquet (zero/DP-suppressed) are snapped to the nearest cell
that *does* have population data for that year. Year-specific keys are stored
in separate columns (`eif_lat_{year}`, `eif_lon_{year}`) so each parquet join
uses the most appropriate cell. The base `eif_lat`/`eif_lon` columns (original
snapped position) are preserved for reference.

```{r eif-build-nc-tables}
# Cells actually referenced by any mit or app — typically a small fraction
# of all NC cells, so filtering early massively reduces working-set size.
needed_cells <- bind_rows(
  mits_resolved |> st_drop_geometry() |> select(eif_lat, eif_lon),
  apps_resolved |> st_drop_geometry() |> select(eif_lat, eif_lon)
) |> distinct()

cat("Unique EIF cells needed:", nrow(needed_cells), "\n")

# ---------------------------------------------------------------------------
# Schema-adaptive helper: identify categorical (key) vs numeric (value) cols.
# EIF coord columns and any already-derived eif_* keys are excluded.
# ---------------------------------------------------------------------------
split_cols <- function(df) {
  exclude_pat <- "^(eif_lat|eif_lon|grid_lat|grid_lon)"
  candidate   <- names(df)[!grepl(exclude_pat, names(df))]
  val_pat  <- "^n_|count|value|pop|hu_|estimate|moe|_noise|_postprocessed"
  val_cols <- candidate[grepl(val_pat, candidate, ignore.case = TRUE) &
                          sapply(df[candidate], is.numeric)]
  key_cols <- setdiff(candidate, val_cols)
  list(key = key_cols, val = val_cols)
}

# Pivot long -> one summary row per cell; prefix all value columns.
aggregate_eif_wide <- function(d, prefix) {
  cols <- split_cols(d)

  # Guard: if no value columns recognized, warn loudly and return keys-only.
  # This usually means split_cols's regex didn't match the parquet column
  # naming. Inspect schema and update val_pat in split_cols.
  if (length(cols$val) == 0) {
    warning(sprintf(
      "aggregate_eif_wide('%s'): no value columns identified. ",
      prefix
    ), "Available columns: ", paste(names(d), collapse = ", "))
    return(
      d |> dplyr::select(eif_lat, eif_lon) |> dplyr::distinct()
    )
  }

  if (length(cols$key) == 0) {
    # No categorical keys to pivot over — already 1 row per cell.
    # Just prefix the value columns.
    d |>
      dplyr::select(eif_lat, eif_lon, dplyr::all_of(cols$val)) |>
      dplyr::rename_with(
        ~ paste0(prefix, "_", .x),
        dplyr::all_of(cols$val)
      )
  } else {
    d |>
      tidyr::pivot_wider(
        id_cols     = c(eif_lat, eif_lon),
        names_from  = dplyr::all_of(cols$key),
        values_from = dplyr::all_of(cols$val),
        names_sep   = "_",
        values_fn   = sum
      ) |>
      dplyr::rename_with(
        ~ paste0(prefix, "_", .x),
        -c(eif_lat, eif_lon)
      )
  }
}

# ---------------------------------------------------------------------------
# Process each EIF file:
#   Arrow bbox pushdown -> collect NC rows -> filter to needed cells ->
#   aggregate to one row per cell -> release raw data.
# Sequential to keep peak memory bounded.
# ---------------------------------------------------------------------------
process_one_eif <- function(nm, needed_cells) {
  f      <- eif_files[[nm]]
  prefix <- f$prefix
  cat("\nProcessing", nm, "(prefix:", prefix, ")... ")

  raw <- arrow::open_dataset(f$path) |>
    dplyr::filter(
      as.numeric(grid_lat) >= NC_BBOX$lat_min,
      as.numeric(grid_lat) <= NC_BBOX$lat_max,
      as.numeric(grid_lon) >= NC_BBOX$lon_min,
      as.numeric(grid_lon) <= NC_BBOX$lon_max
    ) |>
    dplyr::collect() |>
    dplyr::mutate(
      eif_lat = round(as.numeric(grid_lat), 3),
      eif_lon = round(as.numeric(grid_lon), 3)
    ) |>
    dplyr::select(-grid_lat, -grid_lon)

  cat("NC rows:", nrow(raw), "| ")

  raw_needed <- raw |> inner_join(needed_cells, by = c("eif_lat", "eif_lon"))
  cat("needed-cell rows:", nrow(raw_needed), "| ")

  # Sanity check: rows/cell BEFORE aggregation — confirms long vs wide format
  rpc <- raw_needed |> count(eif_lat, eif_lon) |> pull(n) |> median()
  cat("median rows/cell:", rpc, "| ")

  summary_tbl <- aggregate_eif_wide(raw_needed, prefix)
  cat("summary rows:", nrow(summary_tbl), "\n")

  rm(raw, raw_needed); gc()
  summary_tbl
}

# Verify Arrow can push down the numeric-cast bbox filter before running all files.
# If this takes > 30s or returns millions of rows, the filter is not pushing down —
# in that case switch process_one_eif to collect() first, then filter().
local({
  test_q <- arrow::open_dataset(eif_files[[1]]$path) |>
    dplyr::filter(
      as.numeric(grid_lat) >= NC_BBOX$lat_min,
      as.numeric(grid_lat) <= NC_BBOX$lat_max,
      as.numeric(grid_lon) >= NC_BBOX$lon_min,
      as.numeric(grid_lon) <= NC_BBOX$lon_max
    )
  elapsed <- system.time({ n <- nrow(dplyr::collect(test_q)) })
  cat(sprintf("Arrow filter test: %d rows in %.1fs\n", n, elapsed["elapsed"]))
  if (elapsed["elapsed"] > 30)
    warning("Arrow pushdown may not be working — consider collect()-first fallback")
})

eif_nc_tables <- map(names(eif_files), process_one_eif,
                     needed_cells = needed_cells) |>
  set_names(names(eif_files))

# Confirm 1 row per cell in every summary (0 duplicate keys = join is 1:1)
walk2(names(eif_nc_tables), eif_nc_tables, function(nm, s) {
  dups <- nrow(s) - nrow(distinct(s, eif_lat, eif_lon))
  cat(sprintf("  %-30s %d cells, %d duplicate keys\n", nm, nrow(s), dups))
})

# Keys-only sf layers for nearest-neighbour search (unchanged from before)
eif_cells_sf <- map(eif_nc_tables, function(tbl) {
  tbl |>
    dplyr::select(eif_lat, eif_lon) |>
    dplyr::distinct() |>
    sf::st_as_sf(coords = c("eif_lon", "eif_lat"), crs = 4326, remove = FALSE)
})

cat("\nNC EIF cell counts per dataset:\n")
walk2(names(eif_cells_sf), eif_cells_sf,
      ~ cat(sprintf("  %-30s %d cells\n", .x, nrow(.y))))
```

```{r eif-snap-nearest}
# For each dataset, snap unmatched records to nearest populated cell
snap_to_nearest <- function(sf_obj, populated_sf, lat_col, lon_col) {
  pop_coords <- sf::st_coordinates(populated_sf)
  pop_keys <- paste(round(pop_coords[, "Y"], 3),
                    round(pop_coords[, "X"], 3))
  obj_keys <- paste(round(sf::st_drop_geometry(sf_obj)[[lat_col]], 3),
                    round(sf::st_drop_geometry(sf_obj)[[lon_col]], 3))
  unmatched_idx <- which(!obj_keys %in% pop_keys)

  if (length(unmatched_idx) == 0) {
    cat("  No unmatched records — skipping nearest-neighbor snap\n")
    return(sf_obj)
  }

  nearest_idx <- sf::st_nearest_feature(
    sf_obj[unmatched_idx, ] |> sf::st_transform(4326),
    populated_sf
  )
  nearest_coords <- sf::st_coordinates(populated_sf)[nearest_idx, ]

  sf_obj[[lon_col]][unmatched_idx] <- round(nearest_coords[, "X"], 3)
  sf_obj[[lat_col]][unmatched_idx] <- round(nearest_coords[, "Y"], 3)

  cat("  Snapped", length(unmatched_idx), "unmatched records to nearest populated cell\n")
  sf_obj
}

# Apply per dataset, writing into year-specific key columns
add_year_keys <- function(sf_obj, label) {
  cat(label, "nearest-neighbor snapping:\n")
  result <- sf_obj

  for (nm in names(eif_files)) {
    yr      <- eif_files[[nm]]$prefix
    lat_col <- paste0("eif_lat_", yr)
    lon_col <- paste0("eif_lon_", yr)

    result[[lat_col]] <- result$eif_lat
    result[[lon_col]] <- result$eif_lon

    cat(" ", nm, "->", lat_col, "/", lon_col, ":")
    result <- snap_to_nearest(result, eif_cells_sf[[nm]], lat_col, lon_col)
  }
  result
}

mits_resolved <- add_year_keys(mits_resolved, "Mits")
apps_resolved <- add_year_keys(apps_resolved, "Apps")
```

### 9.6  Join all EIF datasets onto mits/apps

```{r eif-join}
# Drop geometry before joining — sf objects with hundreds of columns are
# extremely memory-expensive on every left_join. Re-attach at the end.
join_eif_all <- function(sf_obj) {
  geom      <- sf::st_geometry(sf_obj)
  result    <- sf::st_drop_geometry(sf_obj)

  for (nm in names(eif_files)) {
    yr      <- eif_files[[nm]]$prefix
    lat_col <- paste0("eif_lat_", yr)
    lon_col <- paste0("eif_lon_", yr)

    # Columns are already prefixed by aggregate_eif_wide (e.g. ars_1999_*).
    # Only rename the cell-key columns to match the year-specific join keys.
    result <- result |>
      dplyr::left_join(
        eif_nc_tables[[nm]] |>
          dplyr::rename(!!lat_col := eif_lat, !!lon_col := eif_lon),
        by = c(lat_col, lon_col)
      )
  }

  sf::st_sf(result, geometry = geom)
}

mits_with_eif <- join_eif_all(mits_resolved)
gc()

apps_with_eif <- join_eif_all(apps_resolved)
gc()

# Coverage diagnostics
coverage <- function(df, label) {
  cat("\n", label, "EIF coverage:\n")
  cat(sprintf("  %-35s %d\n", "Total records", nrow(df)))
  cat(sprintf("  %-35s %d  (%.1f%%)\n", "Valid topology cell",
              sum(df$valid_cell), 100 * mean(df$valid_cell)))
  cat(sprintf("  %-35s %d  (%.1f%%)\n", "Invalid/water cell",
              sum(!df$valid_cell), 100 * mean(!df$valid_cell)))
  cat("\n  Per-dataset match rates (after nearest-neighbor fill):\n")

  df_plain <- sf::st_drop_geometry(df)

  for (nm in names(eif_files)) {
    pfx       <- eif_files[[nm]]$prefix
    pfx_cols  <- grep(paste0("^", pfx, "_"), names(df_plain), value = TRUE)
    # Exclude snap-key columns — always populated after snap, not EIF data
    data_cols <- setdiff(pfx_cols,
                         c(paste0("eif_lat_", pfx),
                           paste0("eif_lon_", pfx)))

    if (length(data_cols) == 0) {
      cat(sprintf("  %-30s no data columns found\n", nm))
      next
    }

    has_any_data <- rowSums(!is.na(df_plain[, data_cols, drop = FALSE])) > 0
    n_matched <- sum(has_any_data)
    cat(sprintf("  %-30s matched: %d (%.1f%%)\n",
                nm, n_matched, 100 * n_matched / nrow(df_plain)))
  }
}

coverage(mits_with_eif, "Mitigations")
coverage(apps_with_eif, "Applications")
```

```{r eif-write}
st_write(mits_with_eif, file.path(out_dir, "mits_with_eif.gpkg"),
         delete_dsn = TRUE, quiet = TRUE)
st_write(apps_with_eif, file.path(out_dir, "apps_with_eif.gpkg"),
         delete_dsn = TRUE, quiet = TRUE)

mits_with_eif |> st_drop_geometry() |>
  write_csv(file.path(out_dir, "mits_with_eif.csv"))
apps_with_eif |> st_drop_geometry() |>
  write_csv(file.path(out_dir, "apps_with_eif.csv"))

cat("Written:\n")
cat("  mits_with_eif.gpkg / .csv —", nrow(mits_with_eif), "rows,",
    ncol(sf::st_drop_geometry(mits_with_eif)), "columns\n")
cat("  apps_with_eif.gpkg / .csv —", nrow(apps_with_eif), "rows,",
    ncol(sf::st_drop_geometry(apps_with_eif)), "columns\n")
```
## Summary

```{r summary-table}
eif_coverage <- sapply(names(eif_files), function(nm) {
  pfx       <- eif_files[[nm]]$prefix
  df_plain  <- sf::st_drop_geometry(mits_with_eif)
  pfx_cols  <- grep(paste0("^", pfx, "_"), names(df_plain), value = TRUE)
  data_cols <- setdiff(pfx_cols,
                       c(paste0("eif_lat_", pfx),
                         paste0("eif_lon_", pfx)))
  if (length(data_cols) == 0) return(NA_integer_)
  sum(rowSums(!is.na(df_plain[, data_cols, drop = FALSE])) > 0)
})

summary_tbl <- tibble(
  metric = c(
    "Total parcels",
    "Recipients (interpolated)",
    "Donors (used for averaging)",
    "Median original TOTAL.VALUE.CALCULATED (recipients only)",
    "Median prop_value_5",
    "Median prop_value_10",
    "Median prop_value_50",
    paste0("Mits with EIF cell match — ", names(eif_files))
  ),
  value = c(
    nrow(joined),
    sum(joined$interpolate == 1),
    nrow(interpolate_from),
    median(to_interpolate$`TOTAL.VALUE.CALCULATED`, na.rm = TRUE),
    median(to_interpolate$prop_value_5,  na.rm = TRUE),
    median(to_interpolate$prop_value_10, na.rm = TRUE),
    median(to_interpolate$prop_value_50, na.rm = TRUE),
    eif_coverage
  )
)

knitr::kable(summary_tbl, format.args = list(big.mark = ","))
```

```{r value-comparison}
to_interpolate |>
  st_drop_geometry() |>
  select(parcel_index, original = `TOTAL.VALUE.CALCULATED`,
         prop_value_5, prop_value_10, prop_value_50) |>
  pivot_longer(-parcel_index, names_to = "metric", values_to = "value") |>
  group_by(metric) |>
  summarise(
    n      = sum(!is.na(value)),
    p10    = quantile(value, 0.10, na.rm = TRUE),
    median = median(value,         na.rm = TRUE),
    mean   = mean(value,           na.rm = TRUE),
    p90    = quantile(value, 0.90, na.rm = TRUE),
    .groups = "drop"
  ) |>
  knitr::kable(
    format.args = list(big.mark = ",", scientific = FALSE),
    caption = "Distribution of original vs. interpolated values for recipient parcels"
  )
```

## Output schema

**`interpolated_vals.gpkg`** / **`interpolated_parcels_final.csv`** — one row per parcel:

| Column | Source | Notes |
|---|---|---|
| (all original parcel columns) | parcels_pri.gpkg | unchanged |
| `mit_count` | derived | per-parcel count of mits assigned via alt Step 5 |
| `app_count` | derived | per-parcel count of apps assigned via alt Step 5 |
| `gov_owned` | derived | 1 if owner name matches gov regex (excluding INC/LLC) |
| `interpolate` | derived | 1 if parcel was a recipient (had value estimated) |
| `prop_value_5` | derived | mean of 5 nearest donors; original if not interpolated |
| `prop_value_10` | derived | mean of 10 nearest donors; original if not interpolated |
| `prop_value_50` | derived | mean of 50 nearest donors; original if not interpolated |

**`mits_with_eif.gpkg`** / **`apps_with_eif.gpkg`** — one row per mit/app:

| Column | Source | Notes |
|---|---|---|
| (all original alt-resolved columns) | alt Step 5 outputs | unchanged |
| `eif_lat`, `eif_lon` | derived | original snapped 0.01° cell-center (base position) |
| `valid_cell` | topology join | `TRUE` = valid land cell per EIF topology; `FALSE` = water/outside-US or geocoding error |
| `eif_lat_{prefix}`, `eif_lon_{prefix}` | nearest-neighbor fill | year-specific cell keys; equal to base keys for matched cells, snapped to nearest populated cell otherwise (one pair per EIF dataset, e.g. `eif_lat_ars_1999`) |
| `ars_1999_*`, `ars_2020_*` columns | EIF pop_ageracesex (1999 + 2020) | population × age × race × sex |
| `ri_1999_*`, `ri_2020_*` columns | EIF pop_raceincome (1999 + 2020) | population × race × income decile |


## Section 10: Analysis-ready summary outputs

```{r eif-inspect-suffix-tokens}
library(stringr)

# Verify actual suffix tokens before aggregating
for (yr in c(1999, 2020)) {
  cat("\n=== ars_", yr, " sample postprocessed cols ===\n", sep = "")
  print(head(grep(paste0("^ars_", yr, "_n_noise_postprocessed_"),
                  names(mits_with_eif), value = TRUE), 10))

  cat("\n=== ri_", yr, " sample postprocessed cols ===\n", sep = "")
  print(head(grep(paste0("^ri_", yr, "_n_noise_postprocessed_"),
                  names(mits_with_eif), value = TRUE), 10))
}
```

```{r eif-derive-summaries}
summarize_eif <- function(df, years = c(1999, 2020)) {
  result    <- df
  df_plain  <- sf::st_drop_geometry(df)

  # Race tokens exactly as encoded in column names (bare labels, no NH prefix)
  race_pats <- c(
    white    = "_White(?:_[MF])?$",
    black    = "_Black(?:_[MF])?$",
    hispanic = "_Hispanic(?:_[MF])?$",
    asian    = "_Asian(?:_[MF])?$",
    aian     = "_AIAN(?:_[MF])?$"
  )

  for (yr in years) {

    # ── ars: population × age × race × sex → race aggregates ────────────────
    ars_pp <- grep(paste0("^ars_", yr, "_n_noise_postprocessed_"),
                   names(df_plain), value = TRUE)

    pop_total <- rowSums(df_plain[, ars_pp, drop = FALSE], na.rm = TRUE)
    result[[paste0("pop_total_", yr)]] <- pop_total

    for (race_name in names(race_pats)) {
      pat  <- race_pats[[race_name]]
      cols <- ars_pp[grepl(pat, ars_pp, perl = TRUE)]
      race_pop <- if (length(cols) > 0)
        rowSums(df_plain[, cols, drop = FALSE], na.rm = TRUE)
      else
        rep(NA_real_, nrow(df_plain))

      result[[paste0("pct_", race_name, "_", yr)]] <-
        ifelse(pop_total > 0, race_pop / pop_total, NA_real_)
    }

    # ── ri: population × race × income decile → income aggregates ───────────
    ri_pp <- grep(paste0("^ri_", yr, "_n_noise_postprocessed_"),
                  names(df_plain), value = TRUE)

    if (length(ri_pp) > 0) {
      ri_total <- rowSums(df_plain[, ri_pp, drop = FALSE], na.rm = TRUE)

      # Decile 1 = bottom, decile 10 = top; decile 0 excluded (suppressed bucket)
      low_cols  <- ri_pp[grepl("_postprocessed_1_",  ri_pp, fixed = TRUE)]
      high_cols <- ri_pp[grepl("_postprocessed_10_", ri_pp, fixed = TRUE)]

      result[[paste0("pct_low_income_",  yr)]] <-
        ifelse(ri_total > 0,
               rowSums(df_plain[, low_cols,  drop = FALSE], na.rm = TRUE) / ri_total,
               NA_real_)
      result[[paste0("pct_high_income_", yr)]] <-
        ifelse(ri_total > 0,
               rowSums(df_plain[, high_cols, drop = FALSE], na.rm = TRUE) / ri_total,
               NA_real_)
    }
  }

  result
}

mits_summary <- summarize_eif(mits_with_eif)
apps_summary <- summarize_eif(apps_with_eif)
```

```{r eif-summary-inventory}
summary_cols <- grep("^(pop_total_|pct_)", names(mits_summary), value = TRUE)

cat("Summary columns derived (", length(summary_cols), "):\n", sep = "")
print(summary_cols)

cat("\npct_white_1999 distribution:\n")
print(summary(sf::st_drop_geometry(mits_summary)$pct_white_1999))

cat("\npct_black_2020 distribution:\n")
print(summary(sf::st_drop_geometry(mits_summary)$pct_black_2020))

cat("\npct_low_income_1999 distribution:\n")
print(summary(sf::st_drop_geometry(mits_summary)$pct_low_income_1999))

# Sanity: pop_total should be non-zero for all records (post-snap)
cat("\npop_total_1999 zero or NA count:", 
    sum(sf::st_drop_geometry(mits_summary)$pop_total_1999 == 0 | 
        is.na(sf::st_drop_geometry(mits_summary)$pop_total_1999)), "\n")
cat("pop_total_2020 zero or NA count:", 
    sum(sf::st_drop_geometry(mits_summary)$pop_total_2020 == 0 | 
        is.na(sf::st_drop_geometry(mits_summary)$pop_total_2020)), "\n")
```

```{r eif-write-slim}
slim_cols_pattern <- paste(c(
  # Record + parcel identifiers
  "^unique_id$",
  "^assigned_parcel_index$",
  "^assignment_method$",

  # Mit/app address and disaster fields
  "^address$",
  "^raw_addr$",
  "^Match_addr$",
  "^street_address$",
  "^city$",
  "^City$",
  "^county$",
  "^County$",
  "^Postal$",
  "^zip$",
  "^latitude$",
  "^longitude$",
  "^disas_code",
  "^disas_program$",
  "^disas_year$",
  "^project_type$",
  "^project_status$",
  "^FileEvent",
  "^Filename$",
  "^Eventyear",
  "^Year$",
  "^year_conflict$",
  "^mhp_flag$",
  "^address_quality$",

  # Parcel attributes
  "^parcel_index$",
  "^geometry_id$",
  "^cntyfips$",
  "^parno$",
  "^siteadd$",
  "^ownname$",
  "^parusedesc$",
  "^LAND\\.USE\\.CODE$",
  "^luc_tier$",
  "^OWNER\\.1\\.FULL\\.NAME$",
  "^YEAR\\.BUILT$",
  "^ASSESSED\\.TOTAL\\.VALUE$",
  "^IMPROVEMENT\\.VALUE\\.CALCULATED$",

  # EIF cell traceability
  "^eif_lat$",
  "^eif_lon$",
  "^valid_cell$",
  "^eif_lat_(ars|ri)_",
  "^eif_lon_(ars|ri)_",

  # Derived summary columns
  "^pop_total_",
  "^pct_"
), collapse = "|")

slim <- function(df) dplyr::select(df, dplyr::matches(slim_cols_pattern))

mits_slim <- slim(mits_summary)
apps_slim <- slim(apps_summary)

cat("Mits slim: ", ncol(mits_slim), " cols (down from ", ncol(mits_with_eif), ")\n", sep = "")
cat("Apps slim: ", ncol(apps_slim), " cols (down from ", ncol(apps_with_eif), ")\n", sep = "")
cat("Mits rows: ", nrow(mits_slim), " | Apps rows: ", nrow(apps_slim), "\n", sep = "")

sf::st_write(mits_slim, file.path(out_dir, "mits_with_eif_summary.gpkg"),
             delete_dsn = TRUE, quiet = TRUE)
sf::st_write(apps_slim, file.path(out_dir, "apps_with_eif_summary.gpkg"),
             delete_dsn = TRUE, quiet = TRUE)

mits_slim |> sf::st_drop_geometry() |>
  readr::write_csv(file.path(out_dir, "mits_with_eif_summary.csv"))
apps_slim |> sf::st_drop_geometry() |>
  readr::write_csv(file.path(out_dir, "apps_with_eif_summary.csv"))

cat("Written:\n")
cat("  mits_with_eif_summary.gpkg / .csv\n")
cat("  apps_with_eif_summary.gpkg / .csv\n")
```

```{r eif-cells-full}
# Full NC cell summary for Step 7 parcel-centroid join.
# Re-reads parquets directly — eif_nc_tables is filtered to needed_cells
# (mit/app cells only) and cannot serve as the full-NC spine.
# Spine = topo_nc (all valid NC land cells).

build_full_nc_cell_summary <- function() {

  years <- c(1999, 2020)
  cell_summary <- topo_nc |> select(eif_lat, eif_lon)

  for (yr in years) {

    # --- ageracesex ---
    ars_raw <- arrow::open_dataset(
      file.path(out_dir, sprintf("gridded_eif_pop_ageracesex_%d.parquet", yr))
    ) |>
      dplyr::filter(
        as.numeric(grid_lat) >= NC_BBOX$lat_min,
        as.numeric(grid_lat) <= NC_BBOX$lat_max,
        as.numeric(grid_lon) >= NC_BBOX$lon_min,
        as.numeric(grid_lon) <= NC_BBOX$lon_max
      ) |>
      dplyr::collect() |>
      dplyr::mutate(
        eif_lat = round(as.numeric(grid_lat), 3),
        eif_lon = round(as.numeric(grid_lon), 3)
      ) |>
      dplyr::select(-grid_lat, -grid_lon)

    ars_wide <- aggregate_eif_wide(ars_raw, paste0("ars_", yr))
    rm(ars_raw); gc()

    ars_pp_cols <- grep(paste0("^ars_", yr, "_n_noise_postprocessed_"),
                        names(ars_wide), value = TRUE)
    pop_total <- rowSums(ars_wide[, ars_pp_cols, drop = FALSE], na.rm = TRUE)

    race_pats <- c(
      white    = "_White(?:_[MF])?$",
      black    = "_Black(?:_[MF])?$",
      hispanic = "_Hispanic(?:_[MF])?$",
      asian    = "_Asian(?:_[MF])?$",
      aian     = "_AIAN(?:_[MF])?$"
    )

    ars_summary <- ars_wide |>
      select(eif_lat, eif_lon) |>
      mutate(!!paste0("pop_total_", yr) := pop_total)

    for (race in names(race_pats)) {
      cols     <- ars_pp_cols[grepl(race_pats[[race]], ars_pp_cols, perl = TRUE)]
      race_pop <- if (length(cols) > 0)
        rowSums(ars_wide[, cols, drop = FALSE], na.rm = TRUE)
      else
        rep(NA_real_, nrow(ars_wide))
      ars_summary[[paste0("pct_", race, "_", yr)]] <-
        ifelse(pop_total > 0, race_pop / pop_total, NA_real_)
    }

    rm(ars_wide); gc()

    # --- raceincome ---
    ri_raw <- arrow::open_dataset(
      file.path(out_dir, sprintf("gridded_eif_pop_raceincome_%d.parquet", yr))
    ) |>
      dplyr::filter(
        as.numeric(grid_lat) >= NC_BBOX$lat_min,
        as.numeric(grid_lat) <= NC_BBOX$lat_max,
        as.numeric(grid_lon) >= NC_BBOX$lon_min,
        as.numeric(grid_lon) <= NC_BBOX$lon_max
      ) |>
      dplyr::collect() |>
      dplyr::mutate(
        eif_lat = round(as.numeric(grid_lat), 3),
        eif_lon = round(as.numeric(grid_lon), 3)
      ) |>
      dplyr::select(-grid_lat, -grid_lon)

    ri_wide <- aggregate_eif_wide(ri_raw, paste0("ri_", yr))
    rm(ri_raw); gc()

    ri_pp_cols <- grep(paste0("^ri_", yr, "_n_noise_postprocessed_"),
                       names(ri_wide), value = TRUE)
    ri_total  <- rowSums(ri_wide[, ri_pp_cols, drop = FALSE], na.rm = TRUE)
    low_cols  <- ri_pp_cols[grepl("_postprocessed_1_",  ri_pp_cols, fixed = TRUE)]
    high_cols <- ri_pp_cols[grepl("_postprocessed_10_", ri_pp_cols, fixed = TRUE)]

    ri_summary <- ri_wide |>
      select(eif_lat, eif_lon) |>
      mutate(
        !!paste0("pct_low_income_",  yr) :=
          ifelse(ri_total > 0,
                 rowSums(ri_wide[, low_cols,  drop = FALSE], na.rm = TRUE) / ri_total,
                 NA_real_),
        !!paste0("pct_high_income_", yr) :=
          ifelse(ri_total > 0,
                 rowSums(ri_wide[, high_cols, drop = FALSE], na.rm = TRUE) / ri_total,
                 NA_real_)
      )

    rm(ri_wide); gc()

    cell_summary <- cell_summary |>
      left_join(ars_summary, by = c("eif_lat", "eif_lon")) |>
      left_join(ri_summary,  by = c("eif_lat", "eif_lon"))
  }

  cell_summary
}

eif_cells_summary <- build_full_nc_cell_summary()

write_csv(eif_cells_summary, file.path(out_dir, "eif_cells_summary.csv"))

cat("eif_cells_summary.csv written:\n")
cat("  Total NC cells:        ", nrow(eif_cells_summary), "\n")
cat("  Columns:               ", ncol(eif_cells_summary), "\n")
cat("  Non-NA pop_total_2020: ",
    sum(!is.na(eif_cells_summary$pop_total_2020)), "\n")
cat("  Non-NA pct_black_2020: ",
    sum(!is.na(eif_cells_summary$pct_black_2020)), "\n")
```

