Commit 0947a2f2 authored by Ilya Korsunsky's avatar Ilya Korsunsky
Browse files

moved parallel functions from furrr future_map directly to futures

parent 69131e39
Loading
Loading
Loading
Loading
+52 −31
Original line number Diff line number Diff line
@@ -14,11 +14,8 @@
#' @export 
glmm_uni <- function(feature, formula, design, response, family, nsim, has_offset) {    
    tryCatch({ 
        ## In practice, refit introduces too much bias based on the initial fit 
#         model <- lme4::refit(model_base, response[feature, ])
        
        ## Fit the model from scratch instead 
        model <- fit_model.presto(formula, design, response, feature, family)
        ## Fit the model 
        model <- fit_model.presto(formula, design, response[feature, ], family)
        
        ## residuals
        epsilon <- as.numeric(residuals(model, 'response'))
@@ -50,29 +47,31 @@ glmm_uni <- function(feature, formula, design, response, family, nsim, has_offse
            prior_sd <- c(prior_sd, sd(residuals(model, 'pearson')))
        }
        
        return(list(beta = beta, sigma = sigma, epsilon = epsilon, epsilon_pearson = epsilon_pearson, prior_sd = prior_sd))        
        return(list(
            status = 0, beta = beta, sigma = sigma, epsilon = epsilon, 
            epsilon_pearson = epsilon_pearson, prior_sd = prior_sd
        ))
    }, error = function(e) {
        print(e)
        return(NA)
        return(list(status = 1, error = e))
    })
}

#' @export 
fit_model.presto <- function(formula, design, response, feature, family) {    
fit_model.presto <- function(formula, design, response, family) {    
    ## initialize model on one feature
    if (family == 'nb') {
        model <- lme4::glmer.nb(
            formula, cbind(design, y = response[feature, ]), 
            formula, cbind(design, y = response), 
            control = lme4::glmerControl(calc.derivs = FALSE, optimizer = "nloptwrap")
        )        
    } else if (family == 'poisson') {
        model <- lme4::glmer(
            formula, cbind(design, y = response[feature, ]), 'poisson', 
            formula, cbind(design, y = response), 'poisson', 
            control = lme4::glmerControl(calc.derivs = FALSE, optimizer = "nloptwrap")
        )
    } else if (family == 'gaussian') {
        model <- lme4::lmer(
            formula, cbind(design, y = response[feature, ]),
            formula, cbind(design, y = response),
        control = lmerControl(optimizer = "nloptwrap", calc.derivs = FALSE)
        )        
    } else {
@@ -90,16 +89,10 @@ presto.presto <- function(
    response, 
    size_varname,
    features=NULL, 
    parallel=FALSE, 
    ncore=1, 
    nsim=100, 
    family='poisson'
) {
    if (parallel) {
        future::plan(multicore)
        it_fxn <- furrr::future_map
    } else {
        it_fxn <- purrr::map
    }
    if (is.null(features)) {
        features <- rownames(response)
    }
@@ -111,9 +104,7 @@ presto.presto <- function(
    formula <- as.formula(sprintf('%s~%s', fstr[[2]], fstr[[3]]))

    ## fit an initial model just to get the names
    
    
    model_base <- fit_model.presto(formula, design, response, features[[1]], family)
    model_base <- fit_model.presto(formula, design, response[features[[1]], ], family)
    priornames_df <- as.data.frame(VarCorr(model_base))[, 1:3]
    if (isGLMM(model_base)) {
        ## glmer does not include residuals in VarCorr, lmer does
@@ -135,13 +126,39 @@ presto.presto <- function(
        )
    }
    
    ## then refit with new responses
    lres <- it_fxn(features, glmm_uni, formula, design, response, family, nsim, has_offset)
    names(lres) <- features
    lres <- lres[!is.na(lres)] 
    ## set up parallel machinery 
    features <- intersect(features, rownames(response))
    if (ncore == 1) {
        future::plan(sequential(globals = FALSE))        
    } else if (ncore %in% c(0, Inf)) {
        ncore <- availableCores()
        future::plan(multiprocess(globals = FALSE))
    } else {
        future::plan(multiprocess(workers = get('ncore'), globals = FALSE))
    }
    
    ## chunk into `ncore` futures, one for each core
    chunk_assign <- sample(rep(seq(ncore), length.out = length(features)))
    futures_list <- split(features, chunk_assign) %>% map(function(features_chunk) {
        future(
            expr = {
                lres <- map(
                    features_chunk, ## input
                    glmm_uni, ## function
                    formula, design, response[features_chunk, , drop = FALSE], family, nsim, has_offset ## params
                )
                names(lres) <- features_chunk
                return(lres)
            }, 
            lazy = FALSE,
            globals = FALSE
        )
    })
    lres <- Reduce(append, value(futures_list))
    lres <- lres[which(map(lres, 'status') == 0)]    
    
    # Aggregate results 
    common_el <- purrr::reduce(map(lres, names), intersect)
    common_el <- purrr::reduce(map(lres, names), intersect) %>% setdiff('status')
    res <- map(common_el, function(name) {
        as.matrix(map_dfr(lres, name))
    })
@@ -167,8 +184,12 @@ presto.presto <- function(
    res <- genemeans.presto(res, xpm=1e6)
    
    
    ## flags
    ## flags and stuff
    res$has_offset <- has_offset
    res$family <- family
    res$size_varname <- size_varname
    res$nsim <- nsim
    res$formula <- formula
    
    return(res)
}