Commit cf9ac497 authored by Haowen Zhang's avatar Haowen Zhang
Browse files

implement multi-thread single-end read mapping

parent 480fa351
Loading
Loading
Loading
Loading
+121 −64
Original line number Diff line number Diff line
@@ -620,10 +620,35 @@ void Chromap<MappingRecord>::MapSingleEndReads() {
  index.Load();
  //index.Statistics(num_sequences, reference);
  SequenceBatch read_batch(read_batch_size_);
  SequenceBatch read_batch_for_loading(read_batch_size_);
  SequenceBatch barcode_batch(read_batch_size_);
  read_batch.InitializeLoading(read_file1_path_);
  SequenceBatch barcode_batch_for_loading(read_batch_size_);
  read_batch_for_loading.InitializeLoading(read_file1_path_);
  if (!is_bulk_data_) {
    barcode_batch.InitializeLoading(barcode_file_path_);
    barcode_batch_for_loading.InitializeLoading(barcode_file_path_);
  }
  double real_start_mapping_time = Chromap<>::GetRealTime();
  uint32_t num_loaded_reads_for_loading = 0;
  uint32_t num_loaded_reads = LoadSingleEndReadsWithBarcodes(&read_batch_for_loading, &barcode_batch_for_loading);
  read_batch_for_loading.SwapSequenceBatch(read_batch);
  barcode_batch_for_loading.SwapSequenceBatch(barcode_batch);
  mappings_on_diff_ref_seqs_.reserve(num_reference_sequences);
  deduped_mappings_on_diff_ref_seqs_.reserve(num_reference_sequences);
  for (uint32_t i = 0; i < num_reference_sequences; ++i) {
    mappings_on_diff_ref_seqs_.emplace_back(std::vector<MappingRecord>());
    deduped_mappings_on_diff_ref_seqs_.emplace_back(std::vector<MappingRecord>());
  }
  std::vector<std::vector<std::vector<MappingRecord> > > mappings_on_diff_ref_seqs_for_diff_threads;
  std::vector<std::vector<std::vector<MappingRecord> > > mappings_on_diff_ref_seqs_for_diff_threads_for_saving;
  mappings_on_diff_ref_seqs_for_diff_threads.reserve(num_threads_);
  mappings_on_diff_ref_seqs_for_diff_threads_for_saving.reserve(num_threads_);
  for (int ti = 0; ti < num_threads_; ++ti) {
    mappings_on_diff_ref_seqs_for_diff_threads.emplace_back(std::vector<std::vector<MappingRecord> >(num_reference_sequences));
    mappings_on_diff_ref_seqs_for_diff_threads_for_saving.emplace_back(std::vector<std::vector<MappingRecord> >(num_reference_sequences));
    for (uint32_t i = 0; i < num_reference_sequences; ++i) {
      mappings_on_diff_ref_seqs_for_diff_threads[ti][i].reserve((num_loaded_reads + num_loaded_reads / 1000 * max_num_best_mappings_) / num_threads_ / num_reference_sequences);
      mappings_on_diff_ref_seqs_for_diff_threads_for_saving[ti][i].reserve((num_loaded_reads + num_loaded_reads / 1000 * max_num_best_mappings_) / num_threads_ / num_reference_sequences);
    }
  }
  if (output_mapping_in_BED_) {
    output_tools = std::unique_ptr<BEDOutputTools<MappingRecord> >(new BEDOutputTools<MappingRecord>);
@@ -633,13 +658,13 @@ void Chromap<MappingRecord>::MapSingleEndReads() {
    output_tools = std::unique_ptr<PAFOutputTools<MappingRecord> >(new PAFOutputTools<MappingRecord>);
  }
  output_tools->InitializeMappingOutput(mapping_output_file_path_);
  double real_start_mapping_time = Chromap<>::GetRealTime();
  // TODO(Haowen): start openmp parallel region and declare private variables
  //{
  uint64_t thread_num_candidates = 0;
  uint64_t thread_num_mappings = 0;
  uint64_t thread_num_mapped_reads = 0; 
  uint64_t thread_num_uniquely_mapped_reads = 0;
  static uint64_t thread_num_candidates = 0;
  static uint64_t thread_num_mappings = 0;
  static uint64_t thread_num_mapped_reads = 0; 
  static uint64_t thread_num_uniquely_mapped_reads = 0; 
#pragma omp threadprivate(thread_num_candidates, thread_num_mappings, thread_num_mapped_reads, thread_num_uniquely_mapped_reads)
#pragma omp parallel default(none) shared(reference, index, read_batch, barcode_batch, read_batch_for_loading, barcode_batch_for_loading, std::cerr, num_loaded_reads_for_loading, num_loaded_reads, num_reference_sequences, mappings_on_diff_ref_seqs_for_diff_threads, mappings_on_diff_ref_seqs_for_diff_threads_for_saving) num_threads(num_threads_) reduction(+:num_candidates_, num_mappings_, num_mapped_reads_, num_uniquely_mapped_reads_)
  {
  std::vector<std::pair<uint64_t, uint64_t> > minimizers;
  std::vector<uint64_t> positive_hits;
  std::vector<uint64_t> negative_hits;
@@ -653,29 +678,24 @@ void Chromap<MappingRecord>::MapSingleEndReads() {
  std::vector<std::pair<int, uint64_t> > negative_mappings;
  positive_mappings.reserve(max_seed_frequencies_[0]);
  negative_mappings.reserve(max_seed_frequencies_[0]);
  mappings_on_diff_ref_seqs_.reserve(num_reference_sequences);
  deduped_mappings_on_diff_ref_seqs_.reserve(num_reference_sequences);
  for (uint32_t i = 0; i < num_reference_sequences; ++i) {
    mappings_on_diff_ref_seqs_.emplace_back(std::vector<MappingRecord>());
    deduped_mappings_on_diff_ref_seqs_.emplace_back(std::vector<MappingRecord>());
  }
  //std::vector<std::vector<MappingRecord> > mappings_on_diff_ref_seqs(num_reference_sequences);
  //for (uint32_t i = 0; i < num_reference_sequences; ++i) {
  //  mappings_on_diff_ref_seqs[i].reserve((read_batch_size_ + read_batch_size_ / 100 * max_num_best_mappings_) / num_threads_ / num_reference_sequences);
  //}
  uint32_t num_reads_in_current_batch = read_batch.LoadBatch();
  while (num_reads_in_current_batch > 0) {
    num_reads_ += num_reads_in_current_batch;
    for (uint32_t read_index = 0; read_index < num_reads_in_current_batch; ++read_index) {
  #pragma omp single
  {
  while (num_loaded_reads > 0) {
    double real_batch_start_time = Chromap<>::GetRealTime();
    num_reads_ += num_loaded_reads;
#pragma omp task
    {
    num_loaded_reads_for_loading = LoadSingleEndReadsWithBarcodes(&read_batch_for_loading, &barcode_batch_for_loading);
    } // end of openmp loading task
    int grain_size = 10000;
#pragma omp taskloop grainsize(grain_size) //num_tasks(num_threads_* 50)
    for (uint32_t read_index = 0; read_index < num_loaded_reads; ++read_index) {
      read_batch.PrepareNegativeSequenceAt(read_index);
      //std::cerr << "Generated negative sequence!\n";
      minimizers.clear();
      minimizers.reserve(read_batch.GetSequenceLengthAt(read_index) / window_size_ * 2);
      index.GenerateMinimizerSketch(read_batch, read_index, &minimizers);
      if (minimizers.size() == 0) {
        std::cerr << "Read " << read_index << " has no minimizer!\n";
        continue;
      }
      if (minimizers.size() > 0) {
        //std::cerr << "Generated minimizers!\n";
        positive_hits.clear();
        negative_hits.clear();
@@ -694,7 +714,7 @@ void Chromap<MappingRecord>::MapSingleEndReads() {
          uint32_t current_num_mappings = positive_mappings.size() + negative_mappings.size();
          //std::cerr << "Verified candidates!\n";
          if (current_num_mappings > 0) {
          std::vector<std::vector<MappingRecord> > &mappings_on_diff_ref_seqs = mappings_on_diff_ref_seqs_;//mappings_on_diff_ref_seqs_for_diff_threads[omp_get_thread_num()];
            std::vector<std::vector<MappingRecord> > &mappings_on_diff_ref_seqs = mappings_on_diff_ref_seqs_for_diff_threads[omp_get_thread_num()];
            GenerateBestMappingsForSingleEndRead(min_num_errors, num_best_mappings, second_min_num_errors, num_second_best_mappings, read_batch, read_index, reference, barcode_batch, positive_mappings, negative_mappings, &mappings_on_diff_ref_seqs);
            thread_num_mappings += std::min(num_best_mappings, max_num_best_mappings_);
            //std::cerr << "Generated output!\n";
@@ -704,26 +724,30 @@ void Chromap<MappingRecord>::MapSingleEndReads() {
            }
          }
        }
    //  if (current_num_mappings == 0) {
    //    std::cerr << "Read " << read_index << " candidates: " << current_num_candidates << " , mappings: " << current_num_mappings << "\n";
    //  }
      }
    num_reads_in_current_batch = read_batch.LoadBatch();
    //for (uint32_t i = 0; i < num_reference_sequences; ++i) {
    //  mappings_on_diff_ref_seqs[i].reserve(mappings_on_diff_ref_seqs[i].size() + (read_batch_size_ + read_batch_size_ / 100 * max_num_best_mappings_) / num_threads_ / num_reference_sequences);
    //}
    }
  // TODO(Haowen): update shared mapping stats using critical region or atom operation
#pragma omp taskwait
    num_loaded_reads = num_loaded_reads_for_loading;
    read_batch_for_loading.SwapSequenceBatch(read_batch);
    barcode_batch_for_loading.SwapSequenceBatch(barcode_batch);
    mappings_on_diff_ref_seqs_for_diff_threads.swap(mappings_on_diff_ref_seqs_for_diff_threads_for_saving);
#pragma omp task
    {
    MoveMappingsInBuffersToMappingContainer(num_reference_sequences, &mappings_on_diff_ref_seqs_for_diff_threads_for_saving);
    }
    std::cerr << "Mapped in " << Chromap<>::GetRealTime() - real_batch_start_time << "s.\n";
  }
  } // end of openmp single
  {
    num_candidates_ += thread_num_candidates;
    num_mappings_ += thread_num_mappings;
    num_mapped_reads_ += thread_num_mapped_reads;
    num_uniquely_mapped_reads_ += thread_num_uniquely_mapped_reads;
  } // end of updating shared mapping stats
  //} // end of openmp parallel region
  read_batch.FinalizeLoading();
  } // end of openmp parallel region
  read_batch_for_loading.FinalizeLoading();
  if (!is_bulk_data_) {
    barcode_batch.FinalizeLoading();
    barcode_batch_for_loading.FinalizeLoading();
  }
  OutputMappingStatistics();
  std::cerr << "Mapped all reads in " << Chromap<>::GetRealTime() - real_start_mapping_time << "s.\n";
@@ -862,6 +886,39 @@ void Chromap<MappingRecord>::ApplyTn5ShiftOnSingleEndMapping(uint32_t num_refere
  std::cerr << "# shifted mappings: " << num_shifted_mappings << ".\n";
}

template <typename MappingRecord>
uint32_t Chromap<MappingRecord>::LoadSingleEndReadsWithBarcodes(SequenceBatch *read_batch, SequenceBatch *barcode_batch) {
  double real_start_time = Chromap<>::GetRealTime();
  uint32_t num_loaded_reads = 0;
  while (num_loaded_reads < read_batch_size_) {
    bool no_more_read = read_batch->LoadOneSequenceAndSaveAt(num_loaded_reads);
    bool no_more_barcode = no_more_read;
    if (!is_bulk_data_) {
      no_more_barcode = barcode_batch->LoadOneSequenceAndSaveAt(num_loaded_reads);
    }
    if ((!no_more_read) && (!no_more_barcode)) {
      if (read_batch->GetSequenceLengthAt(num_loaded_reads) < (uint32_t)min_read_length_) {
        continue; // reads are too short, just drop.
      }
      //if (PairedEndReadWithBarcodeIsDuplicate(num_loaded_pairs, (*barcode_batch), (*read_batch1), (*read_batch2))) {
      //  num_duplicated_reads_ += 2;
      //  continue;
      //}
    } else if (no_more_read && no_more_barcode) {
      break;
    } else {
      Chromap<>::ExitWithMessage("Numbers of reads and barcodes don't match!");
    }
    ++num_loaded_reads;
  }
  if (num_loaded_reads > 0) {
    std::cerr << "Loaded " << num_loaded_reads << " reads in "<< Chromap<>::GetRealTime() - real_start_time << "s.\n";
  } else {
    std::cerr << "No more reads.\n";
  }
  return num_loaded_reads;
}

template <typename MappingRecord>
void Chromap<MappingRecord>::ConstructIndex() {
  // TODO(Haowen): check args for index construction
@@ -1183,7 +1240,7 @@ void Chromap<MappingRecord>::OutputMappingStatistics(uint32_t num_reference_sequ
      }
    }
  } 
  std::cerr << "# uni-mappings: " << num_uni_mappings << ", # multi-mappings: " << num_multi_mappings << ".\n";
  std::cerr << "# uni-mappings: " << num_uni_mappings << ", # multi-mappings: " << num_multi_mappings << ", total: " << num_uni_mappings + num_multi_mappings << ".\n";
}

template <typename MappingRecord>
+1 −1
Original line number Diff line number Diff line
@@ -81,7 +81,7 @@ class Chromap {
  void EmplaceBackMappingRecord(uint32_t read_id, uint32_t barcode, uint32_t fragment_start_position, uint16_t fragment_length, uint8_t mapq, std::vector<MappingRecord> *mappings_on_diff_ref_seqs);
  void EmplaceBackMappingRecord(uint32_t read_id, const char* read_name, uint16_t read_length, uint32_t barcode, uint32_t fragment_start_position, uint16_t fragment_length, uint8_t mapq, std::vector<MappingRecord> *mappings_on_diff_ref_seqs);
  void ApplyTn5ShiftOnSingleEndMapping(uint32_t num_reference_sequences, std::vector<std::vector<MappingRecord> > *mappings);

  uint32_t LoadSingleEndReadsWithBarcodes(SequenceBatch *read_batch, SequenceBatch *barcode_batch);
  // Supportive functions
  void ConstructIndex();
  int BandedAlignPatternToText(const char *pattern, const char *text, const int read_length, int *mapping_end_location);