Skip to content

Commit

Permalink
Merge pull request #94 from Yelp/upgrade_vt_v3
Browse files Browse the repository at this point in the history
upgrades the virustotal version to version 3
  • Loading branch information
ytonui committed Apr 23, 2020
2 parents fedca5a + 40be8ee commit 8a5aba1
Show file tree
Hide file tree
Showing 4 changed files with 283 additions and 160 deletions.
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -5,7 +5,7 @@

setup(
name="threat_intel",
version='0.1.30',
version='0.2.0',
provides=['threat_intel'],
author="Yelp Security",
url='https://github.com/Yelp/threat_intel',
Expand Down
144 changes: 98 additions & 46 deletions tests/virustotal_test.py
Expand Up @@ -2,6 +2,7 @@
#
import testify as T
from mock import patch
from mock import ANY

from threat_intel.virustotal import VirusTotalApi

Expand Down Expand Up @@ -29,80 +30,131 @@ def _test_api_call(self, call, endpoint, request, expected_query_params, api_res
with patch.object(self.vt, '_requests') as request_mock:
request_mock.multi_get.return_value = api_response
result = call(request)

request_mock.multi_get.assert_called_with(self.vt.BASE_DOMAIN + endpoint, query_params=expected_query_params)
param_list = [self.vt.BASE_DOMAIN + endpoint.format(param) for param in expected_query_params]
request_mock.multi_get.assert_called_with(param_list, file_download=ANY)
T.assert_equal(result, expected_result)

def test_get_file_reports(self):
self._test_api_call(call=self.vt.get_file_reports,
endpoint='file/report',
endpoint='files/{}',
request=['file1', 'file2'],
expected_query_params=[{'resource': 'file1,file2',
'apikey': 'test_key'}],
api_response=[{'resource': 'file1'}, {'resource': 'file2'}],
expected_result={'file1': {'resource': 'file1'},
'file2': {'resource': 'file2'}})
expected_query_params=['file1', 'file2'],
api_response=[{'data':{'id': 'file1'}}, {'data':{'id': 'file2'}}],
expected_result={'file1': {'data': {'id': 'file1'}},
'file2': {'data': {'id': 'file2'}}})

def test_get_file_behaviour(self):
self._test_api_call(call=self.vt.get_file_behaviour,
endpoint='file/behaviour',
request=['file1', 'file2'],
expected_query_params=[{'resource': 'file1,file2',
'apikey': 'test_key'}],
api_response=[{'resource': 'file1'}, {'resource': 'file2'}],
expected_result={'file1': {'resource': 'file1'},
'file2': {'resource': 'file2'}})

def test_get_file_network_traffic(self):
self._test_api_call(call=self.vt.get_file_network_traffic,
endpoint='file/network-traffic',
endpoint='files/{}/behaviours',
request=['file1', 'file2'],
expected_query_params=[{'resource': 'file1,file2',
'apikey': 'test_key'}],
api_response=[{'resource': 'file1'}, {'resource': 'file2'}],
expected_result={'file1': {'resource': 'file1'},
'file2': {'resource': 'file2'}})
expected_query_params=['file1', 'file2'],
api_response=[{'data':{'id': 'file1'}}, {'data':{'id': 'file2'}}],
expected_result={'file1': {'data': {'id': 'file1'}},
'file2': {'data': {'id': 'file2'}}})

def test_get_file_download(self):
self._test_api_call(call=self.vt.get_file_download,
endpoint='file/download',
endpoint='files/{}/download',
request=['file1', 'file2'],
expected_query_params=[{'resource': 'file1,file2',
'apikey': 'test_key'}],
api_response=[{'resource': 'file1'}, {'resource': 'file2'}],
expected_result={'file1': {'resource': 'file1'},
'file2': {'resource': 'file2'}})
expected_query_params=['file1', 'file2'],
api_response=[{'data':{'id': 'file1'}}, {'data':{'id': 'file2'}}],
expected_result={'file1': {'data': {'id': 'file1'}},
'file2': {'data': {'id': 'file2'}}})

def test_get_domain_reports(self):
self._test_api_call(call=self.vt.get_domain_reports,
endpoint='domain/report',
endpoint='domains/{}',
request=['domain1', 'domain2'],
expected_query_params=[{'domain': 'domain1',
'apikey': 'test_key'},
{'domain': 'domain2',
'apikey': 'test_key'}],
expected_query_params=['domain1', 'domain2'],
api_response=[{}, {}],
expected_result={'domain1': {},
'domain2': {}})

def test_get_url_reports(self):
self._test_api_call(call=self.vt.get_url_reports,
endpoint='url/report',
endpoint='urls/{}',
request=['url1', 'url2'],
expected_query_params=[{'resource': 'url1\nurl2',
'apikey': 'test_key'}],
api_response=[{'resource': 'url1'}, {'resource': 'url2'}],
expected_result={'url1': {'resource': 'url1'},
'url2': {'resource': 'url2'}})
expected_query_params = ['url1', 'url2'],
api_response=[{'data':{'id': 'url1'}}, {'data':{'id': 'url2'}}],
expected_result={'url1': {'data': {'id': 'url1'}},
'url2': {'data': {'id': 'url2'}}})

def test_get_ip_reports(self):
self._test_api_call(call=self.vt.get_ip_reports,
endpoint='ip-address/report',
endpoint='ip_addresses/{}',
request=['ip1', 'ip2'],
expected_query_params=[{'ip': 'ip1',
'apikey': 'test_key'},
{'ip': 'ip2',
'apikey': 'test_key'}],
expected_query_params=['ip1', 'ip2'],
api_response=[{}, {}],
expected_result={'ip1': {},
'ip2': {}})

def test_get_file_contacted_domains(self):
    """get_file_contacted_domains should query files/{}/contacted_domains once per file id.

    The endpoint is keyed by file id, so the fixtures are named `file1`/`file2`
    like the other `files/{}` tests rather than after domains.
    """
    file_ids = ('file1', 'file2')
    self._test_api_call(
        call=self.vt.get_file_contacted_domains,
        endpoint='files/{}/contacted_domains',
        request=list(file_ids),
        expected_query_params=list(file_ids),
        # Response and expectation are built separately so they never share dict objects.
        api_response=[{'data': {'id': file_id}} for file_id in file_ids],
        expected_result={file_id: {'data': {'id': file_id}} for file_id in file_ids},
    )


def test_get_file_contacted_ips(self):
    """get_file_contacted_ips should query files/{}/contacted_ips once per file id and key results by id."""
    file_ids = ('file1', 'file2')
    self._test_api_call(
        call=self.vt.get_file_contacted_ips,
        endpoint='files/{}/contacted_ips',
        request=list(file_ids),
        expected_query_params=list(file_ids),
        # Response and expectation are built separately so they never share dict objects.
        api_response=[{'data': {'id': file_id}} for file_id in file_ids],
        expected_result={file_id: {'data': {'id': file_id}} for file_id in file_ids},
    )


def test_get_file_contacted_urls(self):
    """get_file_contacted_urls should query files/{}/contacted_urls once per file id and key results by id."""
    file_ids = ('file1', 'file2')
    self._test_api_call(
        call=self.vt.get_file_contacted_urls,
        endpoint='files/{}/contacted_urls',
        request=list(file_ids),
        expected_query_params=list(file_ids),
        # Response and expectation are built separately so they never share dict objects.
        api_response=[{'data': {'id': file_id}} for file_id in file_ids],
        expected_result={file_id: {'data': {'id': file_id}} for file_id in file_ids},
    )


def test_get_file_itw_urls(self):
    """get_file_itw_urls should query files/{}/itw_urls once per file id and key results by id."""
    file_ids = ('file1', 'file2')
    self._test_api_call(
        call=self.vt.get_file_itw_urls,
        endpoint='files/{}/itw_urls',
        request=list(file_ids),
        expected_query_params=list(file_ids),
        # Response and expectation are built separately so they never share dict objects.
        api_response=[{'data': {'id': file_id}} for file_id in file_ids],
        expected_result={file_id: {'data': {'id': file_id}} for file_id in file_ids},
    )


def test_get_domain_communicating_files(self):
    """get_domain_communicating_files should query domains/{}/communicating_files once per domain."""
    domains = ('domain1', 'domain2')
    self._test_api_call(
        call=self.vt.get_domain_communicating_files,
        endpoint='domains/{}/communicating_files',
        request=list(domains),
        expected_query_params=list(domains),
        # Response and expectation are built separately so they never share dict objects.
        api_response=[{'data': {'id': domain}} for domain in domains],
        expected_result={domain: {'data': {'id': domain}} for domain in domains},
    )


def test_get_domain_referrer_files(self):
    """get_domain_referrer_files should query domains/{}/referrer_files once per domain."""
    domains = ('domain1', 'domain2')
    self._test_api_call(
        call=self.vt.get_domain_referrer_files,
        endpoint='domains/{}/referrer_files',
        request=list(domains),
        expected_query_params=list(domains),
        # Response and expectation are built separately so they never share dict objects.
        api_response=[{'data': {'id': domain}} for domain in domains],
        expected_result={domain: {'data': {'id': domain}} for domain in domains},
    )

def test_get_domain_reports(self):
    """get_domain_reports should query domains/{} once per domain and key the (empty) reports by domain.

    NOTE(review): a test with this same name is already defined earlier in this
    file; this later definition replaces it on the class. Consider removing one.
    """
    domains = ('domain1', 'domain2')
    self._test_api_call(
        call=self.vt.get_domain_reports,
        endpoint='domains/{}',
        request=list(domains),
        expected_query_params=list(domains),
        # Each report is a distinct empty dict, as in a real batch of responses.
        api_response=[{} for _ in domains],
        expected_result={domain: {} for domain in domains},
    )


def test_get_file_clusters(self):
    """get_file_clusters should query feeds/file-behaviours/{} once per requested time value."""
    times = ('time1', 'time2')
    self._test_api_call(
        call=self.vt.get_file_clusters,
        endpoint='feeds/file-behaviours/{}',
        request=list(times),
        expected_query_params=list(times),
        # Response and expectation are built separately so they never share dict objects.
        api_response=[{'data': {'id': time_id}} for time_id in times],
        expected_result={time_id: {'data': {'id': time_id}} for time_id in times},
    )
24 changes: 20 additions & 4 deletions threat_intel/util/http.py
Expand Up @@ -6,8 +6,10 @@
# SSLAdapter helps force use of the highest possible version of TLS.
#
import logging
import re
import ssl
import time
from base64 import urlsafe_b64encode
from collections import namedtuple
from collections import OrderedDict
from functools import partial
Expand Down Expand Up @@ -200,21 +202,23 @@ def __init__(
),
)

def multi_get(self, urls, query_params=None, to_json=True):
def multi_get(self, urls, query_params=None, to_json=True, file_download=False):
"""Issue multiple GET requests.
Args:
urls - A string URL or list of string URLs
query_params - None, a dict, or a list of dicts representing the query params
to_json - A boolean, should the responses be returned as JSON blobs
file_download - A boolean, whether a file download is expected
Returns:
a list of dicts if to_json is set, or a list of requests.response objects otherwise.
Raises:
InvalidRequestError - Can not decide how many requests to issue.
"""
return self._multi_request(
MultiRequest._VERB_GET, urls, query_params,
data=None, to_json=to_json,
data=None, to_json=to_json, file_download=file_download,
)

def multi_post(self, urls, query_params=None, data=None, to_json=True, send_as_file=False):
Expand Down Expand Up @@ -379,6 +383,16 @@ def _wait_for_response(self, requests):

return list(responses_for_requests.values())

def _handle_file_download(self, response):
    """Packages a file-download response as a JSON-style dict.

    The file name is read from the `content-disposition` header and the body is
    encoded as URL-safe base64 so it can be returned next to JSON results.

    Args:
        response - the response to a download request. May be None or an error
            response: when `file_download` is set, `_multi_request` passes every
            response through this method.
    Returns:
        a dict of the form {'data': {'id': <file name>, 'text': <base64 body>}}.
        A value is None when it could not be extracted.
    """
    name = None
    data = None
    try:
        name = re.findall(r'filename=(.+)', response.headers['content-disposition'])[0]
        # Encode the raw bytes. Going through `response.text` would first decode the
        # payload with a guessed charset, which corrupts binary files.
        data = urlsafe_b64encode(response.content).decode('utf-8')
    except Exception:
        # `response` can be None here, so look the URL up defensively; otherwise this
        # handler raises AttributeError itself and the failure is no longer best-effort.
        request = getattr(response, 'request', None)
        url = getattr(request, 'url', None)
        logging.exception('Unable to extract download data for {} '.format(url))
    return {'data': {'id': name, 'text': data}}

def _convert_to_json(self, response):
"""Converts response to JSON.
If the response cannot be converted to JSON then `None` is returned.
Expand All @@ -396,7 +410,7 @@ def _convert_to_json(self, response):
))
return None

def _multi_request(self, verb, urls, query_params, data, to_json=True, send_as_file=False):
def _multi_request(self, verb, urls, query_params, data, to_json=True, send_as_file=False, file_download=False):
"""Issues multiple batches of simultaneous HTTP requests and waits for responses.
Args:
Expand Down Expand Up @@ -435,8 +449,10 @@ def _multi_request(self, verb, urls, query_params, data, to_json=True, send_as_f

responses = self._wait_for_response(prepared_requests)
for response in responses:
if response:
if response and not file_download:
all_responses.append(self._convert_to_json(response) if to_json else response)
elif file_download:
all_responses.append(self._handle_file_download(response))
else:
all_responses.append(None)

Expand Down

0 comments on commit 8a5aba1

Please sign in to comment.