---
title: "Step 7: Flood Event & Eligibility Join (from existing Python CSVs)"
author: "Russell Blessing"
format:
  html:
    toc: true
    toc-depth: 3
    code-fold: true
    code-summary: "Show / hide code"
execute:
  echo: true
  warning: false
  message: false
---

## Overview

This notebook bridges Helena's pre-existing flood event data 
onto R's `parcel_index`, producing per-parcel flood and eligibility counts that
match the Julian's reported 67.5% eligibility ratio.

**Data source decision.** We use the pre-aggregated `parcels_events.csv`
produced by Julian and then crosswalked to R's `parcel_index`
via `(cntyfips, parno)`. 

**Inputs:**

- `parcels_pri.gpkg` — R's parcels (from Step 5)
- `/proj/.../Raw Data Inputs/parcels_study_area.csv` — Python's parcel index
  registry, used to build the `(cntyfips, parno)` crosswalk
- `/proj/.../Intermediary Data/parcels_events.csv` — pre-aggregated flood and
  eligibility columns per Python `parcel_index`, from Julian 

**Outputs (written to `out_dir`):**

- `parcel_crosswalk_R_Py.csv` — Python `parcel_index` ↔ R `parcel_index`
  crosswalk for future reference
- `parcels_events_R.csv` — final per-parcel flood and eligibility table,
  keyed on R's `parcel_index`, ready to join to `parcels_pri.gpkg`

## Methodology

**Why a crosswalk is necessary.** Python's `parcel_index` was created as the
row position in pandas's read of the source NC1 file, after Python-specific
filtering and deduplication. R's `parcel_index` was created independently
during the R Step 1 pipeline. The two sets diverge: R has ~29k
more parcels and same numeric indices refer to different parcels.

**Crosswalk key choice.** `(cntyfips, parno)` is the most stable real-world
parcel identifier across both pipelines. This uses a normalized `parno`
(alphanumeric only, matching Step 3's `strip_alnum`) for the primary
crosswalk and `altparno` as a fallback for R parcels that didn't match on
`parno`.

**Flood and eligibility columns.** The Original `parcels_events.csv`
already contains pre-computed `flooded` and `eligible` counts (and per-event
`_max` columns) following Python Step 12's logic. This translates these directly
to R's `parcel_index` space via the crosswalk.


```{r libraries}
library(sf)
library(dplyr)
library(readr)
library(stringr)
library(tidyr)
library(knitr)
```

```{r setup}
out_dir            <- "/proj/mhinolab/users/rbless/data/Obstacles_Output"
parcels_path       <- file.path(out_dir, "parcels_pri.gpkg")

py_parcels_csv     <- "/proj/mhinolab/projects/obstacles2/CRC_May_2025/Raw Data Inputs/parcels_study_area.csv"
aggregated_events_path <- "/proj/mhinolab/projects/obstacles2/CRC_May_2025/Intermediary Data/parcels_events.csv"

xwalk_out_path  <- file.path(out_dir, "parcel_crosswalk_R_Py.csv")
events_out_path <- file.path(out_dir, "parcels_events_R.csv")

# Helper: alphanumeric-only normalization, matches Step 3's strip_alnum
strip_alnum <- function(x) str_replace_all(replace_na(x, ""), "[^a-zA-Z0-9]", "")
```

## 1  Build the R ↔ Python parcel crosswalk

Python's aggregated event CSV is keyed on Python's `parcel_index`. We need to
translate to R's `parcel_index` so the flood data can be joined back to
`parcels_pri.gpkg`. The join key is `(cntyfips, parno)`, normalized
alphanumerically to handle formatting differences.

```{r build-crosswalk}
# Load R parcels
r_parcels <- sf::st_read(parcels_path, quiet = TRUE) |>
  sf::st_drop_geometry() |>
  dplyr::select(r_parcel_index = parcel_index, cntyfips, parno, altparno, fr_1_4) |>
  dplyr::mutate(
    cntyfips      = as.character(cntyfips),
    parno_norm    = strip_alnum(parno),
    altparno_norm = strip_alnum(altparno)
  )

# Load Python parcels registry
py_parcels <- read_csv(py_parcels_csv,
                       col_types = cols(.default = col_character())) |>
  dplyr::select(py_parcel_index = parcel_index, cntyfips, parno, altparno) |>
  dplyr::mutate(
    py_parcel_index = as.integer(py_parcel_index),
    cntyfips        = as.character(cntyfips),
    parno_norm      = strip_alnum(parno),
    altparno_norm   = strip_alnum(altparno)
  )

# Normalize cntyfips: Python may store "063", R uses "37063". Add prefix if needed.
if (!any(str_detect(py_parcels$cntyfips, "^37"), na.rm = TRUE)) {
  py_parcels <- py_parcels |> mutate(cntyfips = paste0("37", cntyfips))
}

cat("R parcels:    ", nrow(r_parcels),  "rows\n")
cat("Python parcels:", nrow(py_parcels), "rows\n")
cat("Sample R cntyfips: ", head(unique(r_parcels$cntyfips), 5),  "\n")
cat("Sample Py cntyfips:", head(unique(py_parcels$cntyfips), 5), "\n")
```

```{r crosswalk-primary}
# Primary crosswalk: (cntyfips, parno_norm)
xwalk_primary <- r_parcels |>
  filter(parno_norm != "") |>
  inner_join(
    py_parcels |> filter(parno_norm != "") |>
      select(py_parcel_index, cntyfips, parno_norm),
    by = c("cntyfips", "parno_norm"),
    relationship = "many-to-many"
  ) |>
  # If multiple Python parcels match one R parcel, take the first
  group_by(r_parcel_index) |> slice(1) |> ungroup() |>
  select(r_parcel_index, py_parcel_index, cntyfips) |>
  mutate(match_method = "parno")

cat("Primary crosswalk (parno):", nrow(xwalk_primary), "matches\n")
```

```{r crosswalk-secondary}
# Secondary crosswalk: altparno fallback for R parcels not matched via parno
r_unmatched <- r_parcels |>
  anti_join(xwalk_primary |> select(r_parcel_index), by = "r_parcel_index") |>
  filter(altparno_norm != "")

xwalk_secondary <- r_unmatched |>
  inner_join(
    py_parcels |> filter(altparno_norm != "") |>
      select(py_parcel_index, cntyfips, altparno_norm),
    by = c("cntyfips", "altparno_norm"),
    relationship = "many-to-many"
  ) |>
  group_by(r_parcel_index) |> slice(1) |> ungroup() |>
  select(r_parcel_index, py_parcel_index, cntyfips) |>
  mutate(match_method = "altparno")

cat("Secondary crosswalk (altparno):", nrow(xwalk_secondary), "matches\n")
```

```{r crosswalk-combine-write}
xwalk <- bind_rows(xwalk_primary, xwalk_secondary)

cat("\nFinal crosswalk:\n")
cat("  Total R parcels:                ", nrow(r_parcels), "\n")
cat("  R parcels with Py counterpart:  ", nrow(xwalk), "\n")
cat("  Coverage:                       ",
    sprintf("%.1f%%", 100 * nrow(xwalk) / nrow(r_parcels)), "\n\n")
xwalk |> count(match_method) |> print()

# Write crosswalk for reference / debugging
write_csv(xwalk, xwalk_out_path)
cat("\nCrosswalk saved to:", xwalk_out_path, "\n")
```

## 2  Crosswalk the Original aggregated event data to R parcel space

Read `parcels_events.csv` and translate its
Python `parcel_index` keys to R's `parcel_index` via the crosswalk built
above. The `flooded` and `eligible` columns in this file are pre-computed
and used as-is.

```{r read-aggregated-events}
py_events_aggregated <- read_csv(
  aggregated_events_path,
  col_types = cols(.default = col_character())
) |>
  mutate(parcel_index = as.integer(parcel_index))

post_merge <- xwalk |>
  select(r_parcel_index, py_parcel_index) |>
  inner_join(py_events_aggregated, by = c("py_parcel_index" = "parcel_index")) |>
  select(-py_parcel_index) |>
  left_join(
    r_parcels |> select(r_parcel_index, cntyfips, fr_1_4),
    by = "r_parcel_index"
  )

cat("py_events_aggregated rows: ", nrow(py_events_aggregated), "\n")
cat("post_merge rows:           ", nrow(post_merge), "\n")
cat("post_merge cols:           ", ncol(post_merge), "\n")
cat("Crosswalk match rate:      ",
    sprintf("%.1f%%", 100 * nrow(post_merge) / nrow(py_events_aggregated)), "\n")
```

## 3  Sanity check vs paper

```{r sanity-check}
# Coerce flooded / eligible to numeric — they were read as character
post_merge <- post_merge |>
  mutate(
    flooded  = as.integer(flooded_any),
    eligible = as.integer(eligible)
  )

flooded_n  <- sum(post_merge$flooded  > 0, na.rm = TRUE)
eligible_n <- sum(post_merge$eligible > 0, na.rm = TRUE)

# Residential-only subset (fr_1_4 == "1" in the Original CSV)
res_rows   <- post_merge |> filter(as.integer(fr_1_4) == 1)
res_flooded_n  <- sum(res_rows$flooded_any  > 0, na.rm = TRUE)
res_eligible_n <- sum(res_rows$eligible > 0, na.rm = TRUE)

sanity_tbl <- tibble(
  Metric = c(
    "Total parcels (crosswalked)",
    "Parcels flooded ≥ 1 time",
    "Parcels eligible ≥ 1 declared event",
    "Eligible / Flooded — all parcels",
    "Eligible / Flooded — residential only (fr_1_4 == 1)",
    "Paper reports"
  ),
  Value = c(
    scales::comma(nrow(post_merge)),
    scales::comma(flooded_n),
    scales::comma(eligible_n),
    sprintf("%.1f%%", 100 * eligible_n / flooded_n),
    sprintf("%.1f%%", 100 * res_eligible_n / res_flooded_n),
    "67.5%"
  )
)

knitr::kable(sanity_tbl, caption = "Sanity check: eligibility rate vs paper claim")
```

Expected: all-parcel ratio ~69.0%, residential ~69.4%. If either ratio is
far from these values, check that `aggregated_events_path` points to the correct
parcel_events file.

## 4  Write the final parcel-event table

```{r write-final}
write_csv(post_merge, events_out_path)

cat("Wrote:", events_out_path, "\n")
cat("Final dimensions:", nrow(post_merge), "rows ×",
    ncol(post_merge), "cols\n")
```

## 5  Join straight into `parcels_pri.gpkg`

This creates a single file with parcels + flood data combined, ready for
downstream analysis:

```{r join-to-parcels}
#| eval: false

# Load R parcels, join flood data, write combined output
parcels_pri <- sf::st_read(parcels_path, quiet = TRUE)

flood_data <- read_csv(events_out_path,
                       col_types = cols(.default = col_character())) |>
  mutate(
    r_parcel_index = as.integer(r_parcel_index),
    flooded        = as.integer(flooded),
    eligible       = as.integer(eligible)
  ) |>
  rename(parcel_index = r_parcel_index) |>
  select(-cntyfips)   # already in parcels_pri

parcels_with_floods <- parcels_pri |>
  left_join(flood_data, by = "parcel_index")

sf::st_write(parcels_with_floods,
             file.path(out_dir, "parcels_pri_with_floods.gpkg"),
             delete_dsn = TRUE, quiet = TRUE)

cat("Wrote: parcels_pri_with_floods.gpkg with",
    sum(!is.na(parcels_with_floods$flooded) & as.integer(parcels_with_floods$flooded) > 0),
    "parcels flagged as flooded\n")
```

## 6  Build the master parcel dataset

Single per-parcel table consolidating everything Step 8 needs:
flood + eligibility (Section 5 above), interpolation + value (Step 6),
applied/funded flags (Step 5 priority outputs), centroid coords + EIF
cell key, and per-cell EIF demographics (Step 6's `eif_cells_summary.csv`).

**Prerequisite.** Run the Step 6 per-cell summary chunk first so
`eif_cells_summary.csv` exists in `out_dir`. Without it, the cell
demographics join silently produces NAs.

**Inputs:**

- `parcels_pri_with_floods.gpkg` — Section 5 above
- `interpolated_parcels_final.csv` — Step 6
- `apps_pcls.gpkg`, `mits_pcls.gpkg` — Step 5 priority outputs
- `eif_cells_summary.csv` — Step 6 per-cell summary

**Output (written to `out_dir`):**

- `parcels_master.gpkg` — one row per parcel, all downstream analytical
  fields joined and resolved

### 6.1  Setup

```{r master-setup}
interp_path    <- file.path(out_dir, "interpolated_parcels_final.csv")
apps_path      <- file.path(out_dir, "apps_pcls.gpkg")
mits_path      <- file.path(out_dir, "mits_pcls.gpkg")
floods_path    <- file.path(out_dir, "parcels_pri_with_floods.gpkg")
eif_cells_path <- file.path(out_dir, "eif_cells_summary.csv")
master_path    <- file.path(out_dir, "parcels_master.gpkg")
```

### 6.2  Parcel centroids + EIF cell key

Centroids in WGS84 are the canonical parcel location. CoreLogic's
`PARCEL.LEVEL.LATITUDE` / `LONGITUDE` are not used — they have ~34%
missingness for late-pipeline parcels (completed buyouts, voided
addresses) and known precision issues. The EIF cell key uses Step 6's
convention (cell-center, 3-decimal rounded) so the join to
`eif_cells_summary.csv` works directly.

```{r master-centroids}
parcels_sf <- st_read(floods_path, quiet = TRUE)

centroids <- suppressWarnings(
  parcels_sf |>
    st_transform(4326) |>
    st_centroid() |>
    st_coordinates() |>
    as_tibble() |>
    rename(cent_lng = X, cent_lat = Y)
) |>
  mutate(
    eif_lon = round(floor(cent_lng * 100) / 100 + 0.005, 3),
    eif_lat = round(floor(cent_lat * 100) / 100 + 0.005, 3)
  )

cat("Centroids computed:", nrow(centroids), "\n")
```

### 6.3  Resolve duplicate residential flag

Section 5's join leaves `fr_1_4.x` (from parcels_pri) and `fr_1_4.y`
(from parcels_events.csv). Collapse to one `fr_1_4` column.

```{r master-dedup-fr14}
parcels_attr <- parcels_sf |>
  st_drop_geometry() |>
  bind_cols(centroids) |>
  mutate(fr_1_4 = coalesce(
    suppressWarnings(as.integer(fr_1_4.x)),
    suppressWarnings(as.integer(fr_1_4.y))
  )) |>
  select(-any_of(c("fr_1_4.x", "fr_1_4.y")))
```

### 6.4  Load Step 6 interpolation + Step 5 applied/funded flags

```{r master-load-interp-flags}
# Interpolation flag, prop_value_5, mit/app counts, gov_owned (Step 6)
interp <- read_csv(
  interp_path,
  col_select = c("parcel_index", "interpolate", "prop_value_5",
                 "mit_count", "app_count", "gov_owned"),
  col_types  = cols(.default = col_character()),
  show_col_types = FALSE
) |>
  transmute(
    parcel_index = as.integer(parcel_index),
    interpolate  = suppressWarnings(as.integer(interpolate)),
    prop_value_5 = suppressWarnings(as.numeric(prop_value_5)),
    mit_count    = suppressWarnings(as.integer(mit_count)),
    app_count    = suppressWarnings(as.integer(app_count)),
    gov_owned    = suppressWarnings(as.integer(gov_owned))
  )

# Applied / funded flags from Step 5 priority outputs
applied_set <- st_read(apps_path, quiet = TRUE) |>
  st_drop_geometry() |>
  distinct(parcel_index) |>
  mutate(parcel_index = as.integer(parcel_index), applied = TRUE)

funded_set <- st_read(mits_path, quiet = TRUE) |>
  st_drop_geometry() |>
  distinct(parcel_index) |>
  mutate(parcel_index = as.integer(parcel_index), funded = TRUE)

# EIF cell demographics (Step 6 per-cell summary)
eif_cells <- read_csv(eif_cells_path, show_col_types = FALSE)
```

### 6.5  Assemble the master file

```{r master-assemble}
master <- parcels_attr |>
  mutate(parcel_index = as.integer(parcel_index)) |>
  left_join(interp,      by = "parcel_index") |>
  left_join(applied_set, by = "parcel_index") |>
  left_join(funded_set,  by = "parcel_index") |>
  left_join(eif_cells,   by = c("eif_lat", "eif_lon")) |>
  mutate(
    interpolate = coalesce(interpolate, 0L),
    mit_count   = coalesce(mit_count,   0L),
    app_count   = coalesce(app_count,   0L),
    gov_owned   = coalesce(gov_owned,   0L),
    applied     = coalesce(applied,     FALSE),
    funded      = coalesce(funded,      FALSE),
    # Final value: prop_value_5 for buyout candidates, TVC otherwise.
    # prop_value_5 already falls back to TVC for non-recipients, so this
    # collapses to just `value = prop_value_5` — kept conditional for clarity.
    value = if_else(
      interpolate == 1L,
      prop_value_5,
      suppressWarnings(as.numeric(TOTAL.VALUE.CALCULATED))
    )
  ) |>
  # Re-attach geometry from parcels_sf (same row order)
  bind_cols(geometry = st_geometry(parcels_sf)) |>
  st_as_sf(crs = st_crs(parcels_sf))
```

### 6.6  Sanity check

```{r master-sanity}
cat("master parcels:", format(nrow(master), big.mark = ","), "\n")
cat("  fr_1_4 == 1:     ",
    format(sum(master$fr_1_4 == 1L,  na.rm = TRUE), big.mark = ","), "\n")
cat("  interpolate==1:  ",
    format(sum(master$interpolate == 1L, na.rm = TRUE), big.mark = ","), "\n")
cat("  applied:         ",
    format(sum(master$applied,  na.rm = TRUE), big.mark = ","), "\n")
cat("  funded:          ",
    format(sum(master$funded,   na.rm = TRUE), big.mark = ","), "\n")
cat("  centroid coords valid: ",
    format(sum(!is.na(master$cent_lat) & !is.na(master$cent_lng)), big.mark = ","), "\n")
cat("  matched to EIF demographics: ",
    format(sum(!is.na(master$pct_white_2020)), big.mark = ","), "of",
    format(nrow(master), big.mark = ","), "\n")
```

### 6.7  Write

```{r master-write}
st_write(master, master_path, delete_dsn = TRUE, quiet = TRUE)
cat("Wrote:", master_path, "—", ncol(master), "cols\n")
```

## Output schema — `parcels_master.gpkg`

One row per parcel; downstream consumers (Step 8 figures) read only this file.

| Column group | Columns | Source |
|---|---|---|
| Identifier | `parcel_index`, `cntyfips`, `parno`, `altparno`, `siteadd`, `SITUS.CITY` | parcels_pri (Step 5) |
| Geometry | parcel polygon + `cent_lat`, `cent_lng` | computed centroid (this step) |
| EIF cell key | `eif_lat`, `eif_lon` | derived from centroid, Step 6 convention |
| LUC | `LAND.USE.CODE`, `vacant`, `gov_lu`, `res_lu`, `fr_1_4` (deduped) | parcels_pri |
| Priority | `luc_tier`, `qual_sum`, `match_priority` | parcels_pri |
| Flood | `flooded_any`, `eligible` | Section 5 above |
| Interpolation | `interpolate`, `prop_value_5`, `mit_count`, `app_count`, `gov_owned` | Step 6 |
| Final value | `value` (resolved: prop_value_5 if interpolate else TVC) | derived (this step) |
| Pipeline flags | `applied`, `funded` (logicals) | Step 5 priority outputs |
| EIF demographics | `pop_total_{1999,2020}`, `pct_white_{1999,2020}`, `pct_black_{1999,2020}`, `pct_hispanic_{1999,2020}`, `pct_asian_{1999,2020}`, `pct_aian_{1999,2020}` | Step 6 per-cell summary |


