Commit 63f985cf authored by Haowen Zhang's avatar Haowen Zhang
Browse files

Move mapping output functions to MappingWriter.

parent 53ecee88
Loading
Loading
Loading
Loading
+36 −310
Original line number Diff line number Diff line
@@ -514,235 +514,6 @@ void Chromap<MappingRecord>::UpdateBarcodeAbundance(
            << " in " << GetRealTime() - real_start_time << "s.\n";
}

template <typename MappingRecord>
size_t Chromap<MappingRecord>::FindBestMappingIndexFromDuplicates(
    const std::vector<MappingRecord> &duplicates) {
  // Find the best barcode, break ties first by the number of the
  // barcodes in the dups, then by the barcode abundance.
  size_t best_mapping_index = 0;

  khiter_t barcode_whitelist_lookup_table_iterator =
      kh_get(k64_seq, barcode_whitelist_lookup_table_,
             duplicates[best_mapping_index].GetBarcode());

  double best_mapping_barcode_abundance =
      kh_value(barcode_whitelist_lookup_table_,
               barcode_whitelist_lookup_table_iterator) /
      (double)num_sample_barcodes_;

  for (size_t bulk_dup_i = 1; bulk_dup_i < duplicates.size(); ++bulk_dup_i) {
    barcode_whitelist_lookup_table_iterator =
        kh_get(k64_seq, barcode_whitelist_lookup_table_,
               duplicates[bulk_dup_i].GetBarcode());

    const double current_mapping_barcode_abundance =
        kh_value(barcode_whitelist_lookup_table_,
                 barcode_whitelist_lookup_table_iterator) /
        (double)num_sample_barcodes_;

    const bool same_num_dups_with_higer_barcode_abundance =
        duplicates[bulk_dup_i].num_dups_ ==
            duplicates[best_mapping_index].num_dups_ &&
        current_mapping_barcode_abundance > best_mapping_barcode_abundance;

    if (duplicates[bulk_dup_i].num_dups_ >
            duplicates[best_mapping_index].num_dups_ ||
        same_num_dups_with_higer_barcode_abundance) {
      best_mapping_index = bulk_dup_i;
      best_mapping_barcode_abundance = current_mapping_barcode_abundance;
    }
  }
  return best_mapping_index;
}

template <typename MappingRecord>
void Chromap<MappingRecord>::ProcessAndOutputMappingsInLowMemory(
    uint32_t num_mappings_in_mem, uint32_t num_reference_sequences,
    const SequenceBatch &reference,
    const MappingProcessor<MappingRecord> &mapping_processor,
    std::vector<TempMappingFileHandle<MappingRecord>>
        &temp_mapping_file_handles,
    MappingWriter<MappingRecord> &mapping_writer) {
  if (temp_mapping_file_handles.size() == 0) {
    return;
  }

  double sort_and_dedupe_start_time = GetRealTime();

  // Calculate block size and initialize
  uint64_t max_mem_size = 10 * ((uint64_t)1 << 30);
  if (mapping_output_format_ == MAPPINGFORMAT_SAM ||
      mapping_output_format_ == MAPPINGFORMAT_PAIRS ||
      mapping_output_format_ == MAPPINGFORMAT_PAF) {
    max_mem_size = (uint64_t)1 << 30;
  }
  for (size_t hi = 0; hi < temp_mapping_file_handles.size(); ++hi) {
    const uint32_t temp_mapping_block_size =
        max_mem_size / temp_mapping_file_handles.size() / sizeof(MappingRecord);

    temp_mapping_file_handles[hi].InitializeTempMappingLoading(
        temp_mapping_block_size);
    temp_mapping_file_handles[hi].LoadTempMappingBlock(num_reference_sequences);
  }

  // Merge and dedupe.
  uint32_t last_rid = std::numeric_limits<uint32_t>::max();
  MappingRecord last_mapping = MappingRecord();
  uint32_t num_last_mapping_dups = 0;
  uint64_t num_uni_mappings = 0;
  uint64_t num_multi_mappings = 0;
  uint64_t num_mappings_passing_filters = 0;
  uint64_t num_total_mappings = 0;
  std::vector<MappingRecord> temp_dups_for_bulk_level_dedup;
  temp_dups_for_bulk_level_dedup.reserve(255);

  const bool deduplicate_at_bulk_level_for_single_cell_data =
      remove_pcr_duplicates_ && !is_bulk_data_ &&
      remove_pcr_duplicates_at_bulk_level_;

  while (true) {
    // Merge, dedupe and output.
    // Find min first (sorted by rid and then barcode and then positions).
    size_t min_handle_index = temp_mapping_file_handles.size();
    uint32_t min_rid = std::numeric_limits<uint32_t>::max();

    for (size_t hi = 0; hi < temp_mapping_file_handles.size(); ++hi) {
      const TempMappingFileHandle<MappingRecord> &current_handle =
          temp_mapping_file_handles[hi];
      if (current_handle.HasMappings()) {
        const bool rid_is_smaller = current_handle.current_rid < min_rid;
        const bool same_rid_smaller_mapping =
            current_handle.current_rid == min_rid &&
            current_handle.GetCurrentMapping() <
                temp_mapping_file_handles[min_handle_index].GetCurrentMapping();

        if (rid_is_smaller || same_rid_smaller_mapping) {
          min_handle_index = hi;
          min_rid = current_handle.current_rid;
        }
      }
    }

    // All mappings are merged. We only have to handle the case when the last
    // mapping is a duplicate.
    if (min_handle_index == temp_mapping_file_handles.size()) {
      break;
    }

    ++num_total_mappings;

    // Output the current min mapping if it is not a duplicate.
    const MappingRecord &current_min_mapping =
        temp_mapping_file_handles[min_handle_index].GetCurrentMapping();

    const bool is_first_iteration = num_total_mappings == 1;
    const bool current_mapping_is_duplicated_at_cell_level =
        !is_first_iteration && current_min_mapping == last_mapping;
    const bool current_mapping_is_duplicated_at_bulk_level =
        !is_first_iteration && deduplicate_at_bulk_level_for_single_cell_data &&
        current_min_mapping.IsSamePosition(last_mapping);
    const bool current_mapping_is_duplicated =
        last_rid == min_rid && (current_mapping_is_duplicated_at_cell_level ||
                                current_mapping_is_duplicated_at_bulk_level);

    if (remove_pcr_duplicates_ && current_mapping_is_duplicated) {
      ++num_last_mapping_dups;
      if (deduplicate_at_bulk_level_for_single_cell_data) {
        if (!temp_dups_for_bulk_level_dedup.empty() &&
            current_min_mapping == temp_dups_for_bulk_level_dedup.back()) {
          // Merge if their barcodes are the same. Be careful of "==" here!
          temp_dups_for_bulk_level_dedup.back() = current_min_mapping;
          temp_dups_for_bulk_level_dedup.back().num_dups_ += 1;
        } else {
          temp_dups_for_bulk_level_dedup.push_back(current_min_mapping);
          temp_dups_for_bulk_level_dedup.back().num_dups_ = 1;
        }
      }
    } else {
      if (!is_first_iteration) {
        if (deduplicate_at_bulk_level_for_single_cell_data) {
          size_t best_mapping_index = FindBestMappingIndexFromDuplicates(
              temp_dups_for_bulk_level_dedup);
          last_mapping = temp_dups_for_bulk_level_dedup[best_mapping_index];

          temp_dups_for_bulk_level_dedup.clear();
        }

        if (last_mapping.mapq_ >= mapq_threshold_) {
          last_mapping.num_dups_ =
              std::min((uint32_t)std::numeric_limits<uint8_t>::max(),
                       num_last_mapping_dups);
          if (Tn5_shift_) {
            last_mapping.Tn5Shift();
          }
          mapping_writer.AppendMapping(last_rid, reference, last_mapping);
          ++num_mappings_passing_filters;
        }

        if (last_mapping.is_unique_ == 1) {
          ++num_uni_mappings;
        } else {
          ++num_multi_mappings;
        }
      }

      last_mapping = current_min_mapping;
      last_rid = min_rid;
      num_last_mapping_dups = 1;

      if (deduplicate_at_bulk_level_for_single_cell_data) {
        temp_dups_for_bulk_level_dedup.push_back(current_min_mapping);
        temp_dups_for_bulk_level_dedup.back().num_dups_ = 1;
      }
    }

    temp_mapping_file_handles[min_handle_index].Next(num_reference_sequences);
  }

  if (last_mapping.mapq_ >= mapq_threshold_) {
    if (deduplicate_at_bulk_level_for_single_cell_data) {
      size_t best_mapping_index =
          FindBestMappingIndexFromDuplicates(temp_dups_for_bulk_level_dedup);
      last_mapping = temp_dups_for_bulk_level_dedup[best_mapping_index];

      temp_dups_for_bulk_level_dedup.clear();
    }

    last_mapping.num_dups_ = std::min(
        (uint32_t)std::numeric_limits<uint8_t>::max(), num_last_mapping_dups);
    if (Tn5_shift_) {
      last_mapping.Tn5Shift();
    }
    mapping_writer.AppendMapping(last_rid, reference, last_mapping);
    ++num_mappings_passing_filters;
  }

  if (last_mapping.is_unique_ == 1) {
    ++num_uni_mappings;
  } else {
    ++num_multi_mappings;
  }

  // Delete temp files.
  for (size_t hi = 0; hi < temp_mapping_file_handles.size(); ++hi) {
    temp_mapping_file_handles[hi].FinalizeTempMappingLoading();
    remove(temp_mapping_file_handles[hi].file_path.c_str());
  }

  if (remove_pcr_duplicates_) {
    std::cerr << "Sorted, deduped and outputed mappings in "
              << GetRealTime() - sort_and_dedupe_start_time << "s.\n";
  } else {
    std::cerr << "Sorted and outputed mappings in "
              << GetRealTime() - sort_and_dedupe_start_time << "s.\n";
  }
  std::cerr << "# uni-mappings: " << num_uni_mappings
            << ", # multi-mappings: " << num_multi_mappings
            << ", total: " << num_uni_mappings + num_multi_mappings << ".\n";
  std::cerr << "Number of output mappings (passed filters): "
            << num_mappings_passing_filters << "\n";
}

template <typename MappingRecord>
void Chromap<MappingRecord>::GenerateCustomizedRidRank(
    const std::string &rid_order_path, const SequenceBatch &reference,
@@ -895,6 +666,8 @@ void Chromap<MappingRecord>::MapPairedEndReads() {

  MappingWriter<MappingRecord> mapping_writer(
      mapping_output_file_path_, mapping_output_format_, barcode_length_,
      mapq_threshold_, Tn5_shift_, remove_pcr_duplicates_,
      remove_pcr_duplicates_at_bulk_level_, is_bulk_data_,
      barcode_translate_table_path_.empty() ? nullptr
                                            : &barcode_translate_table_path_,
      mapping_output_format_ == MAPPINGFORMAT_PAIRS ? &pairs_custom_rid_rank_
@@ -1264,9 +1037,12 @@ void Chromap<MappingRecord>::MapPairedEndReads() {
                mappings_on_diff_ref_seqs);
            if (low_memory_mode_ &&
                num_mappings_in_mem > max_num_mappings_in_mem) {
              OutputTempMappings(num_reference_sequences,
                                 mappings_on_diff_ref_seqs, mapping_processor,
                                 temp_mapping_file_handles, mapping_writer);
              mapping_processor.SortOutputMappings(num_reference_sequences,
                                                   mappings_on_diff_ref_seqs);

              mapping_writer.OutputTempMappings(num_reference_sequences,
                                                mappings_on_diff_ref_seqs,
                                                temp_mapping_file_handles);
              num_mappings_in_mem = 0;
            }
          }  // end of omp task to handle output
@@ -1306,15 +1082,18 @@ void Chromap<MappingRecord>::MapPairedEndReads() {
    // First, process the remaining mappings in the memory and save them on
    // disk.
    if (num_mappings_in_mem > 0) {
      OutputTempMappings(num_reference_sequences, mappings_on_diff_ref_seqs,
                         mapping_processor, temp_mapping_file_handles,
                         mapping_writer);
      mapping_processor.SortOutputMappings(num_reference_sequences,
                                           mappings_on_diff_ref_seqs);

      mapping_writer.OutputTempMappings(num_reference_sequences,
                                        mappings_on_diff_ref_seqs,
                                        temp_mapping_file_handles);
      num_mappings_in_mem = 0;
    }

    ProcessAndOutputMappingsInLowMemory(
    mapping_writer.ProcessAndOutputMappingsInLowMemory(
        num_mappings_in_mem, num_reference_sequences, reference,
        mapping_processor, temp_mapping_file_handles, mapping_writer);
        barcode_whitelist_lookup_table_, temp_mapping_file_handles);
  } else {
    // OutputMappingStatistics(num_reference_sequences,
    // mappings_on_diff_ref_seqs, mappings_on_diff_ref_seqs);
@@ -1348,8 +1127,8 @@ void Chromap<MappingRecord>::MapPairedEndReads() {
      mapping_processor.SortOutputMappings(num_reference_sequences,
                                           mappings_on_diff_ref_seqs);
    }
    OutputMappings(num_reference_sequences, reference,
                   mappings_on_diff_ref_seqs, mapping_writer);
    mapping_writer.OutputMappings(num_reference_sequences, reference,
                                  mappings_on_diff_ref_seqs);

    // Temporarily disable feature matrix output. Do not delete the following
    // commented code.
@@ -1377,67 +1156,6 @@ void Chromap<MappingRecord>::MapPairedEndReads() {
  std::cerr << "Total time: " << GetRealTime() - real_start_time << "s.\n";
}

template <typename MappingRecord>
void Chromap<MappingRecord>::OutputTempMappings(
    uint32_t num_reference_sequences,
    std::vector<std::vector<MappingRecord>> &mappings_on_diff_ref_seqs,
    const MappingProcessor<MappingRecord> &mapping_processor,
    std::vector<TempMappingFileHandle<MappingRecord>>
        &temp_mapping_file_handles,
    MappingWriter<MappingRecord> &mapping_writer) {
  TempMappingFileHandle<MappingRecord> temp_mapping_file_handle;
  temp_mapping_file_handle.file_path =
      mapping_output_file_path_ + ".temp" +
      std::to_string(temp_mapping_file_handles.size());
  temp_mapping_file_handles.emplace_back(temp_mapping_file_handle);

  mapping_processor.SortOutputMappings(num_reference_sequences,
                                       mappings_on_diff_ref_seqs);

  mapping_writer.OutputTempMapping(temp_mapping_file_handle.file_path,
                                   num_reference_sequences,
                                   mappings_on_diff_ref_seqs);

  for (uint32_t i = 0; i < num_reference_sequences; ++i) {
    mappings_on_diff_ref_seqs[i].clear();
  }
}

template <typename MappingRecord>
void Chromap<MappingRecord>::OutputMappingsInVector(
    uint8_t mapq_threshold, uint32_t num_reference_sequences,
    const SequenceBatch &reference,
    const std::vector<std::vector<MappingRecord>> &mappings,
    MappingWriter<MappingRecord> &mapping_writer) {
  uint64_t num_mappings_passing_filters = 0;
  for (uint32_t ri = 0; ri < num_reference_sequences; ++ri) {
    for (auto it = mappings[ri].begin(); it != mappings[ri].end(); ++it) {
      uint8_t mapq = (it->mapq_);
      // uint8_t is_unique = (it->is_unique);
      if (mapq >= mapq_threshold) {
        // if (allocate_multi_mappings_ || (only_output_unique_mappings_ &&
        // is_unique == 1)) {
        mapping_writer.AppendMapping(ri, reference, *it);
        ++num_mappings_passing_filters;
        //}
      }
    }
  }
  std::cerr << "Number of output mappings (passed filters): "
            << num_mappings_passing_filters << "\n";
}

template <typename MappingRecord>
void Chromap<MappingRecord>::OutputMappings(
    uint32_t num_reference_sequences, const SequenceBatch &reference,
    const std::vector<std::vector<MappingRecord>> &mappings,
    MappingWriter<MappingRecord> &mapping_writer) {
  // if (only_output_unique_mappings_ && mapq_threshold_ < 4)
  //  mapq_threshold_ = 4;
  OutputMappingsInVector(mapq_threshold_, num_reference_sequences, reference,
                         mappings, mapping_writer);
}

template <typename MappingRecord>
void Chromap<MappingRecord>::MapSingleEndReads() {
  double real_start_time = GetRealTime();
@@ -1501,6 +1219,8 @@ void Chromap<MappingRecord>::MapSingleEndReads() {

  MappingWriter<MappingRecord> mapping_writer(
      mapping_output_file_path_, mapping_output_format_, barcode_length_,
      mapq_threshold_, Tn5_shift_, remove_pcr_duplicates_,
      remove_pcr_duplicates_at_bulk_level_, is_bulk_data_,
      barcode_translate_table_path_.empty() ? nullptr
                                            : &barcode_translate_table_path_,
      /*custom_rid_rank=*/nullptr);
@@ -1710,9 +1430,12 @@ void Chromap<MappingRecord>::MapSingleEndReads() {
                mappings_on_diff_ref_seqs);
            if (low_memory_mode_ &&
                num_mappings_in_mem > max_num_mappings_in_mem) {
              OutputTempMappings(num_reference_sequences,
                                 mappings_on_diff_ref_seqs, mapping_processor,
                                 temp_mapping_file_handles, mapping_writer);
              mapping_processor.SortOutputMappings(num_reference_sequences,
                                                   mappings_on_diff_ref_seqs);

              mapping_writer.OutputTempMappings(num_reference_sequences,
                                                mappings_on_diff_ref_seqs,
                                                temp_mapping_file_handles);
              num_mappings_in_mem = 0;
            }
          }
@@ -1751,15 +1474,18 @@ void Chromap<MappingRecord>::MapSingleEndReads() {
    // First, process the remaining mappings in the memory and save them on
    // disk.
    if (num_mappings_in_mem > 0) {
      OutputTempMappings(num_reference_sequences, mappings_on_diff_ref_seqs,
                         mapping_processor, temp_mapping_file_handles,
                         mapping_writer);
      mapping_processor.SortOutputMappings(num_reference_sequences,
                                           mappings_on_diff_ref_seqs);

      mapping_writer.OutputTempMappings(num_reference_sequences,
                                        mappings_on_diff_ref_seqs,
                                        temp_mapping_file_handles);
      num_mappings_in_mem = 0;
    }

    ProcessAndOutputMappingsInLowMemory(
    mapping_writer.ProcessAndOutputMappingsInLowMemory(
        num_mappings_in_mem, num_reference_sequences, reference,
        mapping_processor, temp_mapping_file_handles, mapping_writer);
        barcode_whitelist_lookup_table_, temp_mapping_file_handles);
  } else {
    if (Tn5_shift_) {
      mapping_processor.ApplyTn5ShiftOnMappings(num_reference_sequences,
@@ -1791,8 +1517,8 @@ void Chromap<MappingRecord>::MapSingleEndReads() {
      mapping_processor.SortOutputMappings(num_reference_sequences,
                                           mappings_on_diff_ref_seqs);
    }
    OutputMappings(num_reference_sequences, reference,
                   mappings_on_diff_ref_seqs, mapping_writer);
    mapping_writer.OutputMappings(num_reference_sequences, reference,
                                  mappings_on_diff_ref_seqs);
  }

  reference.FinalizeLoading();
+0 −31
Original line number Diff line number Diff line
@@ -173,17 +173,6 @@ class Chromap {
                        uint64_t &num_barcode_in_whitelist,
                        uint64_t &num_corrected_barcode);

  size_t FindBestMappingIndexFromDuplicates(
      const std::vector<MappingRecord> &duplicates);

  void ProcessAndOutputMappingsInLowMemory(
      uint32_t num_mappings_in_mem, uint32_t num_reference_sequences,
      const SequenceBatch &reference,
      const MappingProcessor<MappingRecord> &mapping_processor,
      std::vector<TempMappingFileHandle<MappingRecord> >
          &temp_mapping_file_handles,
      MappingWriter<MappingRecord> &mapping_writer);

  uint32_t MoveMappingsInBuffersToMappingContainer(
      uint32_t num_reference_sequences,
      std::vector<std::vector<std::vector<MappingRecord> > >
@@ -199,25 +188,6 @@ class Chromap {
      const std::vector<std::vector<MappingRecord> > &uni_mappings,
      const std::vector<std::vector<MappingRecord> > &multi_mappings);

  void OutputTempMappings(
      uint32_t num_reference_sequences,
      std::vector<std::vector<MappingRecord> > &mappings_on_diff_ref_seqs,
      const MappingProcessor<MappingRecord> &mapping_processor,
      std::vector<TempMappingFileHandle<MappingRecord> >
          &temp_mapping_file_handles,
      MappingWriter<MappingRecord> &mapping_writer);

  void OutputMappingsInVector(
      uint8_t mapq_threshold, uint32_t num_reference_sequences,
      const SequenceBatch &reference,
      const std::vector<std::vector<MappingRecord> > &mappings,
      MappingWriter<MappingRecord> &mapping_writer);

  void OutputMappings(uint32_t num_reference_sequences,
                      const SequenceBatch &reference,
                      const std::vector<std::vector<MappingRecord> > &mappings,
                      MappingWriter<MappingRecord> &mapping_writer);

  void GenerateCustomizedRidRank(const std::string &rid_order_path,
                                 const SequenceBatch &reference,
                                 std::vector<int> &rid_rank);
@@ -289,7 +259,6 @@ class Chromap {
  std::vector<int> custom_rid_rank_;
  std::vector<int> pairs_custom_rid_rank_;
  std::string barcode_translate_table_path_;
  // khash_t(k32_set)* barcode_whitelist_lookup_table_;
  khash_t(k64_seq) * barcode_whitelist_lookup_table_;
  // For identical read dedupe
  int allocated_barcode_lookup_table_size_ = (1 << 10);
+339 −17

File changed.

Preview size limit exceeded, changes collapsed.