---
title: "Step 4: Nearest-Neighbor Spatial Join for Unmatched Parcels"
author: "Russell Blessing"
format:
  html:
    toc: true
    toc-depth: 3
    code-fold: true
    code-summary: "Show / hide code"
execute:
  echo: true
  warning: false
  message: false
---

## Overview

This notebook performs a nearest-neighbor spatial join for parcels that did
not receive a string match in Step 3. For each county, the unmatched OneMap
parcel geometries are joined to the nearest CoreLogic point. Ties (a parcel
equidistant from multiple CL points) are resolved by Levenshtein address
similarity. The final output combines the string-matched and distance-matched
records into a single parcel–CoreLogic table.

```{r libraries}
library(sf)
library(dplyr)
library(readr)
library(stringr)
library(purrr)
library(stringdist)  # for levenshtein distance calculation
library(gt)
library(tidyr)
```

```{r setup}
#| eval: true
out_dir <- "/proj/mhinolab/users/rbless/data/Obstacles_Output"

# CoreLogic (all character)
cl <- read_csv(
  file.path(out_dir, "cl.csv"),
  col_types = cols(.default = col_character())
)

# Previously string-matched parcel indices
stringmatched <- read_csv(
  file.path(out_dir, "parcel_cl_stringmatch.csv"),
  col_types = cols(.default = col_character())
) |>
  mutate(parcel_index = as.character(parcel_index))

# Read spatial parcels (keep geometry for the join)
nc1_attr <- st_read(
  file.path(out_dir, "parcels_study_area.gpkg"),
  query = "SELECT parcel_index, cntyfips FROM parcels_study_area",
  quiet = TRUE
) |>
  st_drop_geometry() |>
  mutate(
    parcel_index = as.character(parcel_index),
    cntyfips     = paste0("37", cntyfips)
  )

unmatched_ids <- nc1_attr |>
  filter(!parcel_index %in% stringmatched$parcel_index) |>
  pull(parcel_index)

# Read only unmatched rows with geometry using a SQL filter
unmatched_nc1 <- st_read(
  file.path(out_dir, "parcels_study_area.gpkg"),
  query = paste0(
    "SELECT * FROM parcels_study_area WHERE parcel_index IN (",
    paste(unmatched_ids, collapse = ","),
    ")"
  ),
  quiet = TRUE
) |>
  mutate(
    parcel_index = as.character(parcel_index),
    cntyfips     = paste0("37", cntyfips)
  )

rm(nc1_attr); gc()

county_list <- unique(unmatched_nc1$cntyfips)

# Build sf object for CL records in counties that have unmatched parcels
cl_unmatched <- cl |>
  filter(`FIPS CODE` %in% county_list) |>
  rename(fips_code = `FIPS CODE`) |> 
  filter(!is.na(`PARCEL LEVEL LATITUDE`) & !is.na(`PARCEL LEVEL LONGITUDE`)) |>
  st_as_sf(
    coords = c("PARCEL LEVEL LONGITUDE", "PARCEL LEVEL LATITUDE"),
    crs = 4326
  )
```


```{r reproject}
#| eval: true
# Reproject to NC State Plane (meters) for distance calculations
unmatched_nc1  <- st_transform(unmatched_nc1,  crs = 32119)
cl_unmatched   <- st_transform(cl_unmatched,   crs = st_crs(unmatched_nc1))

# Assign geometry IDs (factorize unique geometries)
unmatched_nc1 <- unmatched_nc1 |>
  mutate(geometry_id = as.integer(factor(as.character(st_geometry(unmatched_nc1)))))

cl_unmatched <- cl_unmatched |>
  mutate(geometry_id1 = as.integer(factor(as.character(st_geometry(cl_unmatched)))))
```

```{r save-intermediates}
#| eval: true
# Save intermediate geopackages
st_write(unmatched_nc1, file.path(out_dir, "nc1geoms_distmatch.gpkg"),
         delete_dsn = TRUE, quiet = TRUE)

st_write(cl_unmatched, file.path(out_dir, "cl_wID.gpkg"),
         delete_dsn = TRUE, quiet = TRUE)
```

```{r load-intermediates}
out_dir <- "/proj/mhinolab/users/rbless/data/Obstacles_Output"

unmatched_nc1 <- st_read(file.path(out_dir, "nc1geoms_distmatch.gpkg"), quiet = TRUE) |>
  mutate(parcel_index = as.character(parcel_index))

cl_unmatched <- st_read(file.path(out_dir, "cl_wID.gpkg"), quiet = TRUE)

county_list <- unique(unmatched_nc1$cntyfips)
```

```{r nearest-neighbor-join}
#| eval: true
# Drop to unique CL geometries before joining (avoids redundant distance calcs)
cl_unique <- cl_unmatched |>
  distinct(geometry_id1, .keep_all = TRUE)

# County-by-county nearest-neighbor join
nn_results <- map(county_list, function(fips) {
  cl_fips      <- cl_unique      |> filter(`fips_code` == fips)
  parcels_fips <- unmatched_nc1  |> filter(cntyfips    == fips)

  if (nrow(cl_fips) == 0 || nrow(parcels_fips) == 0) return(NULL)

  # Nearest-neighbour join (assigns each parcel its nearest CL row)
  nn <- st_join(parcels_fips, cl_fips, join = st_nearest_feature) |>
    filter(!is.na(geometry_id1))

  if (nrow(nn) == 0) return(NULL)

  # Get CL point geometries for matched rows
  idx           <- match(nn$geometry_id1, cl_fips$geometry_id1)
  matched_geoms <- st_geometry(cl_fips)[idx]

  nn <- nn |>
    mutate(distance = as.numeric(
      st_distance(st_geometry(nn), matched_geoms, by_element = TRUE)
    ))

  # For each parcel, keep only the minimum-distance match(es)
  nn |>
    st_drop_geometry() |>
    group_by(parcel_index) |>
    filter(distance == min(distance)) |>
    ungroup()
}, .progress = TRUE) |>
  list_rbind()
```

```{r identify-ties}
#| eval: true
# Parcels that matched multiple CL geometries at equal minimum distance
tie_counts      <- count(nn_results, parcel_index)
tie_parcelids   <- tie_counts$parcel_index[tie_counts$n > 1]

duplicated_rows <- nn_results |> filter(parcel_index %in%  tie_parcelids)
cl_dist_nosort  <- nn_results |> filter(!parcel_index %in% tie_parcelids)

write_csv(duplicated_rows, file.path(out_dir, "parcel_joins_to_sort.csv"))
write_csv(cl_dist_nosort,  file.path(out_dir, "parcel_cl_dist_joins_nosort.csv"))
```

```{r resolve-ties}
#| eval: true

# CL attribute table (no geometry) for address lookup
cl_attr <- cl_unmatched |> st_drop_geometry()

# One row per tied parcel (the nc1 side)
nc1_pcls <- duplicated_rows |> distinct(parcel_index, .keep_all = TRUE)

set.seed(987)

find_best_cl_match <- function(pc_row) {
  pc       <- nc1_pcls[pc_row, ]
  cl_geoms <- duplicated_rows |>
    filter(parcel_index == pc$parcel_index) |>
    pull(geometry_id1)

  cl_pcls <- cl_attr |> filter(geometry_id1 %in% cl_geoms)

  # Choose address field: situs preferred, mailing as fallback
  if (!is.na(pc$siteadd) && pc$siteadd != "") {
    candidate_addresses <- paste(cl_pcls$`SITUS.HOUSE.NUMBER`,
                                 cl_pcls$`SITUS.STREET.NAME`)
    query_address       <- pc$siteadd
  } else {
    candidate_addresses <- paste(cl_pcls$`MAILING.HOUSE.NUMBER`,
                                 cl_pcls$`MAILING.STREET.NAME`)
    query_address       <- pc$mailadd
  }

  scores    <- stringdist(as.character(query_address), candidate_addresses,
                          method = "lv")
  best_idx  <- which(scores == min(scores))

  # Break remaining ties randomly
  if (length(best_idx) > 1) best_idx <- sample(best_idx, 1)

  cl_pcls[best_idx, ]
}

matched_cls  <- map(seq_len(nrow(nc1_pcls)), find_best_cl_match)
cl_dist_sorted <- bind_cols(
  nc1_pcls |> select(-any_of(names(cl_attr))),   # nc1 columns only
  list_rbind(matched_cls)                          # winning CL row
)

write_csv(cl_dist_sorted, file.path(out_dir, "cl_dist_sorted.csv"))
```

```{r load-csvs}
cl_string <- read_csv(
  file.path(out_dir, "parcel_cl_stringmatch.csv"),
  col_types = cols(.default = col_character())
)

cl_dist_nosort <- read_csv(
  file.path(out_dir, "parcel_cl_dist_joins_nosort.csv"),
  col_types = cols(.default = col_character())
)

cl_dist_sorted <- read_csv(
  file.path(out_dir, "cl_dist_sorted.csv"),
  col_types = cols(.default = col_character())
)
```

```{r combine-all}
#| eval: true

# Step 3 writes CL attributes with spaced names (`PROPERTY INDICATOR CODE`),
# while Step 4 writes them with dotted names because the CL attributes
# round-tripped through GeoPackage (GDAL replaces spaces with dots). Without
# normalizing, bind_rows() would create paired columns ("X Y" and "X.Y"), each
# half-populated. Force everything to the dotted form before stacking.
to_dotted <- function(df) {
  rename_with(df, ~ gsub(" ", ".", .x, fixed = TRUE))
}

# Stack string-matched, distance-matched (no sort needed), distance-matched (sorted)
pcl_comb <- bind_rows(
    cl_string      |> to_dotted() |> mutate(across(everything(), as.character)),
    cl_dist_nosort |> to_dotted() |> mutate(across(everything(), as.character)),
    cl_dist_sorted |> to_dotted() |> mutate(across(everything(), as.character))
  ) |>
  mutate(geometry_id = ifelse(is.na(geometry_id), "", as.character(geometry_id))) |>
  arrange(parcel_index, geometry_id) |>                # blank geometry_id sorts first
  distinct(parcel_index, .keep_all = TRUE)             # prefer string match

# Sanity check: no spaced/dotted column duplication slipped through
stopifnot(
  length(intersect(
    gsub(" ", ".", names(pcl_comb)),
    names(pcl_comb)
  )) == ncol(pcl_comb)
)

write_csv(pcl_comb, file.path(out_dir, "nc1_cl_merge.csv"))
```

```{r read-combine-all}
pcl_comb <- read_csv(
  file.path(out_dir, "nc1_cl_merge.csv"),
  col_types = cols(.default = col_character())
)
```

## Summary

```{r summary-stats}
n_string   <- nrow(cl_string)
n_nosort   <- nrow(cl_dist_nosort)
n_sorted   <- nrow(cl_dist_sorted)
n_total    <- nrow(pcl_comb)
n_unmatched_input <- nrow(unmatched_nc1)
```

```{r summary-table}
tibble::tibble(
  Source  = c("String-matched (Step 3)", "Distance-matched (no ties)",
              "Distance-matched (ties resolved)", "Total unique parcels in merge"),
  Records = c(n_string, n_nosort, n_sorted, n_total)
) |>
  gt() |>
  tab_header(title = "Parcel–CoreLogic Merge Summary") |>
  fmt_integer(columns = Records) |>
  cols_label(Source = "", Records = "N")
```

- **`r scales::comma(n_unmatched_input)`** parcels entered the spatial join (no string match found in Step 3).
- **`r scales::comma(n_nosort)`** were unambiguously distance-matched.
- **`r scales::comma(n_sorted)`** had tied distances and were resolved by Levenshtein address similarity.
- The combined file `nc1_cl_merge.csv` contains **`r scales::comma(n_total)`** unique parcel records.

## One-to-Many CL Match Diagnostic

In a distance-based spatial join, it is expected that a single CoreLogic (CL)
point may be the nearest neighbor to multiple OneMap parcels — for example,
when a large tract has been subdivided, when many parcels lack site addresses,
or when a condominium building appears as one polygon in OneMap but has many
individual unit records in CL. 

The following tables describe the scope of this issue in the distance-matched
output.

```{r one-to-many-summary}
# Count how many OneMap parcels share each CL geometry_id1
cl_match_counts <- cl_dist_nosort |>
  count(geometry_id1, name = "n_parcels")

one_to_many_summary <- cl_match_counts |>
  summarise(
    `Total CL records matched`          = n(),
    `Matched to exactly 1 parcel`       = sum(n_parcels == 1),
    `Matched to 2–5 parcels`            = sum(n_parcels > 1 & n_parcels <= 5),
    `Matched to 6–20 parcels`           = sum(n_parcels > 5 & n_parcels <= 20),
    `Matched to >20 parcels`            = sum(n_parcels > 20),
    `Max parcels sharing one CL record` = max(n_parcels),
    `Parcels affected (n_parcels > 1)`  = sum(n_parcels[n_parcels > 1])
  ) |>
  pivot_longer(everything(), names_to = "Metric", values_to = "Value")

one_to_many_summary |>
  gt() |>
  tab_header(title = "Distance-Matched: One-to-Many CL Record Summary") |>
  fmt_integer(columns = Value) |>
  cols_label(Metric = "", Value = "N")
```

```{r one-to-many-by-type}
# Break down multi-matched parcels by CL property type
cl_dist_nosort |>
  inner_join(
    cl_match_counts |> filter(n_parcels > 1),
    by = "geometry_id1"
  ) |>
  distinct(geometry_id1, n_parcels, `PROPERTY.INDICATOR.CODE`,
           `LAND.USE.CODE`, `COUNTY.USE.DESCRIPTION`) |>
  count(`PROPERTY.INDICATOR.CODE`, `LAND.USE.CODE`, `COUNTY.USE.DESCRIPTION`,
        wt = n_parcels, name = "n_parcels_affected") |>
  arrange(desc(n_parcels_affected)) |>
  slice_head(n = 20) |>
  gt() |>
  tab_header(
    title    = "Top 20 CoreLogic Property Types with Multi-Parcel Matches",
    subtitle = "Weighted by number of OneMap parcels affected"
  ) |>
  fmt_integer(columns = n_parcels_affected) |>
  cols_label(
    `PROPERTY.INDICATOR.CODE` = "Property Indicator",
    `LAND.USE.CODE`            = "Land Use Code",
    `COUNTY.USE.DESCRIPTION`   = "County Use Description",
    n_parcels_affected         = "Parcels Affected"
  )
```

**Key findings:**

- The majority of CL records (**`r scales::comma(sum(cl_match_counts$n_parcels == 1))`**,
  `r scales::percent(mean(cl_match_counts$n_parcels == 1), accuracy = 0.1)`) match
  exactly one OneMap parcel — these are clean one-to-one matches.
- **`r scales::comma(sum(cl_match_counts$n_parcels[cl_match_counts$n_parcels > 1]))`**
  parcels share a CL record with at least one other parcel.
- Multi-match cases fall into three broad categories:
  - **Condominiums** (`PROPERTY.INDICATOR.CODE = 11`, `LAND.USE.CODE = 112`): One
    building polygon in OneMap is the nearest neighbor for many individual CL unit
    records. This is expected behavior.
  - **Single-family residential** (`PROPERTY.INDICATOR.CODE = 10`, `LAND.USE.CODE = 163`):
    A CL residential point is the nearest neighbor to multiple surrounding OneMap
    parcels (e.g., adjacent vacant lots, subdivided tracts with no site addresses).
    This is the primary risk for downstream application duplication.
  - **Vacant land** (`PROPERTY.INDICATOR.CODE = 80` or `10`, `LAND.USE.CODE = 400`
    or `100`): Similar to the residential case — many parcels cluster around a
    single CL vacant land point.
- A `n_cl_matches` flag column should be added in downstream joins to allow
  filtering to clean one-to-one matches when needed.