Source code for irflow_client.irflow_client

"""Python SDK and Wrapper for the IR-Flow REST API

"""
from json import dumps
import logging
import os
import sys
import tempfile

import requests
import urllib3
from .__version__ import __version__

try:
    import configparser
except ImportError:
    # py2 support
    import ConfigParser as configparser

# The next to lines suppress the SSL Warning
from urllib3.exceptions import InsecureRequestWarning

urllib3.disable_warnings(InsecureRequestWarning)


[docs]class IRFlowClientConfigError(Exception): """Raised on Config Errors""" pass
[docs]class IRFlowMaintenanceError(Exception): """Raised on HTTP 503 from IR-Flow App, likely being upgraded.""" pass
[docs]class IRFlowClient(object): """Python SDK for the IR-Flow REST API. """ end_points = { 'assign_user_to_alert': '/api/v1/alerts/{0}/assign', 'create_alert': 'api/v1/alerts', 'get_alert': 'api/v1/alerts', 'put_alert_close': 'api/v1/alerts/close', 'put_incident_on_alert': 'api/v1/alerts/%s/incident/%s', 'get_attachment': 'api/v1/attachments/%s/download', 'put_attachment': 'api/v1/%s/%s/attachments', 'get_fact_group': 'api/v1/fact_groups', 'put_fact_group': 'api/v1/fact_groups', 'create_incident': 'api/v1/incidents', 'get_incident': 'api/v1/incidents/%s', 'put_incident': 'api/v1/incidents/%s', 'put_alert_on_incident': 'api/v1/incidents/%s/alerts/%s', 'get_picklist_list': 'api/v1/picklists', 'get_picklist': 'api/v1/picklists/%s', 'add_item_to_picklist': 'api/v1/picklists/%s/picklist_items', 'get_picklist_item_list': 'api/v1/picklist_items', 'create_picklist_item': 'api/v1/picklist_items', 'get_picklist_item': 'api/v1/picklist_items/%s', 'restore_picklist_item': 'api/v1/picklist_items/%s/restore', 'delete_picklist_item': 'api/v1/picklist_items/%s', 'object_type': 'api/v1/object_types', 'version': 'api/v1/version' }
[docs] def __init__(self, config_args=None, config_file=None): """Create an API Client instance Creates API Client to IR-Flow API. Default timeout is 5 seconds on connect and 30 seconds on response. Args: config_args (dict): Key, Value pairs of IR-Flow API configuration options config_file (str): Path to a valid Ir-Flow configuration file """ self.circle_ci = os.environ.get('CI', False) self.logger = logging.getLogger(__name__) self.logger.addHandler(logging.NullHandler()) # Make sure we have config info we need if not (config_args or config_file): print('Missing config input parameters. Need either api.conf, or to pass in ' 'config_args to initialize IRFlowClient Class \n') if config_args and config_file: print('!!! Warning !!! Since you provided both input args and an api.conf file, we are' 'defaulting to the input args.') # parse config_args dict if config_args: self._get_config_args_params(config_args) # Else parse api.conf elif config_file: self._get_config_file_params(config_file) # Get a reusable session object. self.session = requests.Session() # Set timeout on (connect, read) timeouts self.session.timeout = (5, 30) # Set the User-Agent self.session.headers.update({'User-Agent': IRFlowClient._build_user_agent()}) # Set the X-Authorization header for all calls through the API # The rest of the headers are specified by the individual calls. self.session.headers.update({'X-Authorization': "{} {}" .format(self.api_user, self.api_key)}) if not self.circle_ci: self.version = self.get_version()
[docs] def dump_settings(self): """Helper function to print configuration information """ self.logger.debug('Configuration Settings\n' '\tAddress: "{}"\n' '\tAPI_User: "{}"\n' '\tAPI_Key: "{}"\n' '\tProtocol: "{}"\n' '\tDebug: "{}"\n' '\tVerbose: "{}"'.format(self.address, self.api_user, self.api_key, self.protocol, self.debug, self.verbose) )
[docs] def dump_request_debug_info(self, heading, url, headers=None, data=None, params=None): """Helper function to dump request info to the debug stream on the logging bus Args: heading (str): A string heading for the debug message - typically the name of the endpoint being queried url (str): The full url of the API endpoint headers (dict): The headers of this request, if desired data (dict): Key, Value pairs of data in the body of a request, if desired params (dict): Key, Value pairs of parameters passed in a request, if desired """ debug_string = '========== {} ==========\n' \ 'URL: "{}"\n' \ 'Session Headers: "{}"'.format(heading, url, self.session.headers) if headers: debug_string += '\nHeaders: "{}"'.format(headers) if data: debug_string += '\nBody: "{}"'.format(data) if params: debug_string += '\nParams: "{}"'.format(params) self.logger.debug(debug_string)
[docs] def dump_response_debug_info(self, heading, status, json): """Helper function to dump response info from a request to the debug stream on the logging bus Args: heading (str): A string heading for the debug message, the word 'Response' will be appended status (int): The HTTP response code of the previously made request json (dict): The full json response body as returned by the IR-Flow API """ self.logger.debug('========== {} Response ==========\n' 'HTTP Status: "{}"\n' 'Response JSON:\n{}'.format(heading, status, dumps(json, indent=2)))
[docs] def get_version(self, ): """Function to get Current IR-Flow Version Returns: str: IR-Flow Version Number Example: 4.6.0""" url = "%s://%s/%s" % (self.protocol, self.address, self.end_points['version']) headers = {'Content-type': 'application/json'} response = self.session.get(url, verify=False, headers=headers) if response.status_code == 503: raise IRFlowMaintenanceError('IR-Flow Server is down for maintenance') return str(response.json()['data']['version'])
[docs] def close_alert(self, alert_num, close_reason): """Close the alert with the provided number, for the provided reason Args: alert_num (int): The IR-Flow assigned alert number of the alert to close close_reason (str): The reason for which to close the desired alert Returns: dict: The full json response object returned by the IR-Flow API """ url = '%s://%s/%s' % (self.protocol, self.address, self.end_points['put_alert_close']) data = {"alert_num": "%s" % alert_num, "close_reason_name": "%s" % close_reason} headers = {'Content-type': 'application/json'} if self.debug: self.dump_request_debug_info('Close Alert', url, headers, data=data) response = self.session.put(url, json=data, headers=headers, verify=False) if self.debug: self.dump_response_debug_info('Close Alert', response.status_code, response.json()) return response.json()
[docs] def assign_user_to_alert(self, alert_num, username): """ Assign a user to an Alert Args: alert_num (int): The IR-Flow Assigned Alert Number of the Alert to attach to the specified incident username(string): The IR-Flow User to assign to an alert Returns: dict: The full json response object returned by the IR-Flow API. """ url = '{0}://{1}/{2}'.format(self.protocol, self.address, self.end_points['assign_user_to_alert']) url = url.format(alert_num) headers = {'Content-type': 'application/json'} payload = {'username': username} if self.debug: self.dump_request_debug_info('Assign User to Alert', url, headers=headers) response = self.session.put(url, json=payload, headers=headers, verify=False) if self.debug: self.dump_response_debug_info('Assign User to Alert', response.status_code, response.json()) return response.json()
[docs] def attach_incident_to_alert(self, incident_num, alert_num): """Attach the specified alert to the specified incident .. note:: This API endpoint will be deprecated in a future release. You should use :func:`attach_alert_to_incident`, which accomplishes the same outcome, and is how this would be done naturally in the interface. No new code should use this function. Args: incident_num (int): The Incident Number of the Incident to which the specified alert should be attached alert_num (int): The IR-Flow Assigned Alert Number of the Alert to attach to the specified incident Returns: dict: The full json response object returned by the IR-Flow API """ url = '%s://%s/%s' % (self.protocol, self.address, self.end_points['put_incident_on_alert']) url = url % (alert_num, incident_num) headers = {'Content-type': 'application/json'} if self.debug: self.dump_request_debug_info('Attach Incident to Alert', url, headers=headers) response = self.session.put(url, headers=headers, verify=False) if self.debug: self.dump_response_debug_info('Attach Incident to Alert', response.status_code, response.json()) return response.json()
[docs] def upload_attachment_to_alert(self, alert_num, filename): """Upload an attachment to the specified alert Args: alert_num (int): The IR-Flow Assigned Alert number of the Alert to which the desired filed should be uploaded filename (str): The path to the file to be uploaded Returns: dict: The full json response object returned by the IR-Flow API """ url = '%s://%s/%s' % (self.protocol, self.address, self.end_points['put_attachment']) url = url % ('alerts', alert_num) data = {'file': open(filename, 'rb')} headers = {} if self.debug: self.dump_request_debug_info('Upload Attachment to Alert', url, headers=headers) response = self.session.post(url, data={}, files=data, headers=headers, verify=False) if self.debug: self.dump_response_debug_info('Upload Attachment to Alert', response.status_code, response.json()) return response.json()
[docs] def upload_attachment_to_incident(self, incident_id, filename): """Upload an attachment to the specified incident Args: incident_id (int): The ID of the Incident to which the desired file should be uploaded filename (str): The path to the file to be uploaded Returns: dict: The full json response object returned by the IR-Flow API """ url = '%s://%s/%s' % (self.protocol, self.address, self.end_points['put_attachment']) url = url % ('incidents', incident_id) data = {'file': open(filename, 'rb')} headers = {} if self.debug: self.dump_request_debug_info('Upload Attachment to Incident', url, headers=headers) response = self.session.post(url, data={}, files=data, headers=headers, verify=False) if self.debug: self.dump_response_debug_info('Upload Attachment to Incident', response.status_code, response.json()) return response.json()
[docs] def upload_attachment_to_task(self, task_id, filename): """Upload an attachment to the specified task Args: task_id (int): The ID of the task to which the desired file should be uploaded filename (str): The path to the file to be uploaded Returns: dict: The full json response object returned by the IR-Flow API """ url = '%s://%s/%s' % (self.protocol, self.address, self.end_points['put_attachment']) url = url % ('tasks', task_id) data = {'file': open(filename, 'rb')} headers = {} if self.debug: self.dump_request_debug_info('Upload Attachment to Alert', url, headers=headers) response = self.session.post(url, data={}, files=data, headers=headers, verify=False) if self.debug: self.dump_response_debug_info('Upload Attachment to Alert Response', response.status_code, response.json()) return response.json()
[docs] def download_attachment(self, attachment_id, attachment_output_file): """Download the attachment with the specified ID Args: attachment_id (int): The ID of the attachment to be downloaded attachment_output_file (str): The full path to the file on disk to which the desired attachment should be saved """ url = '%s://%s/%s' % (self.protocol, self.address, self.end_points['get_attachment']) url = url % attachment_id if self.debug: self.dump_request_debug_info('Download Attachment', url) with open(attachment_output_file, 'wb') as handle: response = self.session.get(url, stream=True, verify=False) for block in response.iter_content(1024): handle.write(block) if self.debug: self.dump_response_debug_info('Download Attachment', response.status_code, {"response": response.status_code}) print('done')
[docs] def download_attachment_string(self, attachment_id): """Download an attachment and return it as text Args: attachment_id (int): The ID of the attachment to be downloaded Returns: str: The textual contents of the downloaded file """ url = '%s://%s/%s' % (self.protocol, self.address, self.end_points['get_attachment']) url = url % attachment_id if self.debug: self.dump_request_debug_info('Download Attachment String', url) # Get a temporary file to download the results into temp = tempfile.TemporaryFile() response = self.session.get(url, stream=True, verify=False) # Iterate, downloading data 1,024 bytes at a time for block in response.iter_content(1024): temp.write(block) # Rewind the file to the beginning so we can read it into a string temp.seek(0) if self.debug: self.dump_response_debug_info('Download Attachment String', response.status_code, response.json()) return temp.read()
[docs] def put_fact_group(self, fact_group_id, fact_data): """Put new or updated fact data in the specified fact group Args: fact_group_id (int): The IR-Flow assigned ID of the fact_group to be updated fact_data (dict): Key, Value pairs of fact fields as specified in IR-Flow and their values Returns: dict: The full json response object from the IR-Flow API """ url = '%s://%s/%s/%s' % (self.protocol, self.address, self.end_points['get_fact_group'], fact_group_id) headers = { 'Content-type': 'application/json', 'Accept': 'application/json' } fact_payload = {'fields': fact_data} if self.debug: self.dump_request_debug_info('Put Fact Group', url, headers=headers) response = self.session.put(url, json=fact_payload, verify=False, headers=headers) if self.debug: self.dump_response_debug_info('Put Fact Group', response.status_code, response.json()) return response.json()
[docs] def get_fact_group(self, fact_group_id): """Retrieve the current data in the specified fact group Args: fact_group_id (int): The IR-Flow assigned IF of the fact_group to retrieve Returns: dict: The full json response object from the IR-Flow API """ url = '%s://%s/%s/%s' % (self.protocol, self.address, self.end_points['get_fact_group'], fact_group_id) headers = { 'Content-type': 'application/json', 'Accept': 'application/json' } if self.debug: self.dump_request_debug_info('Get Fact Group', url, headers=headers) response = self.session.get(url, verify=False, headers=headers) if self.debug: self.dump_response_debug_info('Get Fact Group', response.status_code, response.json()) return response.json()
[docs] def get_alert(self, alert_num): """Retrieve the alert with the specified alert number Args: alert_num (int): The IR-Flow assigned alert number of the alert to retrieve Returns: dict: The full json response object from the IR-Flow API """ url = '%s://%s/%s/%s' % (self.protocol, self.address, self.end_points['get_alert'], alert_num) headers = { 'Content-type': 'application/json', 'Accept': 'application/json' } if self.debug: self.dump_request_debug_info('Get Alert', url, headers=headers) response = self.session.get(url, verify=False, headers=headers) if self.debug: self.dump_response_debug_info('Get Alert', response.status_code, response.json()) return response.json()
[docs] def create_alert(self, alert_fields, description=None, incoming_field_group_name=None, suppress_missing_field_warning=False): """Create an alert of the desired field group name with the specified fields and description Args: alert_fields (dict): Key, Value pairs of fields configured in IR-Flow and their values description (str): An optional string description for the alert incoming_field_group_name (str): The string name of the incoming field group name for this alert as specified in IR-Flow suppress_missing_field_warning (bool): Suppress the API warnings indicating missing fields if `True` - defaults to `False` Returns: dict: The full json response object from the IR-Flow API """ url = '%s://%s/%s' % (self.protocol, self.address, self.end_points['create_alert']) params = { 'fields': alert_fields, 'suppress_missing_field_warning': suppress_missing_field_warning } headers = { 'Content-type': 'application/json', 'Accept': 'application/json' } if description is not None: params['description'] = description if incoming_field_group_name is not None: params['data_field_group_name'] = incoming_field_group_name if self.debug: self.dump_request_debug_info('Create Alert', url, headers=headers, params=params) response = self.session.post(url, json=params, verify=False, headers=headers) if self.debug: self.dump_response_debug_info('Create Alert', response.status_code, response.json()) return response.json()
[docs] def create_incident(self, incident_type_name, incident_fields=None, incident_subtype_name=None, description=None, priority_id=None, owner_id=None): """Create an incident of the desired type and subtype with the specified fields and description Args: incident_type_name (str): The string name of the incident type with which this incident should be created incident_subtype_name (str): The string name of the incident subtype with which this incident should be created (optional) incident_fields (dict): Key, Value pairs of fields configured in IR-Flow and their values (optional) description (str): An optional string description for the incident priority_id (str): ID of the priority to set owner_id (str): ID of the user to set incident owner to """ url = '%s://%s/%s' % (self.protocol, self.address, self.end_points['create_incident']) params = { 'fields': incident_fields, 'incident_type_name': incident_type_name, } headers = { 'Content-type': 'application/json', 'Accept': 'application/json' } if incident_subtype_name is not None: params['incident_subtype_name'] = incident_subtype_name if description is not None: params['description'] = description if priority_id is not None: params['priority_id'] = priority_id if owner_id is not None: params['owner_id'] = owner_id if self.debug: self.dump_request_debug_info('Create Incident', url, headers=headers, params=params) response = self.session.post(url, json=params, verify=False, headers=headers) if self.debug: self.dump_response_debug_info('Create Incident', response.status_code, response.json()) return response.json()
[docs] def get_incident(self, incident_num): """Retrieve the incident with the specified ID Args: incident_num (int): The IR-Flow assigned ID of the incident to be retrieved Returns: dict: The full json response object from the IR-Flow API """ url = '%s://%s/%s' % (self.protocol, self.address, self.end_points['get_incident']) url = url % incident_num headers = { 'Content-type': 'application/json', 'Accept': 'application/json' } if self.debug: self.dump_request_debug_info('Get Incident', url, headers=headers) response = self.session.get(url, verify=False, headers=headers) if self.debug: self.dump_response_debug_info('Get Incident', response.status_code, response.json()) return response.json()
[docs] def update_incident(self, incident_num, incident_fields, incident_type_name, owner_id, group_ids, incident_subtype_name=None, description=None, priority_id=None): """Update the incident of the provided number, type, and subtype with the provided fields and description Args: incident_num (int): The IR-Flow assigned ID of the incident to update incident_fields (dict): Key, Value pairs of fields configured in IR-Flow and their values incident_type_name (str): The string name of the incident type of the desired incident owner_id (int): The id of the user that will own this incident group_ids (list of int): The ids of the groups this incident will belong to. incident_subtype_name (str): The string name of the incident subtype of the desired incident (optional) description (str): An optional string description for the incident Returns: dict: The full json response object from the IR-Flow API """ url = '%s://%s/%s' % (self.protocol, self.address, self.end_points['put_incident']) url = url % incident_num params = { 'fields': incident_fields, 'incident_type_name': incident_type_name, 'owner_id': owner_id, 'group_ids': group_ids } headers = { 'Content-type': 'application/json', 'Accept': 'application/json' } if incident_subtype_name is not None: params['incident_subtype_name'] = incident_subtype_name if description is not None: params['description'] = description if priority_id is not None: params['priority_id'] = priority_id if self.debug: self.dump_request_debug_info('Update Incident', url, headers=headers, params=params) response = self.session.put(url, json=params, verify=False, headers=headers) if self.debug: self.dump_response_debug_info('Update Incident', response.status_code, response.json()) return response.json()
[docs] def attach_alert_to_incident(self, alert_num, incident_num): """Attach the specified alert to the specified incident Args: incident_num (int): The Incident Number of the Incident to which the specified alert should be attached alert_num (int): The IR-Flow Assigned Alert Number of the Alert to attach to the specified incident Returns: dict: The full json response object returned by the IR-Flow API """ url = '%s://%s/%s' % (self.protocol, self.address, self.end_points['put_alert_on_incident']) url = url % (incident_num, alert_num) headers = {'Content-type': 'application/json'} if self.debug: self.dump_request_debug_info('Attach Alert to Incident', url, headers=headers) response = self.session.put(url, headers=headers, verify=False) if self.debug: self.dump_response_debug_info('Attach Alert to Incident', response.status_code, response.json()) return response.json()
[docs] def list_picklists(self, with_trashed=False, only_trashed=False): """List all picklists Args: with_trashed (bool): Include deleted picklists - `False` by default only_trashed (bool): List only deleted picklists - `False` by default Returns: dict: The full json response object from the IR-Flow API """ url = '%s://%s/%s' % (self.protocol, self.address, self.end_points['get_picklist_list']) params = { 'with_trashed': with_trashed, 'only_trashed': only_trashed, } headers = { 'Content-type': 'application/json', 'Accept': 'application/json' } if self.debug: self.dump_request_debug_info('Get List of Picklists', url, headers=headers, params=params) response = self.session.get(url, params=params, verify=False, headers=headers) if self.debug: self.dump_response_debug_info('Get List of Picklists', response.status_code, response.json()) return response.json()
[docs] def get_picklist(self, picklist_id): """Retrieve the picklist with the desired ID Args: picklist_id (int): The IR-Flow assigned id of the picklist to be retrieved Returns: dict: The full json response object from the IR-Flow API """ url = '%s://%s/%s' % (self.protocol, self.address, self.end_points['get_picklist']) url = url % picklist_id headers = { 'Content-type': 'application/json', 'Accept': 'application/json' } if self.debug: self.dump_request_debug_info('Get Picklist', url, headers=headers) response = self.session.get(url, verify=False, headers=headers) if self.debug: self.dump_response_debug_info('Get Picklist', response.status_code, response.json()) return response.json()
[docs] def add_item_to_picklist(self, picklist_id, value, label, description=None): """Add an item with the provided value, label, and description to the picklist matching the provided ID Args: picklist_id (int): The IR-Flow assigned ID of the picklist to which the new item should be added value (str): The string value submitted to actions and integrations for this picklist item label (str): The label to be displayed for this picklist item description (str): An optional description for this picklist item Returns: dict: The full json response object from the IR-Flow API """ url = '%s://%s/%s' % (self.protocol, self.address, self.end_points['add_item_to_picklist']) url = url % picklist_id params = { 'value': value, 'label': label, } headers = { 'Content-type': 'application/json', 'Accept': 'application/json' } if description is not None: params['description'] = description if self.debug: self.dump_request_debug_info('Add Item to Picklist', url, headers=headers, params=params) response = self.session.post(url, json=params, verify=False, headers=headers) if self.debug: self.dump_response_debug_info('Add Item to Picklist', response.status_code, response.json()) return response.json()
[docs] def list_picklist_items(self, picklist_id, with_trashed=False, only_trashed=False): """Retrieve a list of all picklist items in a specified list Args: picklist_id (int): The IR-Flow Assigned ID of the picklist whose items to list with_trashed (bool): Include deleted items - `False` by default only_trashed (bool): Only list deleted items - `False` by default Returns: dict: The full json response object from the IR-Flow API """ url = '%s://%s/%s' % (self.protocol, self.address, self.end_points['get_picklist_item_list']) params = { 'picklist_id': picklist_id, 'with_trashed': with_trashed, 'only_trashed': only_trashed, } headers = { 'Content-type': 'application/json', 'Accept': 'application/json' } if self.debug: self.dump_request_debug_info('Get List of Picklist Items', url, headers=headers, params=params) response = self.session.get(url, params=params, verify=False, headers=headers) if self.debug: self.dump_response_debug_info('Get List of Picklist Items', response.status_code, response.json()) return response.json()
[docs] def create_picklist_item(self, picklist_id, value, label, description=None): """Create a new item in a specified picklist Args: picklist_id (int): The IR-Flow assigned ID of the picklist to which the new item should be added value (str): The string value submitted to actions and integrations for this picklist item label (str): The label to be displayed for this picklist item description (str): An optional description for this picklist item Returns: dict: The full json response object from the IR-Flow API """ url = '%s://%s/%s' % (self.protocol, self.address, self.end_points['create_picklist_item']) params = { 'picklist_id': picklist_id, 'value': value, 'label': label, } headers = { 'Content-type': 'application/json', 'Accept': 'application/json' } if description is not None: params['description'] = description if self.debug: self.dump_request_debug_info('Add Picklist Item', url, headers=headers, params=params) response = self.session.post(url, json=params, verify=False, headers=headers) if self.debug: self.dump_response_debug_info('Add PIcklist Item', response.status_code, response.json()) return response.json()
[docs] def get_picklist_item(self, picklist_item_id): """Retrieve the picklist item corresponding to the specified ID Args: picklist_item_id (int): The IR-Flow assigned ID of the picklist item to be retrieved Returns: dict: The full json response object from the IR-Flow API """ url = '%s://%s/%s' % (self.protocol, self.address, self.end_points['get_picklist_item']) url = url % picklist_item_id headers = { 'Content-type': 'application/json', 'Accept': 'application/json' } if self.debug: self.dump_request_debug_info('Get Picklist Item', url, headers=headers) response = self.session.get(url, verify=False, headers=headers) if self.debug: self.dump_response_debug_info('Get Picklist Item', response.status_code, response.json()) return response.json()
[docs] def restore_picklist_item(self, picklist_item_id): """Restore a previously deleted picklist item Args: picklist_item_id (int): The IR-Flow assigned ID of the picklist item to be restored Returns: dict: The full json response object from the IR-Flow API """ url = '%s://%s/%s' % (self.protocol, self.address, self.end_points['restore_picklist_item']) url = url % picklist_item_id headers = { 'Content-type': 'application/json', 'Accept': 'application/json' } if self.debug: self.dump_request_debug_info('Restore Picklist Item', url, headers=headers) response = self.session.put(url, verify=False, headers=headers) if self.debug: self.dump_response_debug_info('Restore Picklist Item', response.status_code, response.json()) return response.json()
[docs] def delete_picklist_item(self, picklist_item_id): """Mark a picklist item as deleted Args: picklist_item_id (int): The IR-Flow assigned ID of the picklist item to be deleted Returns: dict: The full json response object from the IR-Flow API """ url = '%s://%s/%s' % (self.protocol, self.address, self.end_points['delete_picklist_item']) url = url % picklist_item_id headers = { 'Content-type': 'application/json', 'Accept': 'application/json' } if self.debug: self.dump_request_debug_info('Delete Picklist Item', url, headers=headers) response = self.session.delete(url, verify=False, headers=headers) if self.debug: self.dump_response_debug_info('Delete Picklist Item', response.status_code, response.json()) return response.json()
[docs] def create_object_type(self, type_name, type_label, parent_type_name=None, parent_type_id=None): """Create an object type of the provided parent type or id with the provided name and label Args: type_name (str): The string name for this object type type_label (str): The label for this object type parent_type_name (str): The string name of the parent object type - required if no `parent_type_id` is specified parent_type_id (int): The id of the parent object type - required if no `parent_type_name` is specified Returns: dict: The full json response object from the IR-Flow API """ if type_name is None: raise TypeError("type_name is required") if type_label is None: raise TypeError("type_label is required") if parent_type_name is None and parent_type_id is None: raise TypeError("Either parent_type_name or parent_type_id is required") url = '%s://%s/%s' % (self.protocol, self.address, self.end_points['object_type']) headers = { 'Content-type': 'application/json', 'Accept': 'application/json' } params = { 'type_name': type_name, 'type_label': type_label, 'parent_type_name': parent_type_name, 'parent_type_id': parent_type_id } if self.debug: self.dump_request_debug_info('Store Object Type', url, headers=headers, params=params) response = self.session.post(url, json=params, verify=False, headers=headers) if self.debug: self.dump_response_debug_info('Store Object Type', response.status_code, response.json()) return response.json()
[docs] def attach_field_to_object_type(self, object_type_name, field_name, object_type_id=None, field_id=None): """Attach an existing field to an object of the specified name or id Args: object_type_name (str): The string name of the object to which the specified field should be added - required only if no `object_type_id` is provided field_name (str): The string name of the field to be added to the specified object - required only if no `field_id` is provided object_type_id (int): The IR-Flow assigned ID of the object to which the specified field should be added - required only if no `object_type_name` is provided field_id (int): The IR-Flow assigned IF of the field to be added to the specified object - required only if no `field_name` is provided Returns: dict: The full json response object from the IR-Flow API """ url = '%s://%s/%s/%s' % (self.protocol, self.address, self.end_points['object_type'], 'attach_field') headers = { 'Content-type': 'application/json', 'Accept': 'application/json' } params = { 'object_type_name': object_type_name, 'field_name': field_name, 'object_type_id': object_type_id, 'field_id': field_id } if self.debug: self.dump_request_debug_info('Attach Field to Object Type', url, headers=headers, params=params) response = self.session.put(url, json=params, verify=False, headers=headers) if self.debug: self.dump_response_debug_info('Attach Field to Object Type', response.status_code, response.json()) return response.json()
# The following helper functions are also defined in the irflow_client
[docs] @staticmethod def get_field_by_name(field_name, field_list): """Helper function to return a field via a string name match given a field and field list Args: field_name (str): The string name of the desired field field_list (list): A list of field objects Returns: dict: The field object if found, `None` otherwise """ for field in field_list: if field['field']['field_name'] == field_name: return field return None
[docs] @staticmethod def _build_user_agent(): """Builds the current version User-Agent String Returns: str: user-agent """ return "IR-Flow-Client / {0} (Python {1}; en-us)".format(__version__, sys.version.split(' ')[0])
[docs] def _get_config_args_params(self, config_args): """Helper function to check/parse configuration arguments provided as a dict Args: config_args (dict): A dict of the following keys: Keys: address (str): IR-Flow Server FQDN or IP Address api_user (str): IR-Flow API User api_key (str): above user's api key protocol (str): https unless otherwise specified, default = HTTPS debug (bool): enable debug output, default = None verbose (int): turn up the verbosity default = 0 (optional) """ # Checking for missing config values if isinstance(config_args['address'], str): self.address = config_args['address'] elif not config_args['address']: raise KeyError('You have the wrong or missing key or value') else: raise KeyError('You have the wrong or missing key or value') if isinstance(config_args['api_user'], str): self.api_user = config_args['api_user'] elif not config_args['api_user']: raise KeyError('You have the wrong or missing key or value') else: raise KeyError('You have the wrong or missing key or value') if isinstance(config_args['api_key'], str): self.api_key = config_args['api_key'] elif not config_args['api_key']: raise KeyError('You have the wrong or missing key or value') else: raise KeyError('You have the wrong or missing key or value') if config_args['protocol']: self.protocol = config_args['protocol'] else: self.protocol = 'https' if config_args['debug']: self.debug = config_args['debug'] else: self.debug = False try: if config_args['verbose']: self.verbose = int(config_args['verbose']) except KeyError: self.verbose = 1 # Dump Configuration if --debug if self.debug: self.dump_settings()
[docs] def _get_config_file_params(self, config_file): """Helper function to parse configuration arguments from a valid IR-Flow configuration file Args: config_file (str): Path to a valid IR-Flow configuration file """ config = configparser.ConfigParser() config.read(config_file) # Make sure the Config File has the IRFlowAPI Section if not config.has_section('IRFlowAPI'): self.logger.error('Config file "{}" does not have the required section "[IRFlowAPI]"' .format(config_file)) raise IRFlowClientConfigError('Config file "{}" does not have the required section ' '"[IRFlowAPI]"'.format(config_file)) missing_options = [] # Check for missing required configuration keys if not config.has_option('IRFlowAPI', 'address'): self.logger.error( 'Configuration File "{}" does not contain the "address" option ' 'in the [IRFlowAPI] section'.format(config_file) ) missing_options.append('address') if not config.has_option('IRFlowAPI', 'api_user'): self.logger.error( 'Configuration File "{}" does not contain the "api_user" option ' 'in the [IRFlowAPI] section'.format(config_file) ) missing_options.append('api_user') if not config.has_option('IRFlowAPI', 'api_key'): self.logger.error( 'Configuration File "{}" does not contain the "api_key" option ' 'in the [IRFlowAPI] section'.format(config_file) ) missing_options.append('api_key') # Do not need to check for protocol, it is optional. Will assume https if missing. # Do not need to check for debug, it is optional. Will assume False if missing. # If the required keys do not exist, then simply exit if len(missing_options) > 0: self.logger.error('Missing configuration sections: {0}' .format(", ".join(missing_options))) raise IRFlowClientConfigError('Missing configuration sections: {0}' .format(", ".join(missing_options))) # Now set the configuration values on the self object. self.address = config.get('IRFlowAPI', 'address') self.api_user = config.get('IRFlowAPI', 'api_user') self.api_key = config.get('IRFlowAPI', 'api_key') if config.has_option('IRFlowAPI', 'protocol'): self.protocol = config.get('IRFlowAPI', 'protocol') else: self.protocol = 'https' if config.has_option('IRFlowAPI', 'debug'): self.debug = config.getboolean('IRFlowAPI', 'debug') else: self.debug = False if config.has_option('IRFlowAPI', 'verbose'): self.verbose = int(config.get('IRFlowAPI', 'verbose')) else: self.verbose = 1 # Dump Configuration if --debug if self.debug: self.dump_settings()