Commit eebb293b authored by mourisl's avatar mourisl Committed by Li Song
Browse files

Significantly improve the scalability by parsing input fastq files in parallel

parent 80b40f21
Loading
Loading
Loading
Loading
+113 −46
Original line number Diff line number Diff line
@@ -29,9 +29,12 @@ void Chromap::ConstructIndex() {
}

uint32_t Chromap::LoadSingleEndReadsWithBarcodes(SequenceBatch &read_batch,
                                                 SequenceBatch &barcode_batch) {
  double real_start_time = GetRealTime();
                                                 SequenceBatch &barcode_batch,
                                                 bool parallel_parsing) {
  //double real_start_time = GetRealTime();
  uint32_t num_loaded_reads = 0;

  if (!parallel_parsing || mapping_parameters_.is_bulk_data) {
    while (num_loaded_reads < read_batch_size_) {
      bool no_more_read = read_batch.LoadOneSequenceAndSaveAt(num_loaded_reads);
      bool no_more_barcode = no_more_read;
@@ -39,37 +42,62 @@ uint32_t Chromap::LoadSingleEndReadsWithBarcodes(SequenceBatch &read_batch,
        no_more_barcode =
          barcode_batch.LoadOneSequenceAndSaveAt(num_loaded_reads);
      }
    if ((!no_more_read) && (!no_more_barcode)) {
      if (read_batch.GetSequenceLengthAt(num_loaded_reads) <
          (uint32_t)mapping_parameters_.min_read_length) {
        continue;  // reads are too short, just drop.
      }
      // if (PairedEndReadWithBarcodeIsDuplicate(num_loaded_pairs,
      // (*barcode_batch), (*read_batch1), (*read_batch2))) {
      //  num_duplicated_reads_ += 2;
      //  continue;
      //}
    } else if (no_more_read && no_more_barcode) {

      if (no_more_read && no_more_barcode) {
        break;
    } else {
      } else if (no_more_read || no_more_barcode){
        ExitWithMessage("Numbers of reads and barcodes don't match!");
      }
      ++num_loaded_reads;
    }
  if (num_loaded_reads > 0) {
  } else {
    uint32_t num_loaded_barcode = 0 ;
    
#pragma omp task shared(num_loaded_reads, read_batch)
    {
      uint32_t i = 0 ;
      for (i = 0 ; i < read_batch_size_; ++i) {
        if (read_batch.LoadOneSequenceAndSaveAt(i) == true) { // true: no more read
          break ;
        }
      }
      num_loaded_reads = i ;
    }

#pragma omp task shared(num_loaded_barcode, barcode_batch)
    { // bulk data will go to the other big branch
      uint32_t i = 0 ;
      for (i = 0 ; i < read_batch_size_; ++i) {
        if (barcode_batch.LoadOneSequenceAndSaveAt(i) == true) { // true: no more read
          break ;
        }
      }
      num_loaded_barcode = i ;
    }

#pragma omp taskwait

    if (num_loaded_reads != num_loaded_barcode) {
        ExitWithMessage("Numbers of reads and barcodes don't match!");
    }
  }
  /*if (num_loaded_reads > 0) {
    std::cerr << "Loaded " << num_loaded_reads << " reads in "
              << GetRealTime() - real_start_time << "s.\n";
  } else {
    std::cerr << "No more reads.\n";
  }
  }*/
  return num_loaded_reads;
}

uint32_t Chromap::LoadPairedEndReadsWithBarcodes(SequenceBatch &read_batch1,
                                                 SequenceBatch &read_batch2,
                                                 SequenceBatch &barcode_batch) {
                                                 SequenceBatch &barcode_batch,
                                                 bool parallel_parsing) {
  // double real_start_time = Chromap<>::GetRealTime();
  uint32_t num_loaded_pairs = 0;
  
  if (!parallel_parsing) {
    while (num_loaded_pairs < read_batch_size_) {
      bool no_more_read1 = read_batch1.LoadOneSequenceAndSaveAt(num_loaded_pairs);
      bool no_more_read2 = read_batch2.LoadOneSequenceAndSaveAt(num_loaded_pairs);
@@ -78,25 +106,64 @@ uint32_t Chromap::LoadPairedEndReadsWithBarcodes(SequenceBatch &read_batch1,
        no_more_barcode =
          barcode_batch.LoadOneSequenceAndSaveAt(num_loaded_pairs);
      }
    if ((!no_more_read1) && (!no_more_read2) && (!no_more_barcode)) {
      if (read_batch1.GetSequenceLengthAt(num_loaded_pairs) <
              (uint32_t)mapping_parameters_.min_read_length ||
          read_batch2.GetSequenceLengthAt(num_loaded_pairs) <
              (uint32_t)mapping_parameters_.min_read_length) {
        continue;  // reads are too short, just drop.
      }
      // if (PairedEndReadWithBarcodeIsDuplicate(num_loaded_pairs,
      // (*barcode_batch), (*read_batch1), (*read_batch2))) {
      //  num_duplicated_reads_ += 2;
      //  continue;
      //}
    } else if (no_more_read1 && no_more_read2 && no_more_barcode) {
      
      if (no_more_read1 && no_more_read2 && no_more_barcode) {
        break;
    } else {
      } else if (no_more_read1 || no_more_read2 || no_more_barcode){
        ExitWithMessage("Numbers of reads and barcodes don't match!");
      }
      ++num_loaded_pairs;
    }
  } else {
    uint32_t num_loaded_read1 = 0;
    uint32_t num_loaded_read2 = 0;
    uint32_t num_loaded_barcode = 0;
    
#pragma omp task shared(num_loaded_read1, read_batch1)
    {
      uint32_t i = 0 ;
      for (i = 0 ; i < read_batch_size_; ++i) {
        if (read_batch1.LoadOneSequenceAndSaveAt(i) == true) { // true: no more read
          break ;
        }
      }
      num_loaded_read1 = i ;
    }

#pragma omp task shared(num_loaded_read2, read_batch2)
    {
      uint32_t i = 0 ;
      for (i = 0 ; i < read_batch_size_; ++i) {
        if (read_batch2.LoadOneSequenceAndSaveAt(i) == true) { // true: no more read
          break ;
        }
      }
      num_loaded_read2 = i ;
    }

#pragma omp task shared(num_loaded_barcode, barcode_batch)
    {
      if (!mapping_parameters_.is_bulk_data) {
        uint32_t i = 0 ;
        for (i = 0 ; i < read_batch_size_; ++i) {
          if (barcode_batch.LoadOneSequenceAndSaveAt(i) == true) { // true: no more read
            break ;
          }
        }
        num_loaded_barcode = i ;
      }
    }

#pragma omp taskwait
    if (mapping_parameters_.is_bulk_data) {
      num_loaded_barcode = num_loaded_read2;
    }
    if (num_loaded_read1 != num_loaded_read2 || num_loaded_read2 != num_loaded_barcode) {
        ExitWithMessage("Numbers of reads and barcodes don't match!");
    }

    num_loaded_pairs = num_loaded_read1 ;
  }
  // if (num_loaded_pairs > 0) {
  //  std::cerr << "Loaded " << num_loaded_pairs << " pairs in "<<
  //  Chromap<>::GetRealTime() - real_start_time << "s. ";
+27 −9
Original line number Diff line number Diff line
@@ -34,7 +34,7 @@
#include "temp_mapping.h"
#include "utils.h"

#define CHROMAP_VERSION "0.3.1-r512"
#define CHROMAP_VERSION "0.3.1-r513"

namespace chromap {

@@ -123,11 +123,13 @@ class Chromap {

 private:
  uint32_t LoadSingleEndReadsWithBarcodes(SequenceBatch &read_batch,
                                          SequenceBatch &barcode_batch);
                                          SequenceBatch &barcode_batch,
                                          bool parallel_parsing);

  uint32_t LoadPairedEndReadsWithBarcodes(SequenceBatch &read_batch1,
                                          SequenceBatch &read_batch2,
                                          SequenceBatch &barcode_batch);
                                          SequenceBatch &barcode_batch,
                                          bool parallel_parsing);

  void TrimAdapterForPairedEndRead(uint32_t pair_index,
                                   SequenceBatch &read_batch1,
@@ -321,7 +323,8 @@ void Chromap::MapSingleEndReads() {

    uint32_t num_loaded_reads_for_loading = 0;
    uint32_t num_loaded_reads = LoadSingleEndReadsWithBarcodes(
        read_batch_for_loading, barcode_batch_for_loading);
        read_batch_for_loading, barcode_batch_for_loading,
        mapping_parameters_.num_threads >= 3 ? true : false);
    read_batch_for_loading.SwapSequenceBatch(read_batch);

    if (!mapping_parameters_.is_bulk_data) {
@@ -369,7 +372,8 @@ void Chromap::MapSingleEndReads() {
#pragma omp task
          {
            num_loaded_reads_for_loading = LoadSingleEndReadsWithBarcodes(
                read_batch_for_loading, barcode_batch_for_loading);
                read_batch_for_loading, barcode_batch_for_loading,
                mapping_parameters_.num_threads >= 12 ? true : false);
          }  // end of openmp loading task
          uint32_t history_update_threshold =
          mm_to_candidates_cache.GetUpdateThreshold(num_loaded_reads,
@@ -396,6 +400,11 @@ void Chromap::MapSingleEndReads() {
              continue;
            }
            
            if (read_batch.GetSequenceLengthAt(read_index) <
                (uint32_t)mapping_parameters_.min_read_length) {
              continue;  // reads are too short, just drop.
            }

            read_batch.PrepareNegativeSequenceAt(read_index);

            mapping_metadata.PrepareForMappingNextRead(
@@ -534,8 +543,8 @@ void Chromap::MapSingleEndReads() {
              num_mappings_in_mem = 0;
            }
          }
          std::cerr << "Mapped in " << GetRealTime() - real_batch_start_time
                    << "s.\n";
          std::cerr << "Mapped " << num_loaded_reads << " reads in "
                    << GetRealTime() - real_batch_start_time << "s.\n";
        }
      }  // end of openmp single
      {
@@ -801,7 +810,7 @@ void Chromap::MapPairedEndReads() {
    uint32_t num_loaded_pairs_for_loading = 0;
    uint32_t num_loaded_pairs = LoadPairedEndReadsWithBarcodes(
        read_batch1_for_loading, read_batch2_for_loading,
        barcode_batch_for_loading);
        barcode_batch_for_loading, mapping_parameters_.num_threads >= 3 ? true : false);
    read_batch1_for_loading.SwapSequenceBatch(read_batch1);
    read_batch2_for_loading.SwapSequenceBatch(read_batch2);
    if (!mapping_parameters_.is_bulk_data) {
@@ -858,7 +867,8 @@ void Chromap::MapPairedEndReads() {
          {
            num_loaded_pairs_for_loading = LoadPairedEndReadsWithBarcodes(
                read_batch1_for_loading, read_batch2_for_loading,
                barcode_batch_for_loading);
                barcode_batch_for_loading, 
                mapping_parameters_.num_threads >= 12 ? true : false);
          }  // end of openmp loading task

          int grain_size = 5000;
@@ -892,6 +902,14 @@ void Chromap::MapPairedEndReads() {

            if (current_barcode_is_whitelisted ||
                mapping_parameters_.output_mappings_not_in_whitelist) {
              
              if (read_batch1.GetSequenceLengthAt(pair_index) <
                  (uint32_t)mapping_parameters_.min_read_length ||
                  read_batch2.GetSequenceLengthAt(pair_index) <
                  (uint32_t)mapping_parameters_.min_read_length) {
                continue;  // reads are too short, just drop.
              }

              read_batch1.PrepareNegativeSequenceAt(pair_index);
              read_batch2.PrepareNegativeSequenceAt(pair_index);