From e65b44a6b6373fe77789ff13473a906aa9fac732 Mon Sep 17 00:00:00 2001 From: Omar Sanchez Date: Tue, 18 Feb 2020 15:13:12 -0500 Subject: [PATCH 1/3] updated lenovo, ibm warranty sync. needs testing --- Files/shared.py | 8 +- Files/warranty.cfg.example | 10 +-- Files/warranty_ibm_lenovo.py | 160 ++++++++++++++++++++++------------- 3 files changed, 112 insertions(+), 66 deletions(-) diff --git a/Files/shared.py b/Files/shared.py index 5f3fde7..10e70cd 100644 --- a/Files/shared.py +++ b/Files/shared.py @@ -118,19 +118,19 @@ def __get_hp_cfg(self): def __get_ibm_cfg(self): # IBM --------------------------------------------- ibm_url = self.cc.get('ibm', 'url') - ibm_url2 = self.cc.get('ibm', 'url2') + client_id = self.cc.get('ibm', 'client_id') return { 'url': ibm_url, - 'url2': ibm_url2 + 'client_id': client_id } def __get_lenovo_cfg(self): # lenovo ------------------------------------------- lenovo_url = self.cc.get('lenovo', 'url') - lenovo_url2 = self.cc.get('lenovo', 'url2') + client_id = self.cc.get('lenovo', 'client_id') return { 'url': lenovo_url, - 'url2': lenovo_url2 + 'client_id': client_id } def __get_meraki_cfg(self): diff --git a/Files/warranty.cfg.example b/Files/warranty.cfg.example index d3266f8..cf9fa02 100644 --- a/Files/warranty.cfg.example +++ b/Files/warranty.cfg.example @@ -33,15 +33,15 @@ api_secret = url = https://css.api.hp.com [ibm] -url = http://support.lenovo.com/services/us/en/ContentService/GetProducts -url2 = http://support.lenovo.com/us/en/products +url = https://SupportAPI.lenovo.com/v2.5/Warranty +client_id = [lenovo] -url = http://support.lenovo.com/services/us/en/ContentService/GetProducts -url2 = http://support.lenovo.com/us/en/products +url = https://SupportAPI.lenovo.com/v2.5/Warranty +client_id = [meraki] -api_key = +api_key = url = https://api.meraki.com/api/v0 [other] diff --git a/Files/warranty_ibm_lenovo.py b/Files/warranty_ibm_lenovo.py index 31a58a7..2eb8e39 100644 --- a/Files/warranty_ibm_lenovo.py +++ b/Files/warranty_ibm_lenovo.py @@ -6,7 +6,7 @@ import random import requests -from shared import DEBUG, RETRY, ORDER_NO_TYPE, left +from shared import DEBUG, RETRY, ORDER_NO_TYPE, left, Device42rest from warranty_abstract import WarrantyBase try: @@ -19,7 +19,7 @@ class IbmLenovo(WarrantyBase, object): def __init__(self, vendor, params): super(IbmLenovo, self).__init__() self.url = params['url'] - self.url2 = params['url2'] + self.client_id = params['client_id'] self.debug = DEBUG self.retry = RETRY self.order_no = ORDER_NO_TYPE @@ -31,12 +31,18 @@ def __init__(self, vendor, params): if self.order_no == 'common': self.common = self.generate_random_order_no() - def get_product_info(self, serial, retry=True): + def get_product_info(self, serials, retry=True): if self.debug: - print '\t[+] Checking possible product "%s"' % serial - timeout = 10 + print '\t[+] Checking possible product "%s"' % serials + + timeout = 30 + + headers = { + 'ClientID': self.client_id + } + try: - resp = self.requests.get(self.url + '?productId=' + serial, verify=True, timeout=timeout) + resp = self.requests.get(self.url + '?Serial=' + serials, headers=headers, verify=True, timeout=timeout) msg = 'Status code: %s' % str(resp.status_code) if str(resp.status_code) == '401': print '\t[!] HTTP error. Message was: %s' % msg @@ -45,75 +51,58 @@ def get_product_info(self, serial, retry=True): time.sleep(30) if retry: print '\n[!] Retry' - self.get_product_info(serial, False) + self.get_product_info(serials, False) else: return None else: + # todo: used to get an example response for development + if self.debug: + print resp.json() return resp.json() except requests.RequestException as e: self.error_msg(e) return None def run_warranty_check(self, inline_serials, retry=True): + global full_serials + full_serials = {} + if self.debug: print '\t[+] Checking warranty "%s"' % inline_serials - timeout = 10 - result = [] - - for serial in inline_serials.split(','): - product_info = self.get_product_info(serial) - current_product = None - - if len(product_info) > 1: - for product in product_info: - current_product = self.get_product_info(product['Name'])[0] - if current_product['Name'] is not None: - break - else: - try: - current_product = product_info[0] - except IndexError: - print '\t[+] Unable to find "%s" orders' % serial - continue - - if current_product is not None: - - url = self.url2 + '/' + current_product['Id'] + '?tabName=Warranty&beta=false' - resp = self.requests.post( - url, - data={'SERIALNUMBERKEY': current_product['Serial']}, - verify=True, - timeout=timeout - ) - - # possible redirects - if resp.url != url: - resp = self.requests.post( - resp.url, - data={'SERIALNUMBERKEY': current_product['Serial']}, - verify=True, - timeout=timeout - ) - - data_object = re.search(r"ds_warranties=(.*?});", resp.text) - json_object = json.loads(data_object.group(1)) - result.append(json_object) + incoming_serials = inline_serials.split(',') + inline_serials = [] + + for d42_serial in incoming_serials: + d42_serial = d42_serial.upper() + if '_' in d42_serial: + full_serials.update({d42_serial.split('_')[0]: d42_serial}) + d42_serial = d42_serial.split('_')[0] + elif '(' in d42_serial: + full_serials.update({d42_serial.split('(')[0]: d42_serial}) + d42_serial = d42_serial.split('(')[0] + else: + full_serials.update({d42_serial: d42_serial}) + inline_serials.append(d42_serial) + inline_serials = ','.join(inline_serials) + result = self.get_product_info(inline_serials, retry) return result def process_result(self, result, purchases): + global full_serials data = {} for item in result: - if 'BaseWarranties' in item and len(item['BaseWarranties']) > 0: - warranties = item['BaseWarranties'] + # Warranties + if 'Warranty' in item and len(item['Warranty']) > 0: + warranties = item['Warranty'] else: continue data.clear() - serial = item['Serial'] + serial = item['ID'] if self.order_no == 'common': order_no = self.common @@ -135,24 +124,27 @@ def process_result(self, result, purchases): lines = [] + # process warranty line items for a device for warranty in warranties: - - start_date = warranty['Start']['UTC'].split('T')[0] - end_date = warranty['End']['UTC'].split('T')[0] + try: + start_date = warranty['Start']['UTC'].split('T')[0] + end_date = warranty['End']['UTC'].split('T')[0] + except (KeyError, AttributeError): + continue data.update({'line_start_date': start_date}) data.update({'line_end_date': end_date}) - data.update({'line_contract_type': warranty['Origin']}) + data.update({'line_contract_type': warranty['Type']}) try: # There's a max 64 character limit on the line service type field in Device42 (version 13.1.0) - service_level_description = left(sub_item['ServiceLevelDescription'], 64) + service_level_description = left(warranty['Name'], 64) data.update({'line_service_type': service_level_description}) except: pass # update or duplicate? Compare warranty dates by serial, contract_id and end date - hasher = serial.split('.')[0] + start_date + end_date + hasher = serial + start_date + end_date try: d_purchase_id, d_order_no, d_line_no, d_contractid, d_start, d_end, forcedupdate = purchases[hasher] @@ -173,3 +165,57 @@ def process_result(self, result, purchases): for line in lines: self.d42_rest.upload_data(line) data.clear() + + +if __name__ == '__main__': + """ + + test_serials = '1234, 1234, 1234, 1234' + + d42_params = { + 'username': 'admin', + 'password': 'adm!nd42', + 'url': 'http://192.168.92.130:8000' + } + + d42_rest = Device42rest(d42_params) + + vendor = "lenovo" + ibm_lenovo_params = { + 'url': 'https://SupportAPI.lenovo.com/v2.5/Warranty', + 'd42_rest': d42_rest + } + + test = IbmLenovo(vendor, ibm_lenovo_params) + + result = test.run_warranty_check(test_serials, True) + + # get purchases data from Device42 + orders = d42_rest.get_purchases() + purchases = {} + + if orders and 'purchases' in orders: + for order in orders['purchases']: + if 'line_items' in order: + purchase_id = order.get('purchase_id') + order_no = order.get('order_no') + + for line_item in order['line_items']: + line_no = line_item.get('line_no') + devices = line_item.get('devices') + contractid = line_item.get('line_notes') + # POs with no start and end dates will now be included and given a hasher key with date min and max + start = line_item.get('line_start_date') + end = line_item.get('line_end_date') + + if start and end and devices: + for device in devices: + if 'serial_no' in device: + serial = device['serial_no'] + hasher = serial + contractid + start + end + if hasher not in purchases: + purchases[hasher] = [purchase_id, order_no, line_no, contractid, start, end, + 'False'] + + test.process_result(result, purchases) + """ \ No newline at end of file From 1ea1f079e97841d5243a9da7c54743d63fd27517 Mon Sep 17 00:00:00 2001 From: Omar Sanchez Date: Mon, 9 Mar 2020 11:01:06 -0400 Subject: [PATCH 2/3] parameter fixes --- Files/warranty_ibm_lenovo.py | 54 ------------------------------------ starter.py | 2 +- 2 files changed, 1 insertion(+), 55 deletions(-) diff --git a/Files/warranty_ibm_lenovo.py b/Files/warranty_ibm_lenovo.py index 2eb8e39..04c0dfb 100644 --- a/Files/warranty_ibm_lenovo.py +++ b/Files/warranty_ibm_lenovo.py @@ -165,57 +165,3 @@ def process_result(self, result, purchases): for line in lines: self.d42_rest.upload_data(line) data.clear() - - -if __name__ == '__main__': - """ - - test_serials = '1234, 1234, 1234, 1234' - - d42_params = { - 'username': 'admin', - 'password': 'adm!nd42', - 'url': 'http://192.168.92.130:8000' - } - - d42_rest = Device42rest(d42_params) - - vendor = "lenovo" - ibm_lenovo_params = { - 'url': 'https://SupportAPI.lenovo.com/v2.5/Warranty', - 'd42_rest': d42_rest - } - - test = IbmLenovo(vendor, ibm_lenovo_params) - - result = test.run_warranty_check(test_serials, True) - - # get purchases data from Device42 - orders = d42_rest.get_purchases() - purchases = {} - - if orders and 'purchases' in orders: - for order in orders['purchases']: - if 'line_items' in order: - purchase_id = order.get('purchase_id') - order_no = order.get('order_no') - - for line_item in order['line_items']: - line_no = line_item.get('line_no') - devices = line_item.get('devices') - contractid = line_item.get('line_notes') - # POs with no start and end dates will now be included and given a hasher key with date min and max - start = line_item.get('line_start_date') - end = line_item.get('line_end_date') - - if start and end and devices: - for device in devices: - if 'serial_no' in device: - serial = device['serial_no'] - hasher = serial + contractid + start + end - if hasher not in purchases: - purchases[hasher] = [purchase_id, order_no, line_no, contractid, start, end, - 'False'] - - test.process_result(result, purchases) - """ \ No newline at end of file diff --git a/starter.py b/starter.py index 4c3db19..2045312 100644 --- a/starter.py +++ b/starter.py @@ -61,7 +61,7 @@ def get_vendor_api(name): elif vendor == 'ibm' or vendor == 'lenovo': ibm_lenovo_params = { 'url': current_cfg['url'], - 'url2': current_cfg['url2'], + 'client_id': current_cfg['client_id'], 'd42_rest': d42_rest } api = IbmLenovo(vendor, ibm_lenovo_params) From 7b51471b8f1f8e0bd9a61f0627d15078fab2f25c Mon Sep 17 00:00:00 2001 From: Omar Sanchez Date: Tue, 19 May 2020 19:02:52 -0400 Subject: [PATCH 3/3] tested and did some minor code refactoring --- Files/warranty_ibm_lenovo.py | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/Files/warranty_ibm_lenovo.py b/Files/warranty_ibm_lenovo.py index 04c0dfb..1736bdd 100644 --- a/Files/warranty_ibm_lenovo.py +++ b/Files/warranty_ibm_lenovo.py @@ -38,11 +38,16 @@ def get_product_info(self, serials, retry=True): timeout = 30 headers = { + 'Content-Type': 'application/x-www-form-urlencoded', 'ClientID': self.client_id } + params = { + 'Serial': serials + } + try: - resp = self.requests.get(self.url + '?Serial=' + serials, headers=headers, verify=True, timeout=timeout) + resp = self.requests.get(self.url, params=params, headers=headers, verify=True, timeout=timeout) msg = 'Status code: %s' % str(resp.status_code) if str(resp.status_code) == '401': print '\t[!] HTTP error. Message was: %s' % msg @@ -55,9 +60,10 @@ def get_product_info(self, serials, retry=True): else: return None else: - # todo: used to get an example response for development if self.debug: + print print resp.json() + print return resp.json() except requests.RequestException as e: self.error_msg(e) @@ -84,7 +90,6 @@ def run_warranty_check(self, inline_serials, retry=True): else: full_serials.update({d42_serial: d42_serial}) inline_serials.append(d42_serial) - inline_serials = ','.join(inline_serials) result = self.get_product_info(inline_serials, retry) return result @@ -93,8 +98,12 @@ def process_result(self, result, purchases): global full_serials data = {} - for item in result: + # The API returns results for single devices in a different format than multiple devices, this keeps everything + # returned from the API in the same list format + if 'Warranty' in result: + result = [result] + for item in result: # Warranties if 'Warranty' in item and len(item['Warranty']) > 0: warranties = item['Warranty'] @@ -102,7 +111,7 @@ def process_result(self, result, purchases): continue data.clear() - serial = item['ID'] + serial = item['Serial'] if self.order_no == 'common': order_no = self.common @@ -127,8 +136,8 @@ def process_result(self, result, purchases): # process warranty line items for a device for warranty in warranties: try: - start_date = warranty['Start']['UTC'].split('T')[0] - end_date = warranty['End']['UTC'].split('T')[0] + start_date = warranty['Start'].split('T')[0] + end_date = warranty['End'].split('T')[0] except (KeyError, AttributeError): continue