Commit 1b853903 authored by mourisl's avatar mourisl Committed by Li Song
Browse files

Add parallelized implementation for sortoutmappings to improve the time efficiency a little bit

parent eebb293b
Loading
Loading
Loading
Loading
+15 −11
Original line number Diff line number Diff line
@@ -34,7 +34,7 @@
#include "temp_mapping.h"
#include "utils.h"

#define CHROMAP_VERSION "0.3.1-r513"
#define CHROMAP_VERSION "0.3.1-r514"

namespace chromap {

@@ -528,8 +528,8 @@ void Chromap::MapSingleEndReads() {
                    mappings_on_diff_ref_seqs);
            if (mapping_parameters_.low_memory_mode &&
                num_mappings_in_mem > max_num_mappings_in_mem) {
              mapping_processor.SortOutputMappings(num_reference_sequences,
                                                   mappings_on_diff_ref_seqs);
              mapping_processor.ParallelSortOutputMappings(num_reference_sequences,
                                                   mappings_on_diff_ref_seqs, 0);

              mapping_writer.OutputTempMappings(num_reference_sequences,
                                                mappings_on_diff_ref_seqs,
@@ -600,13 +600,15 @@ void Chromap::MapSingleEndReads() {

    if (mapping_parameters_.remove_pcr_duplicates) {
      mapping_processor.RemovePCRDuplicate(num_reference_sequences,
                                           mappings_on_diff_ref_seqs);
                                           mappings_on_diff_ref_seqs,
                                           mapping_parameters_.num_threads);
      std::cerr << "After removing PCR duplications, ";
      mapping_processor.OutputMappingStatistics(num_reference_sequences,
                                                mappings_on_diff_ref_seqs);
    } else {
      mapping_processor.SortOutputMappings(num_reference_sequences,
                                           mappings_on_diff_ref_seqs);
      mapping_processor.ParallelSortOutputMappings(num_reference_sequences,
                                           mappings_on_diff_ref_seqs,
                                           mapping_parameters_.num_threads);
    }

    if (mapping_parameters_.allocate_multi_mappings) {
@@ -1247,8 +1249,8 @@ void Chromap::MapPairedEndReads() {
                    mappings_on_diff_ref_seqs);
            if (mapping_parameters_.low_memory_mode &&
                num_mappings_in_mem > max_num_mappings_in_mem) {
              mapping_processor.SortOutputMappings(num_reference_sequences,
                                                   mappings_on_diff_ref_seqs);
              mapping_processor.ParallelSortOutputMappings(num_reference_sequences,
                                                   mappings_on_diff_ref_seqs, 0);

              mapping_writer.OutputTempMappings(num_reference_sequences,
                                                mappings_on_diff_ref_seqs,
@@ -1320,13 +1322,15 @@ void Chromap::MapPairedEndReads() {

    if (mapping_parameters_.remove_pcr_duplicates) {
      mapping_processor.RemovePCRDuplicate(num_reference_sequences,
                                           mappings_on_diff_ref_seqs);
                                           mappings_on_diff_ref_seqs,
                                           mapping_parameters_.num_threads);
      std::cerr << "After removing PCR duplications, ";
      mapping_processor.OutputMappingStatistics(num_reference_sequences,
                                                mappings_on_diff_ref_seqs);
    } else {
      mapping_processor.SortOutputMappings(num_reference_sequences,
                                           mappings_on_diff_ref_seqs);
      mapping_processor.ParallelSortOutputMappings(num_reference_sequences,
                                           mappings_on_diff_ref_seqs,
                                           mapping_parameters_.num_threads);
    }

    if (mapping_parameters_.allocate_multi_mappings) {
+58 −4
Original line number Diff line number Diff line
@@ -48,9 +48,15 @@ class MappingProcessor {
      uint32_t num_reference_sequences,
      std::vector<std::vector<MappingRecord>> &mappings) const;
  
  void ParallelSortOutputMappings(
      uint32_t num_reference_sequences,
      std::vector<std::vector<MappingRecord>> &mappings,
      int num_threads) const;

  void RemovePCRDuplicate(
      uint32_t num_reference_sequences,
      std::vector<std::vector<MappingRecord>> &mappings) const;
      std::vector<std::vector<MappingRecord>> &mappings,
      int num_threads) const;

  void AllocateMultiMappings(
      uint32_t num_reference_sequences, uint64_t num_multi_mappings,
@@ -105,12 +111,60 @@ void MappingProcessor<MappingRecord>::SortOutputMappings(
  // Chromap<>::GetRealTime() - real_dedupe_start_time << "s.\n";
}

// If num_thread <= 0, then the number of thread is set externally
// Seems in this case omp task is much more efficient than omp parallel
template <typename MappingRecord>
void MappingProcessor<MappingRecord>::ParallelSortOutputMappings(
    uint32_t num_reference_sequences,
    std::vector<std::vector<MappingRecord>> &mappings,
    int num_threads) const {
  // double real_dedupe_start_time = Chromap<>::GetRealTime();
  if (num_threads <= 0)
  {
#pragma omp task shared(mappings)
    {
      for (uint32_t ri = 0; ri < num_reference_sequences; ri += 2) {
        std::sort(mappings[ri].begin(), mappings[ri].end());
      }
    }
    
#pragma omp task shared(mappings)
    {
      for (uint32_t ri = 1; ri < num_reference_sequences; ri += 2) {
        std::sort(mappings[ri].begin(), mappings[ri].end());
      }
    }
#pragma omp taskwait
  }
  else
  {
#pragma omp parallel shared(mappings) num_threads(num_threads)
    {
#pragma omp single
      {
#pragma omp taskloop
        for (uint32_t ri = 0; ri < num_reference_sequences; ++ri) {
          std::sort(mappings[ri].begin(), mappings[ri].end());
        }
      }
    }
  }
  
  //uint32_t num_mappings = 0;
  //for (uint32_t ri = 0; ri < num_reference_sequences; ++ri) {
  //  num_mappings += mappings[ri].size();
  //}
  // std::cerr << "Sorted " << num_mappings << " elements in " <<
  // Chromap<>::GetRealTime() - real_dedupe_start_time << "s.\n";
}

template <typename MappingRecord>
void MappingProcessor<MappingRecord>::RemovePCRDuplicate(
    uint32_t num_reference_sequences,
    std::vector<std::vector<MappingRecord>> &mappings) const {
    std::vector<std::vector<MappingRecord>> &mappings,
    int num_threads) const {
  double real_dedupe_start_time = GetRealTime();
  SortOutputMappings(num_reference_sequences, mappings);
  ParallelSortOutputMappings(num_reference_sequences, mappings, num_threads);
  std::cerr << "Sorted in " << GetRealTime() - real_dedupe_start_time << "s.\n";

  std::vector<std::vector<MappingRecord>> deduped_mappings;
@@ -143,7 +197,7 @@ void MappingProcessor<MappingRecord>::RemovePCRDuplicate(
      deduped_mappings[ri].swap(mappings[ri]);
    }
  }
  std::cerr << num_mappings << " mappings left after dedupe in "
  std::cerr << num_mappings << " mappings left after deduplication in "
            << GetRealTime() - real_dedupe_start_time << "s.\n";
}