Commit 0a25fe8b authored by Ilya Korsunsky's avatar Ilya Korsunsky
Browse files

removed rowwise operations, they are very slow

parent a6bffff2
Loading
Loading
Loading
Loading
+36 −19
Original line number Diff line number Diff line
@@ -47,6 +47,9 @@ glmm_uni <- function(feature, formula, design, response, family, nsim, has_offse
            prior_sd <- c(prior_sd, sd(residuals(model, 'pearson')))
        }
        
#         rm(model)
#         gc(full = TRUE, verbose = FALSE, reset = TRUE)
        
        return(list(
            status = 0, beta = beta, sigma = sigma, epsilon = epsilon, 
            epsilon_pearson = epsilon_pearson, prior_sd = prior_sd
@@ -97,6 +100,11 @@ presto.presto <- function(
        features <- rownames(response)
    }

    ## TODO: make a more rigorous check for this
    if (family %in% c('poisson', 'binomial', 'nb')) {
        message('CAUTION: if using GLMM, make sure your counts are integers!')
    }
    
    ## To make downstream things easier, give exposure variable a dedicated name 
    ## TODO: check that SIZE is valid exposure type variable 
    design$EXPOSURE <- design[[size_varname]]
@@ -126,6 +134,8 @@ presto.presto <- function(
        )
    }

    message('SETUP FUTURES')
    
    ## set up parallel machinery 
    features <- intersect(features, rownames(response))
    if (ncore == 1) {
@@ -139,27 +149,34 @@ presto.presto <- function(
        future::plan(future::multiprocess(workers = .ncore))
        rm(.ncore)
    }
    message('RUN THOSE FUTURES')
    
    lres <- furrr::future_map(features, glmm_uni, formula, design, response, family, nsim, has_offset)
    names(lres) <- features
    lres <- lres[!is.na(lres)]

#     ## chunk into `ncore` futures, one for each core
#     chunk_assign <- sample(rep(seq(ncore), length.out = length(features)))
#     futures_list <- split(features, chunk_assign) %>% map(function(features_chunk) {
#         future(
#             expr = {
#                 lres <- map(
#                     features_chunk, ## input
#                     glmm_uni, ## function
#                     formula, design, response[features_chunk, , drop = FALSE], family, nsim, has_offset ## params
#                 )
#                 names(lres) <- features_chunk
#                 return(lres)
#             }, 
#             lazy = FALSE,
#             globals = FALSE
#         )
#     })    
#     message('COLLECT THOSE FUTURES')
#     lres <- Reduce(append, value(futures_list))
#     lres <- lres[which(map(lres, 'status') == 0)]    
    
    ## chunk into `ncore` futures, one for each core
    chunk_assign <- sample(rep(seq(ncore), length.out = length(features)))
    futures_list <- split(features, chunk_assign) %>% map(function(features_chunk) {
        future(
            expr = {
                lres <- map(
                    features_chunk, ## input
                    glmm_uni, ## function
                    formula, design, response[features_chunk, , drop = FALSE], family, nsim, has_offset ## params
                )
                names(lres) <- features_chunk
                return(lres)
            }, 
            lazy = FALSE,
            globals = FALSE
        )
    })
    lres <- Reduce(append, value(futures_list))
    lres <- lres[which(map(lres, 'status') == 0)]    
    
    message('AGGREGATE MY OWN RESULTS')
    # Aggregate results 
    common_el <- purrr::reduce(map(lres, names), intersect) %>% setdiff('status')
    res <- map(common_el, function(name) {
+10 −8
Original line number Diff line number Diff line
@@ -40,20 +40,22 @@
.merge_betas <- function(X1, X2) {
    terms_join <- setdiff(intersect(colnames(X1), colnames(X2)), 'beta')
    suppressMessages({
        dplyr::full_join(X1, X2, by = terms_join) %>% 
            dplyr::rowwise() %>% 
            dplyr::mutate(beta = sum(beta.x, beta.y, na.rm = TRUE)) %>% 
            dplyr::select(-beta.x, -beta.y)    
        res <- dplyr::full_join(X1, X2, by = terms_join) 
        res$beta <- replace_na(res$beta.x, 0) + replace_na(res$beta.y, 0)
        res$beta.x <- NULL
        res$beta.y <- NULL        
    })
    return(res)
}


.merge_sigmas <- function(X1, X2) {
    terms_join <- setdiff(intersect(colnames(X1), colnames(X2)), 'sigma')
    suppressMessages({
    dplyr::full_join(X1, X2, by = terms_join) %>% 
        dplyr::rowwise() %>% 
        dplyr::mutate(sigma = sqrt(sum(sigma.x ^ 2, sigma.y ^ 2, na.rm = TRUE))) %>% 
        dplyr::select(-sigma.x, -sigma.y)
        res <- dplyr::full_join(X1, X2, by = terms_join) 
        res$sigma <- sqrt(replace_na(res$sigma.y, 0) ^ 2 + replace_na(res$sigma.y, 0) ^ 2)
        res$sigma.x <- NULL
        res$sigma.y <- NULL        
    })
    return(res)
}