Commit 133e6207 authored by PidgeyL's avatar PidgeyL
Browse files

object use in api.py

parent a20b1448
Loading
Loading
Loading
Loading
+11 −4
Original line number Diff line number Diff line
@@ -199,6 +199,13 @@ class Database(metaclass=Singleton):
    if not cpe: return []
    return self.sanitize(self.colCVE.find({"vulnerable_configuration": {"$regex": cpe}}).sort("Modified", -1))

  def cve_query(self, limit=-1, skip=0, sort=None, query={}):
    if isinstance(query, list): query = {"$and": query}
    if isinstance(sort, (list, tuple)) and len(sort) == 2:
      if sort[1].lower() == "asc": sort = (sort[0], 1)
      else:                        sort = (sort[0], -1) # Default Descending
    cves = self.colCVE.find(query).sort(sort[0], sort[1]).limit(limit).skip(skip)
    return [CVE.fromDict(x) for x in self.sanitize(cves)]

  def getCVEs(limit=False, query=[], skip=0, cves=None, collection=None):
    col=colCVE if not collection else db[collection]
@@ -210,7 +217,7 @@ class Database(metaclass=Singleton):
      cve=col.find(query[0]).sort("Modified", -1).limit(limit).skip(skip)
    else:
      cve=col.find({"$and": query}).sort("Modified", -1).limit(limit).skip(skip)
    return sanitize(cve)
    return sanitize(cve) or []


  ########
@@ -232,7 +239,7 @@ class Database(metaclass=Singleton):
    return CPE.fromDict(cpe) if cpe else None

  def cpe_getAll(self):
    return [CPE.fromDict(x) for x in self.sanitize(self.colCPE.find())]
    return [CPE.fromDict(x) for x in self.sanitize(self.colCPE.find())] or []


  ########
@@ -248,7 +255,7 @@ class Database(metaclass=Singleton):


  def cwe_getAll(self):
    return [CWE.fromDict(x) for x in self.sanitize(self.colCWE.find())]
    return [CWE.fromDict(x) for x in self.sanitize(self.colCWE.find())] or []

  #########
  # CAPEC #
@@ -264,7 +271,7 @@ class Database(metaclass=Singleton):

  def capec_getAll(self):
    return [CAPEC.fromDict(x) for x in
            self.sanitize(self.colCAPEC.find())]
            self.sanitize(self.colCAPEC.find())] or []

  ########
  # VIA4 #
+0 −3
Original line number Diff line number Diff line
@@ -119,9 +119,6 @@ def getAlternativeCPE(id):
def getAlternativeCPEs():
  return sanitize(colCPEOTHER.find())

def getVIA4(id):
  return sanitize(colVIA4.find_one({'id': id}))

def getCPEMatching(regex, fullSearch=False):
  lst=list(colCPE.find({"id": {"$regex": regex}}))
  if fullSearch: lst.extend(colCPEOTHER.find({"id": {"$regex": regex}}))
+29 −3
Original line number Diff line number Diff line
@@ -18,7 +18,7 @@ from lib.Config import Configuration as conf
from lib.Database  import Database
from lib.Objects   import CVE, CPE, CWE, CAPEC, VIA4
from lib.Singleton import Singleton
from lib.Toolkit   import toStringFormattedCPE
from lib.Toolkit   import toStringFormattedCPE, exploitabilityScore, impactScore

# Code
class DatabaseLayer(metaclass=Singleton):
@@ -202,15 +202,40 @@ class CVEs:
    self.db.cve_upsert(cve)

  # Data retrieval
  def get(self, cveID):
  def get(self, cveID, **kwargs):
    cve = self.db.cve_get(cveID.upper())
    # Replace the dud at cve reconstruction time with the pointer
    if cve:
      if cve.cwe: cve.cwe = DatabaseLayer().CWE.get(cve.cwe.id)
      cve.vulnerable_configuration = [DatabaseLayer().CPE.get(x.id)
                                      for x in cve.vulnerable_configuration]
    self._enhance(cve, **kwargs)
    return cve

  def query(self, limit=-1, skip=0, sort=None,  query={}, **kwargs):
    if not sort: sort = ("Modified", "desc")
    cves = self.db.cve_query(limit=limit, skip=skip, sort=sort, query=query)
    self._enhance(cves, **kwargs)
    return cves

  def last(self, limit=-1, skip=0, query={}, **kwargs):
    cves = self.query(limit=limit, skip=skip, query=query)
    self._enhance(cves, **kwargs)
    return cves

  def _enhance(self, cve, via4=False, subscore=False, ranking=False, **kwargs):
    if isinstance(cve, CVE): cve = [cve]
    for c in cve:
      if via4:
        c.via4 = DatabaseLayer().VIA4.get(c.id)
      if ranking:
        pass
      if subscore:
        exploitCVSS=exploitabilityScore(cve)
        impactCVSS =impactScore(cve)
        cve.access.cvss =(math.ceil(exploitCVSS*10)/10) if type(exploitCVSS) is not str else exploitCVSS
        cve.impact.cvss =(math.ceil(impactCVSS *10)/10) if type(impactCVSS)  is not str else impactCVSS

  def forCPE(self, cpe):
    return self.db.cve_forCPE(cpe)

@@ -279,7 +304,7 @@ class CWEs:

  def getAll(self): #Safe way of accessing all CWE
    if not self.cwe: self._populate_memory_db()
    return self.cwe
    return list(self.cwe.values())

  def _populate_memory_db(self):
    self.cwe = {x.id: x for x in self.db.cwe_getAll()}
@@ -320,6 +345,7 @@ class CAPECs:
    return self.related.get(cweID)

  def _populate_memory_db(self):
    DatabaseLayer().CWE.get("0") # Force a db populate if not done yet
    self.capec   = {x.id: x for x in self.db.capec_getAll()}
    self.related = defaultdict(list)
    for c in self.capec.values():
+6 −6
Original line number Diff line number Diff line
@@ -67,9 +67,9 @@ def cpeTitle(cpe):
def impactScore(cve):
    score={'NONE':0,'PARTIAL':0.275,'COMPLETE':0.660}
    try:
      C=((cve['impact'])['confidentiality']).upper()
      I=((cve['impact'])['integrity']).upper()
      A=((cve['impact'])['availability']).upper()
      C = cve.impact.confidentiality.upper()
      I = cve.impact.integrity.upper()
      A = cve.impact.availability.upper()
      res = 10.41*(1-(1-score[C])*(1-score[I])*(1-score[A]))
      return 10.0 if res > 10.0 else res
    except:
@@ -80,9 +80,9 @@ def exploitabilityScore(cve):
    vScore={'NETWORK':1.0,'ADJACENT_NETWORK':0.646,'LOCAL':0.395}
    aScore={'NONE':0.704,'SINGLE_INSTANCE':0.56,'MULTIPLE_INSTANCES':0.45}
    try:
      C=((cve['access'])['complexity']).upper()
      V=((cve['access'])['vector']).upper()
      A=((cve['access'])['authentication']).upper()
      C = cve.access.complexity.upper()
      V = cve.access.vector.upper()
      A = cve.access.authentication.upper()
      return 20* cScore[C]*vScore[V]*aScore[A]
    except:
      return '-'
+28 −8
Original line number Diff line number Diff line
@@ -37,6 +37,11 @@ class TestAPI(unittest.TestCase):
        self.db.CPE.upsert([self.cpe1, self.cpe2])
        self.db.CVE.upsert(self.cve1)

        self.exp_capec1 = self.capec1.dict()
        self.exp_cwe1 = self.cwe1.dict()
        self.exp_cve1 = self.cve1.dict()
        self.exp_cve1['Published'] = self.exp_cve1['Published'].isoformat()

    def tearDown(self):
        del self.db
        del self.capec1
@@ -45,12 +50,13 @@ class TestAPI(unittest.TestCase):
        del self.cpe2
        del self.cve1

    # Testing Functions
    # Helper Functions
    def response_check(self, url, expected):
        url = 'http://localhost:5000/api/%s'%url
        data = json.loads((urlopen(url).read()).decode('utf8'))
        self.assertEqual(expected, data)

    # Testing Functions
    def test_cpe2_3(self):
        expected = "cpe:2.3:a:test:test"
        self.response_check("cpe2.3/cpe:/a:test:test", expected)
@@ -59,12 +65,26 @@ class TestAPI(unittest.TestCase):
        expected = "cpe:/a:test:test"
        self.response_check("cpe2.2/cpe:2.3:a:test:test", expected)

    def test_api_capec(self):
        expected = {"id": "10000",
                    "name": "test_capec",
                    "prerequisites": "No prerequisites",
                    "related_weakness": [ "10000" ],
                    "solutions": "There's no solution",
                    "summary": "no summary"}
    def test_cvefor(self):
        expected = [self.exp_cve1]
        self.response_check("cvefor/%s"%self.cpe1.id, expected)

    def test_cve(self):
        expected = self.exp_cve1
        self.response_check("cve/%s"%self.cve1.id, expected)

    def test_cwe(self):
        expected = [self.exp_cwe1]
        self.response_check("cwe", expected)

    def test_cwe_number(self):
        expected = [self.exp_capec1]
        self.response_check("cwe/%s"%self.cwe1.id, expected)

    def test_capec(self):
        expected = self.exp_capec1
        self.response_check("capec/10000", expected)

    def test_last(self):
        expected = [self.exp_cve1]
        self.response_check("last", expected)
Loading