Commit 083a20c4 authored by oma219's avatar oma219 Committed by Li Song
Browse files

added debug code and options/added cache size, cache param, all reads...

Added debug code and debug options; added cache-size, cache-update-param, and use-all-reads options; parallelized the cache update.
parent 9b016207
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -18,6 +18,8 @@ struct Candidate {

  inline uint32_t GetReferenceSequencePosition() const { return position; }

  // Returns the candidate's raw support count.
  // NOTE(review): unlike GetReferenceSequencePosition() above, this accessor is
  // not const-qualified — consider adding const for consistency; verify no
  // caller relies on a non-const overload.
  inline uint8_t GetCount() { return count; }

  inline bool operator<(const Candidate &c) const {
    if (count > c.count) {
      return true;
+130 −45
Original line number Diff line number Diff line
@@ -328,7 +328,10 @@ void Chromap::MapSingleEndReads() {
          }  // end of openmp loading task
          uint32_t history_update_threshold =
          mm_to_candidates_cache.GetUpdateThreshold(num_loaded_reads,
                                                        num_reads_, false);
                                                    num_reads_, 
                                                    false,
                                                    false,
                                                    0.01);
          // int grain_size = 10000;
//#pragma omp taskloop grainsize(grain_size) //num_tasks(num_threads_* 50)
#pragma omp taskloop num_tasks( \
@@ -584,6 +587,15 @@ void Chromap::MapPairedEndReads() {
  reference.InitializeLoading(mapping_parameters_.reference_file_path);
  reference.LoadAllSequences();
  uint32_t num_reference_sequences = reference.GetNumSequences();
  
  // Debugging Info (printing out reference information)
  if (mapping_parameters_.debug_cache) {
    for (size_t i = 0; i < num_reference_sequences; i++){
      std::cout << "[DEBUG][INDEX] seq_i = " << i 
                << " , seq_i_name = " << reference.GetSequenceNameAt(i) << std::endl;
    }
  }
  
  if (mapping_parameters_.custom_rid_order_file_path.length() > 0) {
    GenerateCustomRidRanks(mapping_parameters_.custom_rid_order_file_path,
                           num_reference_sequences, reference,
@@ -614,26 +626,33 @@ void Chromap::MapPairedEndReads() {
  SequenceBatch barcode_batch_for_loading(read_batch_size_,
                                          barcode_effective_range_);

  // Check cache-related parameters
  std::cerr << "Cache Size: " << mapping_parameters_.cache_size << std::endl;
  std::cerr << "Use All Reads for Cache: " << mapping_parameters_.use_all_reads << std::endl;
  std::cerr << "Cache Update Param: " << mapping_parameters_.cache_update_param << std::endl;
  
  std::vector<uint64_t> seeds_for_batch(500000, 0);

  // Initialize cache
  mm_cache mm_to_candidates_cache(2000003);
  mm_cache mm_to_candidates_cache(mapping_parameters_.cache_size);
  mm_to_candidates_cache.SetKmerLength(kmer_size);

  struct _mm_history *mm_history1 = new struct _mm_history[read_batch_size_];
  struct _mm_history *mm_history2 = new struct _mm_history[read_batch_size_];
  
  // The explanation for read_map_summary is in the single-end mapping function
  uint8_t *read_map_summary = NULL ;
  if (!mapping_parameters_.summary_metadata_file_path.empty()) {
    read_map_summary = new uint8_t[read_batch_size_];
    memset(read_map_summary, 1, sizeof(*read_map_summary)*read_batch_size_);
  }


  std::vector<std::vector<MappingRecord>> mappings_on_diff_ref_seqs;
  
  // Initialize mapping container
  mappings_on_diff_ref_seqs.reserve(num_reference_sequences);
  for (uint32_t i = 0; i < num_reference_sequences; ++i) {
    mappings_on_diff_ref_seqs.emplace_back(std::vector<MappingRecord>());
  }

  std::vector<TempMappingFileHandle<MappingRecord>> temp_mapping_file_handles;

  // Preprocess barcodes for single cell data
@@ -763,10 +782,21 @@ void Chromap::MapPairedEndReads() {
          int grain_size = 5000;
          uint32_t history_update_threshold =
          mm_to_candidates_cache.GetUpdateThreshold(num_loaded_pairs,
                                                        num_reads_, true);
#pragma omp taskloop grainsize(grain_size)
                                                    num_reads_, 
                                                    true,
                                                    mapping_parameters_.use_all_reads,
                                                    mapping_parameters_.cache_update_param
                                                    );
          int cache_hits_for_batch = 0;

          if (mapping_parameters_.debug_cache) {
            std::cout << "[DEBUG][UPDATE] update_threshold = " << history_update_threshold << std::endl;
          }

#pragma omp taskloop grainsize(grain_size) reduction(+:cache_hits_for_batch)
          for (uint32_t pair_index = 0; pair_index < num_loaded_pairs;
               ++pair_index) {
            
            bool current_barcode_is_whitelisted = true;
            if (!mapping_parameters_.barcode_whitelist_file_path.empty()) {
              current_barcode_is_whitelisted = CorrectBarcodeAt(
@@ -774,6 +804,10 @@ void Chromap::MapPairedEndReads() {
                  thread_num_corrected_barcode);
            }

            // calculate seed value for each barcode to use later (below and summary update)
            size_t curr_seed_val = barcode_batch.GenerateSeedFromSequenceAt(pair_index, 0, barcode_length_);
            seeds_for_batch[pair_index] = curr_seed_val;

            if (current_barcode_is_whitelisted ||
                mapping_parameters_.output_mappings_not_in_whitelist) {
              read_batch1.PrepareNegativeSequenceAt(pair_index);
@@ -795,30 +829,57 @@ void Chromap::MapPairedEndReads() {
                  paired_end_mapping_metadata.mapping_metadata2_.minimizers_);

              if (paired_end_mapping_metadata.BothEndsHaveMinimizers()) {
                // Generate candidates
                if (mm_to_candidates_cache.Query(
                        paired_end_mapping_metadata.mapping_metadata1_,
                        read_batch1.GetSequenceLengthAt(pair_index)) == -1) {
                // declare temp local variable for cache result
                int cache_query_result1 = 0;
                int cache_query_result2 = 0;
                int cache_miss = 0;

                cache_query_result1 = mm_to_candidates_cache.Query(paired_end_mapping_metadata.mapping_metadata1_,
                                                                  read_batch1.GetSequenceLengthAt(pair_index));
                if (cache_query_result1 == -1) 
                {
                  candidate_processor.GenerateCandidates(
                      mapping_parameters_.error_threshold, index,
                      paired_end_mapping_metadata.mapping_metadata1_);
                      mapping_parameters_.error_threshold, 
                      index,
                      paired_end_mapping_metadata.mapping_metadata1_
                      );
                  ++cache_miss;
                }
                size_t current_num_candidates1 = paired_end_mapping_metadata.mapping_metadata1_.GetNumCandidates();

                size_t current_num_candidates1 =
                    paired_end_mapping_metadata.mapping_metadata1_
                        .GetNumCandidates();

                if (mm_to_candidates_cache.Query(
                        paired_end_mapping_metadata.mapping_metadata2_,
                        read_batch2.GetSequenceLengthAt(pair_index)) == -1) {
                cache_query_result2 = mm_to_candidates_cache.Query(paired_end_mapping_metadata.mapping_metadata2_,
                                                                  read_batch2.GetSequenceLengthAt(pair_index));
                if (cache_query_result2 == -1) 
                {
                  candidate_processor.GenerateCandidates(
                      mapping_parameters_.error_threshold, index,
                      paired_end_mapping_metadata.mapping_metadata2_);
                      mapping_parameters_.error_threshold, 
                      index,
                      paired_end_mapping_metadata.mapping_metadata2_
                      );
                  ++cache_miss;
                }
                size_t current_num_candidates2 = paired_end_mapping_metadata.mapping_metadata2_.GetNumCandidates();

                size_t current_num_candidates2 =
                    paired_end_mapping_metadata.mapping_metadata2_
                        .GetNumCandidates();
                // increment variable for cache_hits
                //bool curr_read_hit_cache = false;
                if (cache_query_result1 >= 0 || cache_query_result2 >= 0) {
                  cache_hits_for_batch++;
                  //curr_read_hit_cache = true;
                }

                // update the peak counting data-structure
                // if (output_peak_info && curr_read_hit_cache) {
                //   // calculate which map this barcode is in
                //   size_t map_id = curr_seed_val % num_locks_for_map;
                
                //   // grab lock for this map, and add to the K-MinHash for this particular barcode
                //   omp_set_lock(&map_locks[map_id]);
                //   auto it = barcode_peak_map[map_id].emplace(curr_seed_val, K_MinHash(k_for_minhash, mapping_parameters_.cache_size)).first;
                //   if (cache_query_result1 >= 0) {it->second.add(cache_query_result1);}
                //   if (cache_query_result2 >= 0) {it->second.add(cache_query_result2);}
                //   omp_unset_lock(&map_locks[map_id]);
                // }

                if (pair_index < history_update_threshold) {
                  mm_history1[pair_index].timestamp =
@@ -963,6 +1024,9 @@ void Chromap::MapPairedEndReads() {
                    if (paired_end_mapping_metadata.GetNumBestMappings() > 0) {
                      ++thread_num_mapped_reads;
                      ++thread_num_mapped_reads;

                      if (read_map_summary != NULL)
                        read_map_summary[pair_index] |= (cache_miss < 2 ? 2 : 0) ;
                    }
                  }
                }  // verify candidate
@@ -980,7 +1044,7 @@ void Chromap::MapPairedEndReads() {
          //    }
          //  }
          //}
#pragma omp taskwait
#pragma omp taskloop grainsize(grain_size)
          // Update cache
          for (uint32_t pair_index = 0; pair_index < history_update_threshold;
               ++pair_index) {
@@ -990,12 +1054,14 @@ void Chromap::MapPairedEndReads() {
                mm_history1[pair_index].minimizers,
                mm_history1[pair_index].positive_candidates,
                mm_history1[pair_index].negative_candidates,
                mm_history1[pair_index].repetitive_seed_length);
                mm_history1[pair_index].repetitive_seed_length,
                mapping_parameters_.debug_cache);
            mm_to_candidates_cache.Update(
                mm_history2[pair_index].minimizers,
                mm_history2[pair_index].positive_candidates,
                mm_history2[pair_index].negative_candidates,
                mm_history2[pair_index].repetitive_seed_length);
                mm_history2[pair_index].repetitive_seed_length,
                mapping_parameters_.debug_cache);

            if (mm_history1[pair_index].positive_candidates.size() > 50) {
              std::vector<Candidate>().swap(
@@ -1015,25 +1081,39 @@ void Chromap::MapPairedEndReads() {
            }
          }

#pragma omp taskwait
          if (!mapping_parameters_.summary_metadata_file_path.empty()) {
            // Update total read count 
            // Update total read count and number of cache hits
            if (mapping_parameters_.is_bulk_data) 
              mapping_writer.UpdateSummaryMetadata(0, SUMMARY_METADATA_TOTAL, 
            {
              mapping_writer.UpdateSummaryMetadata(0, 
                                                   SUMMARY_METADATA_TOTAL, 
                                                   num_loaded_pairs);
            else {
              uint32_t nonwhitelist_count = 0 ;
              mapping_writer.UpdateSummaryMetadata(0,
                                                   SUMMARY_METADATA_CACHEHIT, 
                                                   cache_hits_for_batch);
            }
            else 
            {
              for (uint32_t pair_index = 0; pair_index < num_loaded_pairs; ++pair_index) 
                if (read_map_summary[pair_index] & 1) {
              {
                uint64_t pair_seed = seeds_for_batch[pair_index];
                if (read_map_summary[pair_index] & 1) 
                {
                  mapping_writer.UpdateSummaryMetadata(
                      barcode_batch.GenerateSeedFromSequenceAt(pair_index, 0, barcode_length_), 
                      SUMMARY_METADATA_TOTAL, 1);
                } else {
                  ++nonwhitelist_count ;
                                            pair_seed, 
                                            SUMMARY_METADATA_TOTAL, 
                                            1);
                }
                if (read_map_summary[pair_index] & 2) 
                {
                  mapping_writer.UpdateSummaryMetadata( 
                                            pair_seed,
                                            SUMMARY_METADATA_CACHEHIT, 
                                            1);
                }
              }
              mapping_writer.UpdateSpeicalCategorySummaryMetadata(/*nonwhitelist*/0, 
                  SUMMARY_METADATA_TOTAL, nonwhitelist_count);
            }            
            
            memset(read_map_summary, 1, sizeof(*read_map_summary)*read_batch_size_);
          }

@@ -1048,6 +1128,10 @@ void Chromap::MapPairedEndReads() {
          barcode_batch_for_loading.SwapSequenceBatch(barcode_batch);
          mappings_on_diff_ref_seqs_for_diff_threads.swap(
              mappings_on_diff_ref_seqs_for_diff_threads_for_saving);

          // Reset for next batch
          std::fill(seeds_for_batch.begin(), seeds_for_batch.end(), 0);

#pragma omp task
          {
            // Handle output
@@ -1122,7 +1206,8 @@ void Chromap::MapPairedEndReads() {
    mapping_writer.ProcessAndOutputMappingsInLowMemory(
        num_mappings_in_mem, num_reference_sequences, reference,
        barcode_whitelist_lookup_table_, temp_mapping_file_handles);
  } else {
  } 
  else {
    if (mapping_parameters_.Tn5_shift) {
      mapping_processor.ApplyTn5ShiftOnMappings(num_reference_sequences,
                                                mappings_on_diff_ref_seqs);
+29 −1
Original line number Diff line number Diff line
@@ -76,7 +76,11 @@ void AddMappingOptions(cxxopts::Options &options) {
                 "Min probability to correct a barcode [0.9]",
                 cxxopts::value<double>(),
                 "FLT")("t,num-threads", "# threads for mapping [1]",
                        cxxopts::value<int>(), "INT");
                        cxxopts::value<int>(), "INT")
      ("cache-size", "number of cache entries [4000003]", cxxopts::value<int>(), "INT")
      ("cache-update-param", "value used to control number of reads sampled [0.01]", cxxopts::value<double>(), "FLT")
      ("use-all-reads", "use all reads for cache")
      ("debug-cache", "verbose output for debugging cache used in chromap");
}

void AddInputOptions(cxxopts::Options &options) {
@@ -324,6 +328,30 @@ void ChromapDriver::ParseArgsAndRun(int argc, char *argv[]) {
  if (result.count("t")) {
    mapping_parameters.num_threads = result["num-threads"].as<int>();
  }


  // check cache-related parameters
  if (result.count("cache-update-param")) {
    mapping_parameters.cache_update_param = result["cache-update-param"].as<double>();
    if (mapping_parameters.cache_update_param < 0.0 || mapping_parameters.cache_update_param > 1.0){
      chromap::ExitWithMessage("cache update param is not approriate, must be in this range (0, 1]");
    }
  } 
  if (result.count("cache-size")) {
    mapping_parameters.cache_size = result["cache-size"].as<int>();
    if (mapping_parameters.cache_size < 2000000 || mapping_parameters.cache_size > 15000000) {
        chromap::ExitWithMessage("cache size is not in appropriate range\n");
    }
  }
  if (result.count("use-all-reads")) {
    mapping_parameters.use_all_reads = true;
  }
  if (result.count("debug-cache")) {
    mapping_parameters.debug_cache = true;
  }



  if (result.count("min-read-length")) {
    mapping_parameters.min_read_length = result["min-read-length"].as<int>();
  }
+6 −0
Original line number Diff line number Diff line
@@ -23,6 +23,12 @@ struct MappingParameters {
  std::vector<int> gap_extension_penalties = {1, 1};
  int min_num_seeds_required_for_mapping = 2;
  std::vector<int> max_seed_frequencies = {500, 1000};

  double cache_update_param = 0.01;
  int cache_size = 4000003;
  bool use_all_reads = false;
  bool debug_cache = false;

  // Read with # best mappings greater than it will have this number of best
  // mappings reported.
  int max_num_best_mappings = 1;
+82 −30
Original line number Diff line number Diff line
@@ -3,6 +3,7 @@

#include "index.h"
#include "minimizer.h"
#include <mutex>

#define FINGER_PRINT_SIZE 103

@@ -27,6 +28,9 @@ class mm_cache {
 private:
  int cache_size;
  struct _mm_cache_entry *cache;
  int num_locks_for_cache = 1000;
  omp_lock_t entry_locks_omp[1000];
  std::mutex print_lock;
  int kmer_length;
  int update_limit;
  int saturate_count;
@@ -92,11 +96,21 @@ class mm_cache {
    memset(head_mm, 0, sizeof(uint64_t) * HEAD_MM_ARRAY_SIZE);
    update_limit = 10;
    saturate_count = 100;

    // initialize the array of OpenMP locks
    for (int i = 0; i < num_locks_for_cache; ++i) {
        omp_init_lock(&entry_locks_omp[i]);
    }
  }

  // Releases the cache storage and the OpenMP entry locks that the
  // constructor initialized.
  ~mm_cache() {
    delete[] cache;
    delete[] head_mm;
  
    // destroy OpenMP locks used for parallelizing cache update
    for (int i = 0; i < num_locks_for_cache; ++i) {
      omp_destroy_lock(&entry_locks_omp[i]);
    }
  }

  void SetKmerLength(int kl) { kmer_length = kl; }
@@ -173,16 +187,14 @@ class mm_cache {
  void Update(const std::vector<Minimizer> &minimizers,
              std::vector<Candidate> &pos_candidates,
              std::vector<Candidate> &neg_candidates,
              uint32_t repetitive_seed_length) {
              uint32_t repetitive_seed_length,
              bool debug=false) {
    int i;
    int msize = minimizers.size();

    uint64_t h = 0;  // for hash
    uint64_t f = 0;  // for finger printing
    /*for (i = 0; i < msize; ++i) {
      h += (minimizers[i].first);
      f ^= (minimizers[i].first);
    }*/

    if (msize == 0)
      return;
    else if (msize == 1) {
@@ -194,61 +206,60 @@ class mm_cache {
    int hidx = h % cache_size;
    int finger_print = f % FINGER_PRINT_SIZE;

    // beginning of locking phase - make sure to release it wherever we exit
    int lock_index = hidx % num_locks_for_cache;
    omp_set_lock(&entry_locks_omp[lock_index]);  

    ++cache[hidx].finger_print_cnt[finger_print];
    ++cache[hidx].finger_print_cnt_sum;
    if (cache[hidx].finger_print_cnt_sum > saturate_count) return;

    /*int rate = 2;
if (cache[hidx].finger_print_cnt_sum <= 2)
            rate = 0;
    else if (cache[hidx].finger_print_cnt_sum < 5)
            rate = 2;
    else if (cache[hidx].finger_print_cnt_sum < 10)
            rate = 4;
    else
            rate = 5;*/

    /*int rate = 2;
if (cache[hidx].finger_print_cnt_sum <= 5)
            rate = 0;
    else if (cache[hidx].finger_print_cnt_sum < 10)
            rate = 3;
    else
            rate = 5;*/
    // case 1: already saturated
    if (cache[hidx].finger_print_cnt_sum > saturate_count){ 
      omp_unset_lock(&entry_locks_omp[lock_index]);
      return;
    }

    // case 2: no heavy hitter or not enough yet
    if (cache[hidx].finger_print_cnt_sum < 10 ||
        (int)cache[hidx].finger_print_cnt[finger_print] * 5 <
            cache[hidx].finger_print_cnt_sum) {
      omp_unset_lock(&entry_locks_omp[lock_index]);
      return;
    }
    /*if ((int)cache[hidx].finger_print_cnt[finger_print] * rate <
       cache[hidx].finger_print_cnt_sum) { return ;
                }*/

    int direction = IsMinimizersMatchCache(minimizers, cache[hidx]);
    if (direction != 0)
      ++cache[hidx].weight;
    else
      --cache[hidx].weight;
    cache[hidx].activated = 1;

    // Renew the cache
    if (cache[hidx].weight < 0) {
      cache[hidx].weight = 1;
      cache[hidx].minimizers.resize(msize);

      if (msize == 0) {
        cache[hidx].offsets.clear();
        cache[hidx].strands.clear();
        omp_unset_lock(&entry_locks_omp[lock_index]);
        return;
      }

      int size = pos_candidates.size();
      int shift = (int)minimizers[0].GetSequencePosition();

      // Do not cache if it is too near the start.
      for (i = 0; i < size; ++i)
        if ((int)pos_candidates[i].position < kmer_length + shift) {
          cache[hidx].offsets.clear();
          cache[hidx].strands.clear();
          cache[hidx].minimizers.clear();

          omp_unset_lock(&entry_locks_omp[lock_index]);
          return;
        }

      size = neg_candidates.size();
      for (i = 0; i < size; ++i)
        if ((int)neg_candidates[i].position -
@@ -257,6 +268,8 @@ if (cache[hidx].finger_print_cnt_sum <= 5)
          cache[hidx].offsets.clear();
          cache[hidx].strands.clear();
          cache[hidx].minimizers.clear();

          omp_unset_lock(&entry_locks_omp[lock_index]);
          return;
        }
      cache[hidx].offsets.resize(msize - 1);
@@ -284,12 +297,45 @@ if (cache[hidx].finger_print_cnt_sum <= 5)
      for (i = 0; i < size; ++i)
        cache[hidx].negative_candidates[i].position -= shift;

      // Debugging output (candidate stored in cache)
      if (debug) {
        print_lock.lock();
        std::cout << "[DEBUG][CACHE][1] hidx = " << hidx << std::endl;
        std::cout << "[DEBUG][CACHE][2]" << " pos.size() = " 
                                << cache[hidx].positive_candidates.size() 
                                << " , " << "neg.size() = " 
                                << cache[hidx].negative_candidates.size()  
                                << " , msize = " << msize << std::endl;
        std::cout << "[DEBUG][CACHE][3] ";
        for (const auto &minimizer : minimizers) {
          std::cout << minimizer.GetHash() << " ";
        } std::cout << std::endl;

        for (size_t j = 0; j < cache[hidx].positive_candidates.size(); ++j) {
          std::cout << "[DEBUG][CACHE][+] " 
                    << "hidx = " << hidx
                    << " , cand_ref_seq = " << cache[hidx].positive_candidates[j].GetReferenceSequenceIndex() 
                    << " , cand_ref_pos = " << cache[hidx].positive_candidates[j].GetReferenceSequencePosition()
                    << " , support = " << unsigned(cache[hidx].positive_candidates[j].GetCount()) << std::endl;
        }

        for (size_t j = 0; j < cache[hidx].negative_candidates.size(); ++j) {
          std::cout << "[DEBUG][CACHE][-] " 
                    << "hidx = " << hidx
                    << " , cand_ref_seq = " << cache[hidx].negative_candidates[j].GetReferenceSequenceIndex() 
                    << " , cand_ref_pos = " << cache[hidx].negative_candidates[j].GetReferenceSequencePosition() 
                    << " , support = " << unsigned(cache[hidx].negative_candidates[j].GetCount()) << std::endl;
        }
        print_lock.unlock();
      }

      // Update head mm array
      head_mm[(minimizers[0].GetHash() >> 6) & HEAD_MM_ARRAY_MASK] |=
          (1ull << (minimizers[0].GetHash() & 0x3f));
      head_mm[(minimizers[msize - 1].GetHash() >> 6) & HEAD_MM_ARRAY_MASK] |=
          (1ull << (minimizers[msize - 1].GetHash() & 0x3f));
    }
    omp_unset_lock(&entry_locks_omp[lock_index]);
  }

  void DirectUpdateWeight(int idx, int weight) { cache[idx].weight += weight; }
@@ -309,13 +355,19 @@ if (cache[hidx].finger_print_cnt_sum <= 5)

  // How many reads from a batch we want to use to update the cache.
  // Paired-end data has twice the amount of reads, so the threshold is lower.
  uint32_t GetUpdateThreshold(uint32_t num_loaded_reads, uint64_t num_reads,
                              bool paired) {
  // Decides how many reads from the current batch feed the cache update.
  // Returns the whole batch for small datasets (or when use_all_reads is set),
  // and a fraction that shrinks with the total dataset size otherwise;
  // cache_update_param damps how quickly that fraction shrinks.
  uint32_t GetUpdateThreshold(uint32_t num_loaded_reads, 
                              uint64_t num_reads,
                              bool paired,
                              bool use_all_reads,
                              double cache_update_param
                              ) {
    // Explicit override: sample every read in the batch.
    if (use_all_reads) {
      return num_loaded_reads;
    }

    // A paired-end "read" is really two reads, so use a smaller block
    // to keep the effective sampling rate comparable.
    const uint32_t block = paired ? 2500000 : 5000000;

    // Small dataset: no need to subsample at all.
    if (num_reads <= block) {
      return num_loaded_reads;
    }

    // Large dataset: divide the batch by a factor that grows with the
    // number of blocks, scaled down by cache_update_param.
    const double shrink_factor =
        1 + (cache_update_param * (num_reads / block));
    return num_loaded_reads / shrink_factor;
  }

  void PrintStats() {
Loading