Commit c4746b68 authored by Haowen Zhang's avatar Haowen Zhang
Browse files

Clean up code to process mappings in low mem.

parent a75753ec
Loading
Loading
Loading
Loading
+161 −170
Original line number Diff line number Diff line
@@ -488,6 +488,47 @@ void Chromap<MappingRecord>::UpdateBarcodeAbundance(
            << " in " << GetRealTime() - real_start_time << "s.\n";
}

template <typename MappingRecord>
size_t Chromap<MappingRecord>::FindBestMappingIndexFromDuplicates(
    const std::vector<MappingRecord> &duplicates) {
  // Find the best barcode, break ties first by the number of the
  // barcodes in the dups, then by the barcode abundance.
  size_t best_mapping_index = 0;

  khiter_t barcode_whitelist_lookup_table_iterator =
      kh_get(k64_seq, barcode_whitelist_lookup_table_,
             duplicates[best_mapping_index].GetBarcode());

  double best_mapping_barcode_abundance =
      kh_value(barcode_whitelist_lookup_table_,
               barcode_whitelist_lookup_table_iterator) /
      (double)num_sample_barcodes_;

  for (size_t bulk_dup_i = 1; bulk_dup_i < duplicates.size(); ++bulk_dup_i) {
    barcode_whitelist_lookup_table_iterator =
        kh_get(k64_seq, barcode_whitelist_lookup_table_,
               duplicates[bulk_dup_i].GetBarcode());

    const double current_mapping_barcode_abundance =
        kh_value(barcode_whitelist_lookup_table_,
                 barcode_whitelist_lookup_table_iterator) /
        (double)num_sample_barcodes_;

    const bool same_num_dups_with_higer_barcode_abundance =
        duplicates[bulk_dup_i].num_dups_ ==
            duplicates[best_mapping_index].num_dups_ &&
        current_mapping_barcode_abundance > best_mapping_barcode_abundance;

    if (duplicates[bulk_dup_i].num_dups_ >
            duplicates[best_mapping_index].num_dups_ ||
        same_num_dups_with_higer_barcode_abundance) {
      best_mapping_index = bulk_dup_i;
      best_mapping_barcode_abundance = current_mapping_barcode_abundance;
    }
  }
  return best_mapping_index;
}

template <typename MappingRecord>
void Chromap<MappingRecord>::PostProcessingInLowMemory(
    uint32_t num_mappings_in_mem, uint32_t num_reference_sequences,
@@ -495,23 +536,8 @@ void Chromap<MappingRecord>::PostProcessingInLowMemory(
    const MappingProcessor<MappingRecord> &mapping_processor) {
  // First, process the mappings in the memory and save them on disk.
  if (num_mappings_in_mem > 0) {
    TempMappingFileHandle<MappingRecord> temp_mapping_file_handle;
    temp_mapping_file_handle.file_path =
        mapping_output_file_path_ + ".temp" +
        std::to_string(temp_mapping_file_handles_.size());
    temp_mapping_file_handles_.emplace_back(temp_mapping_file_handle);
    mapping_processor.SortOutputMappings(num_reference_sequences,
                                         mappings_on_diff_ref_seqs_);
    // double output_temp_mapping_start_time = Chromap<>::GetRealTime();
    mapping_writer_.OutputTempMapping(temp_mapping_file_handle.file_path,
                                      num_reference_sequences,
                                      mappings_on_diff_ref_seqs_);
    // std::cerr << "Output temp mappings in " << Chromap<>::GetRealTime() -
    // output_temp_mapping_start_time << "s.\n";
    OutputTempMappings(num_reference_sequences, mapping_processor);
    num_mappings_in_mem = 0;
    for (uint32_t i = 0; i < num_reference_sequences; ++i) {
      mappings_on_diff_ref_seqs_[i].clear();
    }
  }

  if (temp_mapping_file_handles_.size() == 0) {
@@ -528,192 +554,147 @@ void Chromap<MappingRecord>::PostProcessingInLowMemory(
    max_mem_size = (uint64_t)1 << 30;
  }
  for (size_t hi = 0; hi < temp_mapping_file_handles_.size(); ++hi) {
    temp_mapping_file_handles_[hi].block_size =
        max_mem_size / temp_mapping_file_handles_.size() /
    const uint32_t temp_mapping_block_size = max_mem_size /
                                             temp_mapping_file_handles_.size() /
                                             sizeof(MappingRecord);

    temp_mapping_file_handles_[hi].InitializeTempMappingLoading(
        num_reference_sequences);
        temp_mapping_block_size);
    temp_mapping_file_handles_[hi].LoadTempMappingBlock(
        num_reference_sequences);
  }

  // Merge and dedupe.
  bool all_merged = false;
  uint32_t last_rid = std::numeric_limits<uint32_t>::max();
  MappingRecord last_mapping;
  uint32_t dup_count = 0;
  MappingRecord last_mapping = MappingRecord();
  uint32_t num_last_mapping_dups = 0;
  uint64_t num_uni_mappings = 0;
  uint64_t num_multi_mappings = 0;
  uint64_t num_mappings_passing_filters = 0;
  uint64_t num_total_mappings = 0;
  std::vector<MappingRecord> temp_dups_for_bulk_level_dedup;
  temp_dups_for_bulk_level_dedup.reserve(255);

  while (!all_merged) {
  const bool deduplicate_at_bulk_level_for_single_cell_data =
      remove_pcr_duplicates_ && !is_bulk_data_ &&
      remove_pcr_duplicates_at_bulk_level_;

  while (true) {
    // Merge, dedupe and output.
    // Find min first (sorted by rid and then barcode and then positions).
    size_t min_handle_index = temp_mapping_file_handles_.size();
    uint32_t min_rid = std::numeric_limits<uint32_t>::max();

    for (size_t hi = 0; hi < temp_mapping_file_handles_.size(); ++hi) {
      TempMappingFileHandle<MappingRecord> &current_handle =
      const TempMappingFileHandle<MappingRecord> &current_handle =
          temp_mapping_file_handles_[hi];
      if (!current_handle.all_loaded) {
        if (min_handle_index == temp_mapping_file_handles_.size() ||
            current_handle.current_rid < min_rid ||
            (current_handle.current_rid == min_rid &&
             current_handle.mappings[current_handle.current_mapping_index] <
      if (current_handle.HasMappings()) {
        const bool rid_is_smaller = current_handle.current_rid < min_rid;
        const bool same_rid_smaller_mapping =
            current_handle.current_rid == min_rid &&
            current_handle.GetCurrentMapping() <
                temp_mapping_file_handles_[min_handle_index]
                     .mappings[temp_mapping_file_handles_[min_handle_index]
                                   .current_mapping_index])) {
                    .GetCurrentMapping();

        if (rid_is_smaller || same_rid_smaller_mapping) {
          min_handle_index = hi;
          min_rid = current_handle.current_rid;
        }
      }
    }

    // Append current min to mappings if not a duplicate.
    if (!remove_pcr_duplicates_ ||
        min_handle_index != temp_mapping_file_handles_.size()) {
      MappingRecord &current_min_mapping =
          temp_mapping_file_handles_[min_handle_index]
              .mappings[temp_mapping_file_handles_[min_handle_index]
                            .current_mapping_index];
      if (remove_pcr_duplicates_ && last_rid == min_rid &&
          (current_min_mapping == last_mapping ||
           (remove_pcr_duplicates_at_bulk_level_ &&
            current_min_mapping.IsSamePosition(last_mapping)))) {
        ++dup_count;
        if (!is_bulk_data_ && remove_pcr_duplicates_at_bulk_level_) {
    // All mappings are merged. We only have to handle the case when the last
    // mapping is a duplicate.
    if (min_handle_index == temp_mapping_file_handles_.size()) {
      break;
    }

    ++num_total_mappings;

    // Output the current min mapping if it is not a duplicate.
    const MappingRecord &current_min_mapping =
        temp_mapping_file_handles_[min_handle_index].GetCurrentMapping();

    const bool is_first_iteration = num_total_mappings == 1;
    const bool current_mapping_is_duplicated_at_cell_level =
        !is_first_iteration && current_min_mapping == last_mapping;
    const bool current_mapping_is_duplicated_at_bulk_level =
        !is_first_iteration && deduplicate_at_bulk_level_for_single_cell_data &&
        current_min_mapping.IsSamePosition(last_mapping);
    const bool current_mapping_is_duplicated =
        last_rid == min_rid && (current_mapping_is_duplicated_at_cell_level ||
                                current_mapping_is_duplicated_at_bulk_level);

    if (remove_pcr_duplicates_ && current_mapping_is_duplicated) {
      ++num_last_mapping_dups;
      if (deduplicate_at_bulk_level_for_single_cell_data) {
        if (!temp_dups_for_bulk_level_dedup.empty() &&
            current_min_mapping == temp_dups_for_bulk_level_dedup.back()) {
            current_min_mapping.num_dups_ =
                temp_dups_for_bulk_level_dedup.back().num_dups_ + 1;
          // Merge if their barcodes are the same. Be careful of "==" here!
          temp_dups_for_bulk_level_dedup.back() = current_min_mapping;
          temp_dups_for_bulk_level_dedup.back().num_dups_ += 1;
        } else {
          temp_dups_for_bulk_level_dedup.push_back(current_min_mapping);
          temp_dups_for_bulk_level_dedup.back().num_dups_ = 1;
        }
      }
    } else {
        if (dup_count > 0) {
          if (!is_bulk_data_ && remove_pcr_duplicates_at_bulk_level_ &&
              temp_dups_for_bulk_level_dedup.size() > 0) {
            // Find the best barcode, break ties first by the number of the
            // barcodes in the dups, then by the barcode abundance.
            last_mapping = temp_dups_for_bulk_level_dedup[0];
            khiter_t barcode_whitelist_lookup_table_iterator =
                kh_get(k64_seq, barcode_whitelist_lookup_table_,
                       last_mapping.GetBarcode());
            double last_mapping_barcode_abundance =
                kh_value(barcode_whitelist_lookup_table_,
                         barcode_whitelist_lookup_table_iterator) /
                (double)num_sample_barcodes_;
            for (uint32_t bulk_dup_i = 1;
                 bulk_dup_i < temp_dups_for_bulk_level_dedup.size();
                 ++bulk_dup_i) {
              barcode_whitelist_lookup_table_iterator = kh_get(
                  k64_seq, barcode_whitelist_lookup_table_,
                  temp_dups_for_bulk_level_dedup[bulk_dup_i].GetBarcode());
              double current_mapping_barcode_abundance =
                  kh_value(barcode_whitelist_lookup_table_,
                           barcode_whitelist_lookup_table_iterator) /
                  (double)num_sample_barcodes_;
              if (temp_dups_for_bulk_level_dedup[bulk_dup_i].num_dups_ >
                      last_mapping.num_dups_ ||
                  (temp_dups_for_bulk_level_dedup[bulk_dup_i].num_dups_ ==
                       last_mapping.num_dups_ &&
                   current_mapping_barcode_abundance >
                       last_mapping_barcode_abundance)) {
                last_mapping = temp_dups_for_bulk_level_dedup[bulk_dup_i];
                last_mapping_barcode_abundance =
                    current_mapping_barcode_abundance;
              }
            }
      if (!is_first_iteration) {
        if (deduplicate_at_bulk_level_for_single_cell_data) {
          size_t best_mapping_index = FindBestMappingIndexFromDuplicates(
              temp_dups_for_bulk_level_dedup);
          last_mapping = temp_dups_for_bulk_level_dedup[best_mapping_index];

          temp_dups_for_bulk_level_dedup.clear();
        }

        if (last_mapping.mapq_ >= mapq_threshold_) {
            // if (allocate_multi_mappings_ || (only_output_unique_mappings_ &&
            // last_mapping.is_unique == 1)) {
            last_mapping.num_dups_ = std::min(
                (uint32_t)std::numeric_limits<uint8_t>::max(), dup_count);
          last_mapping.num_dups_ =
              std::min((uint32_t)std::numeric_limits<uint8_t>::max(),
                       num_last_mapping_dups);
          if (Tn5_shift_) {
            last_mapping.Tn5Shift();
          }
          mapping_writer_.AppendMapping(last_rid, reference, last_mapping);
          ++num_mappings_passing_filters;
            //}
        }

        if (last_mapping.is_unique_ == 1) {
          ++num_uni_mappings;
        } else {
          ++num_multi_mappings;
        }
      }

      last_mapping = current_min_mapping;
      last_rid = min_rid;
        dup_count = 1;
        if (remove_pcr_duplicates_at_bulk_level_) {
      num_last_mapping_dups = 1;

      if (deduplicate_at_bulk_level_for_single_cell_data) {
        temp_dups_for_bulk_level_dedup.push_back(current_min_mapping);
        temp_dups_for_bulk_level_dedup.back().num_dups_ = 1;
      }
    }
      temp_mapping_file_handles_[min_handle_index].Next(
          num_reference_sequences);
    }

    // Check if all are merged.
    all_merged = true;
    for (size_t hi = 0; hi < temp_mapping_file_handles_.size(); ++hi) {
      if (!temp_mapping_file_handles_[hi].all_loaded) {
        all_merged = false;
      }
    }
    temp_mapping_file_handles_[min_handle_index].Next(num_reference_sequences);
  }

  if (last_mapping.mapq_ >= mapq_threshold_) {
    if (!is_bulk_data_ && remove_pcr_duplicates_at_bulk_level_ &&
        temp_dups_for_bulk_level_dedup.size() > 0) {
      // Find the best barcode, break ties first by the number of the barcodes
      // in the dups, then by the barcode abundance.
      last_mapping = temp_dups_for_bulk_level_dedup[0];
      khiter_t barcode_whitelist_lookup_table_iterator = kh_get(
          k64_seq, barcode_whitelist_lookup_table_, last_mapping.GetBarcode());
      double last_mapping_barcode_abundance =
          kh_value(barcode_whitelist_lookup_table_,
                   barcode_whitelist_lookup_table_iterator) /
          (double)num_sample_barcodes_;
      for (uint32_t bulk_dup_i = 1;
           bulk_dup_i < temp_dups_for_bulk_level_dedup.size(); ++bulk_dup_i) {
        barcode_whitelist_lookup_table_iterator =
            kh_get(k64_seq, barcode_whitelist_lookup_table_,
                   temp_dups_for_bulk_level_dedup[bulk_dup_i].GetBarcode());
        double current_mapping_barcode_abundance =
            kh_value(barcode_whitelist_lookup_table_,
                     barcode_whitelist_lookup_table_iterator) /
            (double)num_sample_barcodes_;
        if (temp_dups_for_bulk_level_dedup[bulk_dup_i].num_dups_ >
                last_mapping.num_dups_ ||
            (temp_dups_for_bulk_level_dedup[bulk_dup_i].num_dups_ ==
                 last_mapping.num_dups_ &&
             current_mapping_barcode_abundance >
                 last_mapping_barcode_abundance)) {
          last_mapping = temp_dups_for_bulk_level_dedup[bulk_dup_i];
          last_mapping_barcode_abundance = current_mapping_barcode_abundance;
        }
      }
    if (deduplicate_at_bulk_level_for_single_cell_data) {
      size_t best_mapping_index =
          FindBestMappingIndexFromDuplicates(temp_dups_for_bulk_level_dedup);
      last_mapping = temp_dups_for_bulk_level_dedup[best_mapping_index];

      temp_dups_for_bulk_level_dedup.clear();
    }
    // if (allocate_multi_mappings_ || (only_output_unique_mappings_ &&
    // last_mapping.is_unique == 1)) {
    last_mapping.num_dups_ =
        std::min((uint32_t)std::numeric_limits<uint8_t>::max(), dup_count);

    last_mapping.num_dups_ = std::min(
        (uint32_t)std::numeric_limits<uint8_t>::max(), num_last_mapping_dups);
    if (Tn5_shift_) {
      last_mapping.Tn5Shift();
    }
    mapping_writer_.AppendMapping(last_rid, reference, last_mapping);
    ++num_mappings_passing_filters;
    //}
  }

  if (last_mapping.is_unique_ == 1) {
@@ -1258,20 +1239,8 @@ void Chromap<MappingRecord>::MapPairedEndReads() {
                mappings_on_diff_ref_seqs_for_diff_threads_for_saving);
            if (low_memory_mode_ &&
                num_mappings_in_mem > max_num_mappings_in_mem) {
              TempMappingFileHandle<MappingRecord> temp_mapping_file_handle;
              temp_mapping_file_handle.file_path =
                  mapping_output_file_path_ + ".temp" +
                  std::to_string(temp_mapping_file_handles_.size());
              temp_mapping_file_handles_.emplace_back(temp_mapping_file_handle);
              mapping_processor.SortOutputMappings(num_reference_sequences,
                                                   mappings_on_diff_ref_seqs_);
              mapping_writer_.OutputTempMapping(
                  temp_mapping_file_handle.file_path, num_reference_sequences,
                  mappings_on_diff_ref_seqs_);
              OutputTempMappings(num_reference_sequences, mapping_processor);
              num_mappings_in_mem = 0;
              for (uint32_t i = 0; i < num_reference_sequences; ++i) {
                mappings_on_diff_ref_seqs_[i].clear();
              }
            }
          }  // end of omp task to handle output
        }    // end of while num_loaded_pairs
@@ -1373,6 +1342,28 @@ void Chromap<MappingRecord>::MapPairedEndReads() {
  std::cerr << "Total time: " << GetRealTime() - real_start_time << "s.\n";
}

template <typename MappingRecord>
void Chromap<MappingRecord>::OutputTempMappings(
    uint32_t num_reference_sequences,
    const MappingProcessor<MappingRecord> &mapping_processor) {
  TempMappingFileHandle<MappingRecord> temp_mapping_file_handle;
  temp_mapping_file_handle.file_path =
      mapping_output_file_path_ + ".temp" +
      std::to_string(temp_mapping_file_handles_.size());
  temp_mapping_file_handles_.emplace_back(temp_mapping_file_handle);

  mapping_processor.SortOutputMappings(num_reference_sequences,
                                       mappings_on_diff_ref_seqs_);

  mapping_writer_.OutputTempMapping(temp_mapping_file_handle.file_path,
                                    num_reference_sequences,
                                    mappings_on_diff_ref_seqs_);

  for (uint32_t i = 0; i < num_reference_sequences; ++i) {
    mappings_on_diff_ref_seqs_[i].clear();
  }
}

template <typename MappingRecord>
void Chromap<MappingRecord>::OutputMappingsInVector(
    uint8_t mapq_threshold, uint32_t num_reference_sequences,
+10 −1
Original line number Diff line number Diff line
@@ -20,7 +20,7 @@
#include "sequence_batch.h"
#include "temp_mapping.h"

#define CHROMAP_VERSION "0.1.6-r309"
#define CHROMAP_VERSION "0.1.6-r310"

namespace chromap {

@@ -312,6 +312,10 @@ class Chromap {
                            int *mapping_end_position);
  int GetLongestMatchLength(const char *pattern, const char *text,
                            const int read_length);

  size_t FindBestMappingIndexFromDuplicates(
      const std::vector<MappingRecord> &duplicates);

  void PostProcessingInLowMemory(
      uint32_t num_mappings_in_mem, uint32_t num_reference_sequences,
      const SequenceBatch &reference,
@@ -379,6 +383,11 @@ class Chromap {
                        uint64_t &num_corrected_barcode);

  void BuildAugmentedTreeForPeaks(uint32_t ref_id);

  void OutputTempMappings(
      uint32_t num_reference_sequences,
      const MappingProcessor<MappingRecord> &mapping_processor);

  void OutputMappingsInVector(
      uint8_t mapq_threshold, uint32_t num_reference_sequences,
      const SequenceBatch &reference,
+13 −10
Original line number Diff line number Diff line
@@ -22,7 +22,6 @@ template <typename MappingRecord>
struct TempMappingFileHandle {
  std::string file_path;
  FILE* file;
  bool all_loaded;
  uint32_t num_mappings;
  uint32_t block_size;
  uint32_t current_rid;
@@ -32,14 +31,22 @@ struct TempMappingFileHandle {
  // This vector only keep mappings on the same ref seq.
  std::vector<MappingRecord> mappings;

  inline void InitializeTempMappingLoading(uint32_t num_reference_sequences) {
  inline const MappingRecord& GetCurrentMapping() const {
    return mappings[current_mapping_index];
  }

  inline bool HasMappings() const { return num_mappings != 0; }

  inline void InitializeTempMappingLoading(uint32_t temp_mapping_block_size) {
    file = fopen(file_path.c_str(), "rb");
    assert(file != NULL);
    all_loaded = false;
    num_mappings = 0;
    block_size = temp_mapping_block_size;
    current_rid = 0;
    current_mapping_index = 0;
    fread(&num_mappings_on_current_rid, sizeof(size_t), 1, file);
    mappings.resize(block_size);
    num_loaded_mappings_on_current_rid = 0;
    mappings.resize(block_size);
    // std::cerr << "Block size: " << block_size << ", initialize temp file " <<
    // file_path << "\n";
  }
@@ -78,11 +85,11 @@ struct TempMappingFileHandle {
          // std::cerr << "Load size " << num_mappings_on_current_rid << "\n";
          num_loaded_mappings_on_current_rid = 0;
        } else {
          all_loaded = true;
          break;
        }
      }
    }

    current_mapping_index = 0;
  }

@@ -129,7 +136,6 @@ inline void TempMappingFileHandle<PAFMapping>::LoadTempMappingBlock(
        // std::cerr << "Load size " << num_mappings_on_current_rid << "\n";
        num_loaded_mappings_on_current_rid = 0;
      } else {
        all_loaded = true;
        break;
      }
    }
@@ -172,7 +178,6 @@ inline void TempMappingFileHandle<PairedPAFMapping>::LoadTempMappingBlock(
        // std::cerr << "Load size " << num_mappings_on_current_rid << "\n";
        num_loaded_mappings_on_current_rid = 0;
      } else {
        all_loaded = true;
        break;
      }
    }
@@ -215,7 +220,6 @@ inline void TempMappingFileHandle<SAMMapping>::LoadTempMappingBlock(
        // std::cerr << "Load size " << num_mappings_on_current_rid << "\n";
        num_loaded_mappings_on_current_rid = 0;
      } else {
        all_loaded = true;
        break;
      }
    }
@@ -258,7 +262,6 @@ inline void TempMappingFileHandle<PairsMapping>::LoadTempMappingBlock(
        // std::cerr << "Load size " << num_mappings_on_current_rid << "\n";
        num_loaded_mappings_on_current_rid = 0;
      } else {
        all_loaded = true;
        break;
      }
    }