I'm working on a function to detect positive and negative events using a time series of cumulative anomalies. The function seems to work fine for vectors and produces the correct number of outputs as a named numeric vector. Now, I'm trying to apply this over a spatRaster using terra::app(), too big for here, but you can check it in this folder. The code runs for about 1 hour or two, but then this error message:
Not compatible with requested type: [type=list; target=double]
This is the function:
fixed_event_detection <- function(accum_anom,
rfd_vals,
dates,
max_events = 3,
max_gap_days = 60,
rfd_thr = 90,
output ,
#thr_gap = 0.05,
min_tud = 30) {
if (length(accum_anom) != length(dates) || length(accum_anom) != length(rfd_vals)) stop("Dates and anomalies must have the same length.")
if(output == 'all'){
out_length <- max_events * 13 + 2
}
if(output %in% c('positive', 'negative')){
out_length <- 13
}
if (all(is.na(accum_anom))) return(as.numeric(rep(NA, out_length)))
# initial df arrangement
df <- data.frame(dates = lubridate::ymd(dates), accum = as.numeric(accum_anom), rfd = as.numeric(rfd_vals)) %>%
na.omit() %>%
dplyr::arrange(dates) %>%
mutate(
day_diff = as.numeric(difftime(dates, dplyr::lag(dates), units = "days")),
sign = sign(accum),
sign_lag = dplyr::lag(sign),
gap = ifelse(is.na(day_diff), 0, day_diff),
rfd_extreme = ifelse(abs(rfd) >= rfd_thr, 1 * sign(rfd), 0),
# condition 1: simple sign change
change_sign = sign != sign_lag & !is.na(sign) & !is.na(sign_lag),
# condition 2: Na gap + sign change
change_sign_after_gap = gap > max_gap_days & change_sign,
# New event grouping conditions
new_event = change_sign | change_sign_after_gap,
# group events
grouped_anom = collapse::fcumsum(collapse::replace_na(new_event, FALSE))
)
# Filter values without polarity
events_raw <- df %>%
dplyr::group_by(grouped_anom) %>%
dplyr::arrange(dates) %>%
dplyr::group_split() %>% sapply(FUN = as.data.frame, simplify = F)
if (length(events_raw) == 0) return(as.numeric(rep(NA, out_length)))
# Calculate event metrics
events <- purrr::map_dfr(events_raw, function(df_evt) {
total <- df_evt %>%
filter(dates >= (base::min(dates) + months(6)) & dates <= (base::max(dates) + months(-6))) %>% count() %>% pull()
extreme_pos <- df_evt %>%
filter(dates >= (base::min(dates) + months(6)) & dates <= (base::max(dates) + months(-6)) & rfd_extreme == 1) %>%
summarise((n()*100)/total) %>% pull()
extreme_neg <- df_evt %>%
filter(dates >= (base::min(dates) + months(6)) & dates <= (base::max(dates) + months(-6)) & rfd_extreme == -1) %>%
summarise((n()*100)/total) %>% pull()
out_df <- data.frame(
start_date = as.integer(base::format(base::min(df_evt$dates), '%Y%m')),
end_date = as.integer(base::format(base::max(df_evt$dates), '%Y%m')),
auc = base::sum(abs(df_evt$accum), na.rm = TRUE),
auc_max = base::max(abs(df_evt$accum), na.rm = TRUE),
aux_max_date = as.integer(base::format(df_evt$dates[which.max(abs(df_evt$accum))], '%Y%m')),
months_to_max = lubridate::interval(base::min(df_evt$dates), df_evt$dates[which.max(abs(df_evt$accum))]) %/% months(1) + 1,
months_since_max = lubridate::interval(df_evt$dates[which.max(abs(df_evt$accum))], base::max(df_evt$dates)) %/% months(1) + 1,
polarity = sign(df_evt$accum[1]),
tud_days = as.integer(base::max(df_evt$dates) - base::min(df_evt$dates)) + 1,
tud_months = lubridate::interval(base::min(df_evt$dates), base::max(df_evt$dates)) %/% months(1) + 1,
extreme_neg = extreme_neg,
extreme_pos = extreme_pos
)
out_df
}) %>%
dplyr::arrange(start_date) %>%
dplyr::mutate(event_id = dplyr::row_number()) %>%
dplyr::filter(tud_days >= min_tud)
total_pos <- base::sum(events$polarity == 1)
total_neg <- base::sum(events$polarity == -1)
# Take events
if(output == 'all'){
events <- events %>% dplyr::slice_head(n = max_events)
n_rep <- base::nrow(events)
# to vector
vec_events <- events %>% base::t()
name_vec <- vec_events %>% row.names() %>% base::rep(times = n_rep)
vec_events2 <- vec_events %>% as.vector() %>% round(2)
# fill missing events
missing_val <- (max_events * 13) - length(vec_events2)
if (missing_val > 0) vec_events2 <- c(vec_events2, as.vector(rep(NA, missing_val)))
# add total events both pos and neg
out_events <- c(total_pos, total_neg, vec_events2)
names(out_events) <- c('total_pos', 'total_neg', name_vec, rep('not_flagged', missing_val))
}
if(output == 'positive'){
events <- events %>% filter(polarity == 1) %>%
dplyr::slice_max(order_by = auc, n = 1)
if (nrow(events) == 0) return(as.numeric(rep(NA, out_length)))
vec_events <- events %>% t()
name_vec <- vec_events %>% row.names()
out_events <- vec_events %>% as.vector() %>% round(2)
names(out_events) <- name_vec
}
if(output == 'negative') {
events <- events %>% filter(polarity == -1) %>%
dplyr::slice_max(order_by = auc, n = 1)
if (nrow(events) == 0) return(as.numeric(rep(NA, out_length)))
vec_events <- events %>% base::t()
name_vec <- vec_events %>% row.names()
out_events <- vec_events %>% as.vector() %>% round(2)
names(out_events) <- name_vec
}
return(out_events)
}
and this is how I am applying the code:
spat_detection <- c(spat_accum, spat_rfd_pol) # accumulated anomalies + rfd brick with positive or negative signs
### setup ----
dates <- names(spat_accum) %>%
str_extract('\\d{4}-\\d{2}-\\d{2}') %>%
ymd()
max_events <- 20
max_gap_days <- 60
min_tud <- 30 # express in days
output <- 'negative'
rfd_thr <- 90 # extreme values characterization for events
outname <- paste0('05_TUD_events/', variable, '_negative_events_', ifelse(variable == 'ET', 'MODIS16A2GF_v2.tif', 'MCD43A4_v2.tif'))
### event detection and TUD metrics ----
library(parallel)
ncores <- detectCores() - 6
cl <- makeCluster(ncores)
clusterExport(cl, c('fixed_event_detection','dates',
'max_events', 'max_gap_days',
'rfd_thr', 'min_tud', 'output'))
# Cargar paquetes en los workers
clusterEvalQ(cl, {
library(terra)
library(dplyr)
library(lubridate)
library(magrittr)
library(purrr)
library(collapse)
})
system.time(spat_events <- app(spat_detection, fun = function(x) {
# ET 1:1104 anom accum spatRaster; 1105:2208 RFD spatRaster
# Call vector function
fixed_event_detection(
accum_anom = as.numeric(x[1:1104]),
rfd_vals = as.numeric(x[1105:2208]),
output = output,
rfd_thr = rfd_thr,
dates = dates,
max_events = max_events,
max_gap_days = max_gap_days,
min_tud = min_tud
)
}, cores = cl, filename = outname, overwrite = T, wopt = list(datatype = 'INT4S')))
stopCluster(cl)
The output should be a a named numeric vector with 13 values if you work with output = 'positive' or output = 'negative'. But will vary based on the max_events argument for output = 'all'.
In a spatRaster it should be a 13 layer file. I have checked the outputs for single vectors for the output lengths with no major issues.
Thanks in advance for the help!
fixed_event_detectionreturns a differently shaped result (e.g. shorter vector) if the input includes missing values.