I am attempting to run a calculation on each row of a large dataset using the carcass package. Each row of the dataset contains the values to feed to the function, and a call number for an associated vector stored in a list object. I have created an apply() function to cycle through each row of the dataset and assemble a vector of the results that I can then append to the dataset.

The problem is, the calculation is very slow, and with 1000+ rows of data, it takes several hours to days to run. Looking for ways to speed this up, I am attempting to set up parallel processing using the parApply() functions, but it doesn't appear to be reading or including my list object or dataframe correctly.

What I've tried so far:

library(carcass)
library(doParallel)
library(parallel)

no_cores <- detectCores(logical = TRUE)
no_cores

cl <- makeCluster(no_cores-1)  
registerDoParallel(cl)  

test_data<-data.frame(No.=c(39,48,16,23,7,1),p=c(0.05,0.05,0.05,0.05,0.05,0.05),c=rep(0.708,6),ID=c(1,1,2,2,2,2))
I<-list(c(rep(c(7,10),8),7),c(rep(7,10)))

all_samples<- as.data.frame((test_data))
all_IDs<-I
seq_id_all<- seq_along(1:nrow(all_samples))

#Write function: high maxN
function_test <- function(row, I, maxN = 600000) {
  tryCatch({
    # Calculate main posterior estimate
    p_main <- ettersonEq14(s = row[3], f = row[2], J = I[[row[4]]])
    N_main <- posteriorN(p = p_main, nf = row[1], maxN = maxN, plot = FALSE)
    
    # Return all estimates as a named vector
    return(N_main = N_main$HT.estimate)
    
  }, error = function(e) {
    message("Error processing row: ", paste(row, collapse = ", "), " - ", e$message)
    return(c(N_main = NA))
  })
}

clusterExport(cl,list('function_test','all_samples','I'))

system.time(results <- c(parLapply(cl,all_samples,I=I,fun=function_test)))
stopCluster(cl)

What I got: a list object of each column in the original dataset, with an NA under each.

What I want: a vector of the 6 HT.estimates, one for each row.

How it works normally using the test data and function above:

library(pbapply)
results <- t(pbapply(test_data, 1, function_test, I = I))

I'm not sure where it's going wrong, and I'm new to both parallel processing and apply() functions in R; I've been banging my head against it for several hours now and gotten nowhere, so I figured I'd ask here. (I'm also new to StackExchange, so apologies if this is missing anything/formatted poorly.) Any suggestions would be appreciated.

Additional edits: I've added:

clusterCall(cl, function() library(carcass))

On the line under registerDoParallel(cl), and:

return(c(N_main=conditionMessage(e)))

To the error function within the overall function. It is still printing all NAs, and is not printing an error message.

  • I suggest you include the error itself in the return value within the error=function(e) ... function, perhaps something like return(c(N_main=conditionMessage(e))). From there, you can see the actual error text and act accordingly. Commented Jun 28 at 21:04
  • Make sure to have each worker load the required packages (carcass): stackoverflow.com/questions/34022083/… Commented Jun 28 at 22:33
  • In the lines for p_main and N_main I can't tell why you are indexing the things you are indexing (e.g. row[8] when the df has 4 columns) Commented Jun 30 at 15:20
  • @Stephen Thanks for spotting that typo - it's a remnant from the full dataset, which has additional columns. I've fixed it for the test data. Commented Jul 2 at 18:10
  • @I_O Added the code suggested in that article but no change and no luck - I do think it's a package issue, any other suggestions to get that loaded properly? Commented Jul 2 at 18:11

2 Answers

3

I don't think you need parallelization here. The function ettersonEq14 takes so long because it is written very inefficiently.

I wrote an efficient version that speeds up the computation for 1000 rows with 30 inspection periods each by a factor of 700.

It avoids the for loops. A large part of the matrix multiplications is redundant, since we need only one element from most of the matrices, and in some cases that element can be computed directly.

Here it is:

library(matrixStats)
ettersonEq14_new <- function(s, f, J) {
  n <- length(J); pd <- f
  q_r <- s; q_d <- 1 - pd

  J_mat <- matrix(J[-1], n-1, n-1)
  J_mat[upper.tri(J_mat)] <- 0

  survivedIntervals <- q_r^colCumsums(J_mat)
  survivedIntervals[upper.tri(survivedIntervals)] <- 0

  M <- matrix(1:(n-1), n-1,n-1)
  M<-M-col(M)
  M[M<0]<-0

  notFoundIntervals <- q_d^M
  survivedAndFound <-pd*(colSums(survivedIntervals*notFoundIntervals))

  gfe <- sum((1-q_r^J[-n])*(survivedAndFound*q_d+pd)/(1-s))*s + s*f*(1-s^J[n])/(1-s)
  gfe / sum(J)
}

The library matrixStats is required for the colCumsums function.
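If you would rather avoid the extra dependency, column-wise cumulative sums can also be computed in base R. The sketch below uses a toy matrix `m` (illustrative values, not from the question) and only compares against `matrixStats::colCumsums` when that package happens to be installed. I have not benchmarked this variant, so it may well be slower on large matrices.

```r
# Toy matrix; the values are illustrative only
m <- matrix(c(7, 10, 7, 10, 7, 10), nrow = 3)

# Base-R column-wise cumulative sums
cum_base <- apply(m, 2, cumsum)

# Cross-check against matrixStats only if it is available
if (requireNamespace("matrixStats", quietly = TRUE)) {
  stopifnot(isTRUE(all.equal(cum_base, matrixStats::colCumsums(m))))
}

cum_base
```

Note that `apply()` drops to a plain vector when the matrix has a single row, so this replacement would need a guard for very short inspection vectors.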

Here is a test run on my PC, showing that the two functions yield the same results within a tolerance of sqrt(.Machine$double.eps), which is 1.490116e-08 in my case.

> library(carcass)
Loading required package: lme4
Loading required package: Matrix
Loading required package: survival
> library(matrixStats)
> ettersonEq14_new <- function(s, f, J) {
+   n <- length(J); pd <- f
+   q_r <- s; q_d <- 1 - pd
+   J_mat <- matrix(J[-1], n-1, n-1)
+   J_mat[upper.tri(J_mat)] <- 0
+   survivedIntervals <- q_r^colCumsums(J_mat)
+   survivedIntervals[upper.tri(survivedIntervals)] <- 0
+   M <- matrix(1:(n-1), n-1,n-1)
+   M<-M-col(M)
+   M[M<0]<-0
+   notFoundIntervals <- q_d^M
+   survivedAndFound <-pd*(colSums(survivedIntervals*notFoundIntervals))
+  gfe <- sum((1-q_r^J[-n])*(survivedAndFound*q_d+pd)/(1-s))*s + s*f*(1-s^J[n])/(1-s)
+   gfe / sum(J)
+ }
> 
> probsSurvived <- seq(0.3,0.9,length.out=1000)
> probsFound <- seq(0.2,0.7,le=1000)
> inspectionPeriods <- sample(1:1000, 30, rep =T)
> system.time(old <- apply(cbind(probsSurvived,probsFound), 1, function(x) ettersonEq14(x[1],x[2],inspectionPeriods)))
   user  system elapsed 
  70.85    3.81   74.77 
> system.time(new <- apply(cbind(probsSurvived,probsFound), 1, function(x) ettersonEq14_new(x[1],x[2],inspectionPeriods)))
   user  system elapsed 
   0.12    0.01    0.16 
> identical(old,new)
[1] FALSE
> all.equal(old,new)
[1] TRUE
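The identical() / all.equal() difference above is ordinary floating-point behaviour rather than a sign that the two functions disagree. A minimal illustration with made-up values:

```r
x <- 0.1 + 0.2
y <- 0.3

identical(x, y)          # exact comparison of the two doubles
isTRUE(all.equal(x, y))  # comparison within a small numeric tolerance

# The tolerance quoted above
sqrt(.Machine$double.eps)
```

Wrapping all.equal() in isTRUE() is the usual idiom, because all.equal() returns a character description instead of FALSE when the values differ.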

I also ran a test with 10000 rows and 300 inspection periods. With my function it took less than three minutes:

> probsSurvived <- seq(0.3,0.9,length.out=10000)
> probsFound <- seq(0.2,0.7,le=10000)
> inspectionPeriods <- sample(1:1000, 300, rep =T)
> system.time(new <- apply(cbind(probsSurvived,probsFound), 1, function(x) ettersonEq14_new(x[1],x[2],inspectionPeriods)))
   user  system elapsed 
 155.07   10.76  168.08 

1

The version below runs and produces results that are not all NA (I didn't fix the error messages not being printed).

carcass was not available on each worker, so calling its functions with :: inside the function fixes this. Also, passing a data.frame to parLapply meant the function was applied to each column (not each row), so converting it to a list of rows fixes this.
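To see the column-versus-row behaviour without a cluster, here is a small base-R sketch. The data frame `df` and the names `by_col`, `rows` and `by_row` are made up for illustration; lapply() walks a data frame the same way parLapply() does.

```r
# Toy data frame standing in for all_samples (values are illustrative)
df <- data.frame(No. = c(39, 48, 16), p = c(0.05, 0.05, 0.05), ID = c(1, 1, 2))

# A data frame is a list of columns, so lapply() visits each column in turn
by_col <- lapply(df, function(x) length(x))
names(by_col)   # the column names, not row indices

# split() by row index yields a list of one-row data frames instead
rows <- split(df, seq_len(nrow(df)))
by_row <- lapply(rows, function(r) r[["No."]] * r[["p"]])
length(by_row)  # one element per row
```

This is why the original call returned one result per column of the dataset.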

library(carcass)
library(doParallel)
library(parallel)

no_cores <- detectCores(logical = TRUE)
no_cores

cl <- makeCluster(no_cores-1)  
registerDoParallel(cl)  

test_data<-data.frame(No.=c(39,48,16,23,7,1),p=c(0.05,0.05,0.05,0.05,0.05,0.05),c=rep(0.708,6),ID=c(1,1,2,2,2,2))
I<-list(c(rep(c(7,10),8),7),c(rep(7,10)))

all_samples<- as.data.frame((test_data))
all_IDs<-I
seq_id_all<- seq_along(1:nrow(all_samples))

#Write function: high maxN
function_test <- function(row, I, maxN = 600000) {
  tryCatch({
    # Calculate main posterior estimate
    # note use of carcass:: so each worker can find the functions
    # alternatively, load the package on the workers with clusterEvalQ(cl, library(carcass))
    p_main <- carcass::ettersonEq14(s = row[[3]], f = row[[2]], J = I[[row[[4]]]])
    N_main <- carcass::posteriorN(p = p_main, nf = row[[1]], maxN = maxN, plot = FALSE)
    
    return(N_main$HT.estimate)
    
  }, error = function(e) {
    message("Error processing row: ", paste(row, collapse = ", "), " - ", e$message)
    return(c(N_main = NA))
  })
}

# convert df to list to pass to parLapply
all_samples_list <- split(all_samples, seq(nrow(all_samples)))

clusterExport(cl,list('function_test','all_samples_list','I'))

system.time(results <- parLapply(cl, all_samples_list, fun=function_test, I=I))
stopCluster(cl)
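The code above returns a list, so unlist(results) gives the vector asked for in the question. As an alternative, parallel::parRapply() applies a function over the rows of a matrix and returns a vector directly. The sketch below is only an outline under assumptions: `row_fun`, `toy` and `lookup` are hypothetical stand-ins for function_test, all_samples and I, and the arithmetic inside `row_fun` is a placeholder, not the carcass calculation.

```r
library(parallel)

# Toy inputs; row_fun is a hypothetical stand-in for function_test
toy <- as.matrix(data.frame(nf = c(39, 48, 16), p = c(0.05, 0.05, 0.05), ID = c(1, 1, 2)))
lookup <- list(c(7, 10, 7), c(7, 7))

row_fun <- function(row, lookup) {
  # row arrives as a named numeric vector, as with apply(x, 1, ...)
  unname(row[["nf"]] / (row[["p"]] * sum(lookup[[row[["ID"]]]])))
}

cl <- makeCluster(2)
# Packages needed on the workers can be attached like this, e.g.
# clusterEvalQ(cl, library(carcass))
res <- parRapply(cl, toy, row_fun, lookup = lookup)
stopCluster(cl)

res  # a plain numeric vector, one value per row
```

Because extra arguments such as `lookup` are passed through the call, nothing has to be exported with clusterExport() in this variant.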
