diff --git a/Files/shared.py b/Files/shared.py index 5f3fde7..10e70cd 100644 --- a/Files/shared.py +++ b/Files/shared.py @@ -118,19 +118,19 @@ def __get_hp_cfg(self): def __get_ibm_cfg(self): # IBM --------------------------------------------- ibm_url = self.cc.get('ibm', 'url') - ibm_url2 = self.cc.get('ibm', 'url2') + client_id = self.cc.get('ibm', 'client_id') return { 'url': ibm_url, - 'url2': ibm_url2 + 'client_id': client_id } def __get_lenovo_cfg(self): # lenovo ------------------------------------------- lenovo_url = self.cc.get('lenovo', 'url') - lenovo_url2 = self.cc.get('lenovo', 'url2') + client_id = self.cc.get('lenovo', 'client_id') return { 'url': lenovo_url, - 'url2': lenovo_url2 + 'client_id': client_id } def __get_meraki_cfg(self): diff --git a/Files/warranty.cfg.example b/Files/warranty.cfg.example index d3266f8..cf9fa02 100644 --- a/Files/warranty.cfg.example +++ b/Files/warranty.cfg.example @@ -33,15 +33,15 @@ api_secret = url = https://css.api.hp.com [ibm] -url = http://support.lenovo.com/services/us/en/ContentService/GetProducts -url2 = http://support.lenovo.com/us/en/products +url = https://SupportAPI.lenovo.com/v2.5/Warranty +client_id = [lenovo] -url = http://support.lenovo.com/services/us/en/ContentService/GetProducts -url2 = http://support.lenovo.com/us/en/products +url = https://SupportAPI.lenovo.com/v2.5/Warranty +client_id = [meraki] -api_key = +api_key = url = https://api.meraki.com/api/v0 [other] diff --git a/Files/warranty_ibm_lenovo.py b/Files/warranty_ibm_lenovo.py index 31a58a7..1736bdd 100644 --- a/Files/warranty_ibm_lenovo.py +++ b/Files/warranty_ibm_lenovo.py @@ -6,7 +6,7 @@ import random import requests -from shared import DEBUG, RETRY, ORDER_NO_TYPE, left +from shared import DEBUG, RETRY, ORDER_NO_TYPE, left, Device42rest from warranty_abstract import WarrantyBase try: @@ -19,7 +19,7 @@ class IbmLenovo(WarrantyBase, object): def __init__(self, vendor, params): super(IbmLenovo, self).__init__() self.url = params['url'] - self.url2 = params['url2'] + self.client_id = params['client_id'] self.debug = DEBUG self.retry = RETRY self.order_no = ORDER_NO_TYPE @@ -31,12 +31,23 @@ def __init__(self, vendor, params): if self.order_no == 'common': self.common = self.generate_random_order_no() - def get_product_info(self, serial, retry=True): + def get_product_info(self, serials, retry=True): if self.debug: - print '\t[+] Checking possible product "%s"' % serial - timeout = 10 + print '\t[+] Checking possible product "%s"' % serials + + timeout = 30 + + headers = { + 'Content-Type': 'application/x-www-form-urlencoded', + 'ClientID': self.client_id + } + + params = { + 'Serial': serials + } + try: - resp = self.requests.get(self.url + '?productId=' + serial, verify=True, timeout=timeout) + resp = self.requests.get(self.url, params=params, headers=headers, verify=True, timeout=timeout) msg = 'Status code: %s' % str(resp.status_code) if str(resp.status_code) == '401': print '\t[!] HTTP error. Message was: %s' % msg @@ -45,70 +56,57 @@ def get_product_info(self, serial, retry=True): time.sleep(30) if retry: print '\n[!] Retry' - self.get_product_info(serial, False) + self.get_product_info(serials, False) else: return None else: + if self.debug: + print + print resp.json() + print return resp.json() except requests.RequestException as e: self.error_msg(e) return None def run_warranty_check(self, inline_serials, retry=True): + global full_serials + full_serials = {} + if self.debug: print '\t[+] Checking warranty "%s"' % inline_serials - timeout = 10 - result = [] - - for serial in inline_serials.split(','): - product_info = self.get_product_info(serial) - current_product = None - - if len(product_info) > 1: - for product in product_info: - current_product = self.get_product_info(product['Name'])[0] - if current_product['Name'] is not None: - break - else: - try: - current_product = product_info[0] - except IndexError: - print '\t[+] Unable to find "%s" orders' % serial - continue - - if current_product is not None: - url = self.url2 + '/' + current_product['Id'] + '?tabName=Warranty&beta=false' - - resp = self.requests.post( - url, - data={'SERIALNUMBERKEY': current_product['Serial']}, - verify=True, - timeout=timeout - ) - - # possible redirects - if resp.url != url: - resp = self.requests.post( - resp.url, - data={'SERIALNUMBERKEY': current_product['Serial']}, - verify=True, - timeout=timeout - ) - - data_object = re.search(r"ds_warranties=(.*?});", resp.text) - json_object = json.loads(data_object.group(1)) - result.append(json_object) + incoming_serials = inline_serials.split(',') + inline_serials = [] + + for d42_serial in incoming_serials: + d42_serial = d42_serial.upper() + if '_' in d42_serial: + full_serials.update({d42_serial.split('_')[0]: d42_serial}) + d42_serial = d42_serial.split('_')[0] + elif '(' in d42_serial: + full_serials.update({d42_serial.split('(')[0]: d42_serial}) + d42_serial = d42_serial.split('(')[0] + else: + full_serials.update({d42_serial: d42_serial}) + inline_serials.append(d42_serial) + result = self.get_product_info(inline_serials, retry) return result def process_result(self, result, purchases): + global full_serials data = {} - for item in result: + # The API returns results for single devices in a different format than multiple devices, this keeps everything + # returned from the API in the same list format + if 'Warranty' in result: + result = [result] - if 'BaseWarranties' in item and len(item['BaseWarranties']) > 0: - warranties = item['BaseWarranties'] + for item in result: + # Warranties + if 'Warranty' in item and len(item['Warranty']) > 0: + warranties = item['Warranty'] else: continue @@ -135,24 +133,27 @@ def process_result(self, result, purchases): lines = [] + # process warranty line items for a device for warranty in warranties: - - start_date = warranty['Start']['UTC'].split('T')[0] - end_date = warranty['End']['UTC'].split('T')[0] + try: + start_date = warranty['Start'].split('T')[0] + end_date = warranty['End'].split('T')[0] + except (KeyError, AttributeError): + continue data.update({'line_start_date': start_date}) data.update({'line_end_date': end_date}) - data.update({'line_contract_type': warranty['Origin']}) + data.update({'line_contract_type': warranty['Type']}) try: # There's a max 64 character limit on the line service type field in Device42 (version 13.1.0) - service_level_description = left(sub_item['ServiceLevelDescription'], 64) + service_level_description = left(warranty['Name'], 64) data.update({'line_service_type': service_level_description}) except: pass # update or duplicate? Compare warranty dates by serial, contract_id and end date - hasher = serial.split('.')[0] + start_date + end_date + hasher = serial + start_date + end_date try: d_purchase_id, d_order_no, d_line_no, d_contractid, d_start, d_end, forcedupdate = purchases[hasher] diff --git a/starter.py b/starter.py index 4c3db19..2045312 100644 --- a/starter.py +++ b/starter.py @@ -61,7 +61,7 @@ def get_vendor_api(name): elif vendor == 'ibm' or vendor == 'lenovo': ibm_lenovo_params = { 'url': current_cfg['url'], - 'url2': current_cfg['url2'], + 'client_id': current_cfg['client_id'], 'd42_rest': d42_rest } api = IbmLenovo(vendor, ibm_lenovo_params)