diff --git a/.gitignore b/.gitignore index 5083963e..bd4c1150 100644 --- a/.gitignore +++ b/.gitignore @@ -98,3 +98,6 @@ ENV/ # Test Script test.py + +# Read the docs +_readthedocs/ diff --git a/conftest.py b/conftest.py index 764e4605..68c6d37f 100644 --- a/conftest.py +++ b/conftest.py @@ -22,8 +22,8 @@ def iviaServer(): # Create an ISAM appliance with above credential isam_server = ISAMAppliance(hostname=_host, user=u, lmi_port=_port) yield isam_server - returnValue = ibmsecurity.isam.appliance.commit(isamAppliance=isam_server) - print('\nCommit result') + returnValue = ibmsecurity.isam.appliance.commit(isamAppliance=isam_server, publish=True) + print('\nCommit result and publish') print( returnValue ) print('\n') return returnValue diff --git a/docs/changelog.md b/docs/changelog.md index 9bcb3f98..14a94b3d 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -2,6 +2,8 @@ ## Latest +## 2025.6.3.0 + - feature: base/management_authentication.py - type federation - build: test setup - feature: web/reverse_proxy/oauth_configuration.py - add new parameters in 10.0.8 @@ -10,6 +12,14 @@ - fix: base/admin_ssh_keys.py - Ignore error when same ssh key exists under different name - pylint: change format() to f-strings - feature: base/tracing.py - Get tracing configuration (new in 10.0.8) +- fix: add ignore_errors to cli.py - incoming change +- feature: add publish parameter (for use in containers) (new in 10.0.8) +- feature: new function in tools `json_equals` +- feature: tuning_parameters.py - set multiple runtime parameters in 1 call +- fix: new parameter includeIssInAuthResp for oidc definitions (new in 10.0.8) +- feature: export a kerberos keytab file (new in 10.0.8) +- feature: get all audit configurations (new in 10.0.8) +- feature: update network certificate database (WIP) ## 2025.3.28.0 diff --git a/ibmsecurity/isam/aac/api_protection/definitions.py b/ibmsecurity/isam/aac/api_protection/definitions.py index 05c46ad2..d150081e 100644 --- a/ibmsecurity/isam/aac/api_protection/definitions.py +++ b/ibmsecurity/isam/aac/api_protection/definitions.py @@ -138,7 +138,12 @@ def add(isamAppliance, name, description="", accessPolicyName=None, grantTypes=[ if tools.version_compare(isamAppliance.facts["version"], "9.0.5.0") < 0: warnings.append( f"Appliance at version: {isamAppliance.facts['version']}, issueSecret: {json_data['oidc']['issueSecret']} is not supported. Needs 9.0.5.0 or higher. Ignoring issueSecret for this call.") - del json_data['oidc']['issueSecret'] + json_data['oidc'].pop('issueSecret', None) + if 'includeIssInAuthResp' in json_data['oidc']: + if tools.version_compare(isamAppliance.facts["version"], "10.0.8.0") < 0: + warnings.append( + f"Appliance at version: {isamAppliance.facts['version']}, issueSecret: {json_data['oidc']['includeIssInAuthResp']} is not supported. Needs 10.0.8.0 or higher. Ignoring includeIssInAuthResp for this call.") + json_data['oidc'].pop('includeIssInAuthResp', None) return isamAppliance.invoke_post( "Create an API protection definition", uri, @@ -234,16 +239,12 @@ def update(isamAppliance, name, description="", accessPolicyName=None, grantType oidc['attributeSources'] = _map_oidc_attributeSources(isamAppliance, oidc['attributeSources'], check_mode, force) json_data["oidc"] = oidc - if force is not True: + if not force: - if 'datecreated' in ret_obj['data']: - del ret_obj['data']['datecreated'] - if 'id' in ret_obj['data']: - del ret_obj['data']['id'] - if 'lastmodified' in ret_obj['data']: - del ret_obj['data']['lastmodified'] - if 'mappingRules' in ret_obj['data']: - del ret_obj['data']['mappingRules'] + ret_obj['data'].pop('datecreated', None) + ret_obj['data'].pop('id', None) + ret_obj['data'].pop('lastmodified', None) + ret_obj['data'].pop('mappingRules', None) # Inspecting oidcConfig and remove missing or None attributes in returned object if oidc is not None and 'oidc' in ret_obj['data']: diff --git a/ibmsecurity/isam/aac/risk_profiles.py b/ibmsecurity/isam/aac/risk_profiles.py index 32e4f96e..2816c1bb 100644 --- a/ibmsecurity/isam/aac/risk_profiles.py +++ b/ibmsecurity/isam/aac/risk_profiles.py @@ -160,7 +160,7 @@ def _check(isamAppliance, name, active, description, attributes, predefined): logger.warning("Risk Profile not found, returning no update required.") return None, update_required, json_data else: - if ret_obj['data']['predefined'] is True: + if ret_obj['data']['predefined']: logger.warning("Predefined Risk Profiles can NOT be updated, returning no update required.") return ret_obj['data']['id'], update_required, {} else: @@ -181,11 +181,7 @@ def _check(isamAppliance, name, active, description, attributes, predefined): id = ret_obj['data']['id'] del ret_obj['data']['id'] - sorted_json_data = json.dumps(json_data, skipkeys=True, sort_keys=True) - logger.debug("Sorted input: {0}".format(sorted_json_data)) - sorted_ret_obj = json.dumps(ret_obj['data'], skipkeys=True, sort_keys=True) - logger.debug("Sorted existing data: {0}".format(sorted_ret_obj)) - if sorted_ret_obj != sorted_json_data: + if not tools.json_equals(ret_obj, json_data): logger.info("Changes detected, update needed.") update_required = True diff --git a/ibmsecurity/isam/aac/scim/scim.py b/ibmsecurity/isam/aac/scim/scim.py index 818c54ca..c15fc56c 100644 --- a/ibmsecurity/isam/aac/scim/scim.py +++ b/ibmsecurity/isam/aac/scim/scim.py @@ -282,15 +282,15 @@ def set_all(isamAppliance, settings, check_mode=False, force=False): """ if settings is None or settings == '': return isamAppliance.create_return_object( - warnings="Need to pass content for scim configuration") + warnings=["Need to pass content for scim configuration"]) else: # Feature: Converting python string to dict (if required) # Attention: JSON strings must use " quotes according to RFC 8259 # Example: '{"a":1, "b": 2, "c": 3}' if isinstance(settings, str): settings = json.loads(settings) - if force is True or _check(isamAppliance, settings) is False: - if check_mode is True: + if force or not _check(isamAppliance, settings): + if check_mode: return isamAppliance.create_return_object(changed=True) else: return isamAppliance.invoke_put( diff --git a/ibmsecurity/isam/aac/server_connections/smtp.py b/ibmsecurity/isam/aac/server_connections/smtp.py index b2e379bd..d65ce5dd 100644 --- a/ibmsecurity/isam/aac/server_connections/smtp.py +++ b/ibmsecurity/isam/aac/server_connections/smtp.py @@ -113,15 +113,10 @@ def update(isamAppliance, name, connection, description='', locked=False, connec warnings.append("Request made to ignore password for idempotency check.") connection.pop('password', None) - sorted_ret_obj = tools.json_sort(ret_obj['data']) - sorted_json_data = tools.json_sort(json_data) - logger.debug(f"Sorted Existing Data:{sorted_ret_obj}") - logger.debug(f"Sorted Desired Data:{sorted_json_data}") - - if sorted_ret_obj != sorted_json_data: + if not tools.json_equals(ret_obj, json_data): needs_update = True - if force is True or needs_update is True: + if force or needs_update: if check_mode is True: return isamAppliance.create_return_object(changed=True, warnings=warnings) else: diff --git a/ibmsecurity/isam/appliance.py b/ibmsecurity/isam/appliance.py index a41cb6d4..b22d9fa6 100644 --- a/ibmsecurity/isam/appliance.py +++ b/ibmsecurity/isam/appliance.py @@ -11,7 +11,7 @@ def reboot(isamAppliance, check_mode=False, force=False): """ Restart the appliance """ - if check_mode is True: + if check_mode: return isamAppliance.create_return_object(changed=True) else: return isamAppliance.invoke_post("Restart the appliance", @@ -23,7 +23,7 @@ def shutdown(isamAppliance, check_mode=False, force=False): """ Shutdown the appliance """ - if check_mode is True: + if check_mode: return isamAppliance.create_return_object(changed=True) else: return isamAppliance.invoke_post("Shutting down appliance", @@ -56,15 +56,23 @@ def _changes_available(isamAppliance): return False -def commit(isamAppliance, check_mode=False, force=False): +def commit(isamAppliance, publish=False, check_mode=False, force=False): """ Commit the current pending changes. """ - if force is True or _changes_available(isamAppliance) is True: - if check_mode is True: + if force or _changes_available(isamAppliance): + if check_mode: return isamAppliance.create_return_object(changed=True) else: - return isamAppliance.invoke_put("Committing the changes", + iviaVersion = isamAppliance.facts['version'] + if publish and ibmsecurity.utilities.tools.version_compare(iviaVersion, "10.0.8.0") >= 0: + logger.debug("Commit: commit and publish") + return isamAppliance.invoke_put("Committing the changes (containers)", + f"/isam/pending_changes?publish={publish}", + {}) + else: + logger.debug("Commit: simple commit") + return isamAppliance.invoke_put("Committing the changes", "/isam/pending_changes", {}) @@ -79,8 +87,8 @@ def commit_and_restart(isamAppliance, check_mode=False, force=False): :param force: :return: """ - if force is True or _changes_available(isamAppliance) is True: - if check_mode is True: + if force or _changes_available(isamAppliance): + if check_mode: return isamAppliance.create_return_object(changed=True) else: return isamAppliance.invoke_post("Commit and Restart", @@ -127,11 +135,10 @@ def reboot_and_wait(isamAppliance, wait_time=300, check_freq=5, check_mode=False time.sleep(check_freq) sec += check_freq logger.debug( - "Server is not responding yet. Waited for {0} secs, next check in {1} secs.".format(sec, - check_freq)) + f"Server is not responding yet. Waited for {sec} secs, next check in {check_freq} secs.") if sec >= wait_time: - warnings.append("Server reboot not detected or completed, exiting... after {0} seconds".format(sec)) + warnings.append(f"Server reboot not detected or completed, exiting... after {sec} seconds") break return isamAppliance.create_return_object(warnings=warnings) @@ -148,7 +155,7 @@ def commit_and_restart_and_wait(isamAppliance, wait_time=300, check_freq=5, chec :return: """ warnings = [] - if check_mode is True: + if check_mode: return isamAppliance.create_return_object(changed=True) else: lmi = ibmsecurity.isam.base.lmi.get(isamAppliance, check_mode=check_mode, force=force) @@ -171,12 +178,11 @@ def commit_and_restart_and_wait(isamAppliance, wait_time=300, check_freq=5, chec else: time.sleep(check_freq) sec += check_freq - logger.debug("LMI is not responding yet. Waited for {0} secs, next check in {1} secs.".format(sec, - check_freq)) + logger.debug(f"LMI is not responding yet. Waited for {sec} secs, next check in {check_freq} secs.") if sec >= wait_time: warnings.append( - "The LMI restart not detected or completed, exiting... after {0} seconds".format(sec)) + f"The LMI restart not detected or completed, exiting... after {sec} seconds") break return isamAppliance.create_return_object(warnings=warnings) @@ -186,7 +192,7 @@ def rollback(isamAppliance, check_mode=False, force=False): """ Rollback the current pending changes. """ - if force is True or _changes_available(isamAppliance) is True: + if force or _changes_available(isamAppliance): if check_mode is True: return isamAppliance.create_return_object(changed=True) else: diff --git a/ibmsecurity/isam/base/admin.py b/ibmsecurity/isam/base/admin.py index c68de456..8c7ec9ad 100644 --- a/ibmsecurity/isam/base/admin.py +++ b/ibmsecurity/isam/base/admin.py @@ -1,5 +1,5 @@ import logging -import ibmsecurity.utilities.tools +import ibmsecurity.utilities.tools as _tools import json logger = logging.getLogger(__name__) @@ -29,13 +29,13 @@ def set_pw(isamAppliance, oldPassword, newPassword, sessionTimeout="30", httpsPo "sessionTimeout": sessionTimeout } if httpsPort is not None: - if ibmsecurity.utilities.tools.version_compare(isamAppliance.facts['version'], "9.0.1.0") < 0: + if _tools.version_compare(isamAppliance.facts['version'], "9.0.1.0") < 0: warnings.append( "Appliance at version: {0}, httpsPort not supported. Needs 9.0.1.0 or higher. Ignoring httpsPort for this call.") else: json_data['httpsPort'] = httpsPort else: - if ibmsecurity.utilities.tools.version_compare(isamAppliance.facts['version'], "9.0.1.0") < 0: + if _tools.version_compare(isamAppliance.facts['version'], "9.0.1.0") < 0: pass # Can safely ignore httpsPort else: warnings.append("Default httpsPort of 443 will be set on the appliance.") @@ -168,67 +168,56 @@ def _check(isamAppliance, json_data["confirmPassword"] = v if k in ["minHeapSize", "maxHeapSize", "httpPort", "httpsPort", "minThreads", "maxThreads", "maxPoolSize", "maxFiles", "maxFileSize", "sshdPort", "sessionCachePurge", "sessionInactivityTimeout", "sshdClientAliveInterval", "baSessionTimeout"]: # int values - if k == "sshdPort" and ibmsecurity.utilities.tools.version_compare(iviaVersion, "9.0.3.0") < 0: + if k == "sshdPort" and _tools.version_compare(iviaVersion, "9.0.3.0") < 0: warnings.append(f"Appliance at version: {iviaVersion}, sshdPort: {v} is not supported. Needs 9.0.3.0 or higher. Ignoring sshdPort for this call.") continue - if k in ["sessionCachePurge", "sessionInactivityTimeout", "sshdClientAliveInterval"] and ibmsecurity.utilities.tools.version_compare(iviaVersion, "9.0.5.0") < 0: + if k in ["sessionCachePurge", "sessionInactivityTimeout", "sshdClientAliveInterval"] and _tools.version_compare(iviaVersion, "9.0.5.0") < 0: warnings.append(f"Appliance at version: {iviaVersion}, {k}: {v} is not supported. Needs 9.0.5.0 or higher. Ignoring.") continue - if k in ["baSessionTimeout"] and ibmsecurity.utilities.tools.version_compare(iviaVersion, "10.0.2.0") < 0: + if k in ["baSessionTimeout"] and _tools.version_compare(iviaVersion, "10.0.2.0") < 0: warnings.append( f"Appliance at version: {iviaVersion}, {k}: {v} is not supported. Needs 10.0.2.0 or higher. Ignoring.") continue json_data[k] = int(v) continue if k == "enableSSLv3": - if ibmsecurity.utilities.tools.version_compare(iviaVersion, "10.0.3.0") >= 0: + if _tools.version_compare(iviaVersion, "10.0.3.0") >= 0: warnings.append(f"Appliance at version: {iviaVersion}, enableSSLv3: {v} is not supported. Needs max. 10.0.2.0. Ignoring for this call.") continue if k == "consoleLogLevel": if 'consoleLogLevel' in ret_obj['data'] and ret_obj['data']['consoleLogLevel'] == 'OFF': ret_obj['data']['consoleLogLevel'] = 'OFF' if k == "enabledTLS": - if ibmsecurity.utilities.tools.version_compare(iviaVersion, "9.0.4.0") < 0: + if _tools.version_compare(iviaVersion, "9.0.4.0") < 0: warnings.append(f"Appliance at version: {iviaVersion}, enabledTLS: {v} is not supported. Needs 9.0.4.0 or higher. Ignoring enabledTLS for this call.") continue if k in ["swapFileSize", "httpProxy"]: - if ibmsecurity.utilities.tools.version_compare(iviaVersion, "9.0.5.0") < 0: + if _tools.version_compare(iviaVersion, "9.0.5.0") < 0: warnings.append(f"Appliance at version: {iviaVersion}, {k}: {v} is not supported. Needs 9.0.5.0 or higher. Ignoring.") continue if k in ["enabledServerProtocols", "loginHeader", "loginMessage", "pendingChangesLifetime", "httpsProxy"]: - if ibmsecurity.utilities.tools.version_compare(iviaVersion, "9.0.7.0") < 0: + if _tools.version_compare(iviaVersion, "9.0.7.0") < 0: warnings.append( f"Appliance at version: {iviaVersion}, {k}: {v} is not supported. Needs 9.0.7.0 or higher. Ignoring.") continue if k in ["accessLogFormat"]: - if ibmsecurity.utilities.tools.version_compare(iviaVersion, "10.0.0.0") < 0: + if _tools.version_compare(iviaVersion, "10.0.0.0") < 0: warnings.append(f"Appliance at version: {iviaVersion}, {k}: {v} is not supported. Needs 10.0.0.0 or higher. Ignoring.") continue if k in ["lmiMessageTimeout", "validVerifyDomains"]: - if ibmsecurity.utilities.tools.version_compare(iviaVersion, "10.0.2.0") < 0: + if _tools.version_compare(iviaVersion, "10.0.2.0") < 0: warnings.append(f"Appliance at version: {iviaVersion}, {k}: {v} is not supported. Needs 10.0.0.0 or higher. Ignoring.") continue if k == "jsVersion": - if ibmsecurity.utilities.tools.version_compare(iviaVersion, "10.0.9.0") < 0: + if _tools.version_compare(iviaVersion, "10.0.9.0") < 0: warnings.append(f"Appliance at version: {iviaVersion}, {k}: {v} is not supported. Needs 10.0.9.0 or higher. Ignoring.") continue # Add to the json_data dict json_data[k] = v - # Remove keys from ret_obj that are not in json_data - fCurrentEntries = {k: v for k, v in ret_obj["data"].items() if k in json_data.keys()} - # - sorted_ret_obj = json.dumps(fCurrentEntries, skipkeys=True, sort_keys=True) - sorted_json_data = json.dumps(json_data, skipkeys=True, sort_keys=True) - logger.debug(f"Sorted Existing Data:\n\n{sorted_ret_obj}") - logger.debug(f"Sorted Desired Data:\n\n{sorted_json_data}") - if sorted_ret_obj != sorted_json_data: + if not _tools.json_equals(ret_obj, json_data): logger.debug("Admin Settings are found to be different. See above JSON for difference.") - # Ensure users know how REST API handles httpsPort default value - I think everybody should know this by now - # if json_data.get("httpsPort", "") == "" and ibmsecurity.utilities.tools.version_compare(isamAppliance.facts['version'], - # "9.0.1.0") >= 0: - # warnings.append("Default httpsPort of 443 will be set on the appliance.") return True, warnings, json_data else: # No changes required return False, warnings, json_data @@ -241,4 +230,4 @@ def compare(isamAppliance1, isamAppliance2): ret_obj1 = get(isamAppliance1) ret_obj2 = get(isamAppliance2) - return ibmsecurity.utilities.tools.json_compare(ret_obj1, ret_obj2) + return _tools.json_compare(ret_obj1, ret_obj2) diff --git a/ibmsecurity/isam/base/audit/configuration.py b/ibmsecurity/isam/base/audit/configuration.py index 8282a476..95b73209 100644 --- a/ibmsecurity/isam/base/audit/configuration.py +++ b/ibmsecurity/isam/base/audit/configuration.py @@ -14,12 +14,18 @@ requires_version = None -def get(isamAppliance, check_mode=False, force=False): +def get(isamAppliance, id=None, check_mode=False, force=False): """ Retrieve audit configuration + id is optional """ - return isamAppliance.invoke_get("Retrieve audit configuration", uri, requires_modules=requires_modules, + if id is None: + return isamAppliance.invoke_get("Retrieve current audit configuration", uri, requires_modules=requires_modules, requires_version=requires_version) + else: + return isamAppliance.invoke_get("Retrieve {id} audit configuration", + f"{uri}/{id}", requires_modules=requires_modules, + requires_version=requires_version) def set(isamAppliance, id, config, enabled=True, type='Syslog', verbose=True, check_mode=False, force=False, use_json=False, components=None): diff --git a/ibmsecurity/isam/base/cli.py b/ibmsecurity/isam/base/cli.py index 53b5052c..994dec33 100644 --- a/ibmsecurity/isam/base/cli.py +++ b/ibmsecurity/isam/base/cli.py @@ -8,7 +8,7 @@ requires_version = "9.0.3.0" -def execute(isamAppliance, command, input=None, check_mode=False, force=False): +def execute(isamAppliance, command, input=None, check_mode=False, force=False, ignore_error=False): """ Run CLI Command """ @@ -23,5 +23,6 @@ def execute(isamAppliance, command, input=None, check_mode=False, force=False): if check_mode is True: return isamAppliance.create_return_object(changed=True, warnings=warnings) else: - return isamAppliance.invoke_post("Run CLI Command", uri, post_data, requires_modules=requires_modules, - requires_version=requires_version, warnings=warnings) + return isamAppliance.invoke_post("Run CLI Command", uri, post_data, ignore_error=ignore_error, + requires_modules=requires_modules, requires_version=requires_version, + warnings=warnings) diff --git a/ibmsecurity/isam/base/runtime/tuning_parameters.py b/ibmsecurity/isam/base/runtime/tuning_parameters.py index fc7c1f3f..3f216495 100644 --- a/ibmsecurity/isam/base/runtime/tuning_parameters.py +++ b/ibmsecurity/isam/base/runtime/tuning_parameters.py @@ -1,4 +1,5 @@ import logging +import ibmsecurity.utilities.tools as _tools logger = logging.getLogger(__name__) requires_modules = ["mga", "federation"] @@ -15,16 +16,22 @@ def get(isamAppliance, check_mode=False, force=False): requires_model=requires_model) -def set(isamAppliance, option, value, check_mode=False, force=False): +def set(isamAppliance, option=None, value=None, values=None, ignore_endpoints=True, check_mode=False, force=False): """ Set a runtime tuning parameter + option and value are mutually exclusive with values + Set multiple runtime tuning parameters at once (if values is not none) """ warnings = [] matches, exists = False, False - if force is False: + if values is not None: + # Use the multi option. Ignore option/value in this case + return _setMultipleValues(isamAppliance, values, ignore_endpoints, check_mode, force) + + if not force: matches, exists, warnings = _check(isamAppliance, option, value) - if exists is False: + if not exists: warnings.append(f"Tuning Parameter {option} was not found. set() request will attempt anyway.") if force is True or matches is False: @@ -41,6 +48,32 @@ def set(isamAppliance, option, value, check_mode=False, force=False): return isamAppliance.create_return_object(warnings=warnings) +def _setMultipleValues(isamAppliance, values=None, ignore_endpoints=True, check_mode=False, force=False): + """ + Ignore_endpoints does not take the endpoint configuration into account. + This is typically set using the specific endpoint configurations. + """ + currentRuntimeParameters = get(isamAppliance) + warnings = [] + logger.debug("Setting multiple values for runtime tuning parameters") + if ignore_endpoints: + logger.info("Ignoring endpoint configuration in comparison") + warnings.append("Ignoring endpoint configuration in comparison") + currentRuntimeParameters.pop("endpoints", None) + values.pop("endpoints", None) + if force or not _tools.json_equals(currentRuntimeParameters, values): + if check_mode: + return isamAppliance.create_return_object(changed=True, warnings=warnings) + else: + return isamAppliance.invoke_put( + "Setting multiple runtime tuning parameters", + "/mga/runtime_tuning/v1", + values, + requires_modules=requires_modules, requires_model=requires_model) + + return isamAppliance.create_return_object(warnings=warnings) + + def _check(isamAppliance, option, value): """ Check if tuning parameter option exists and matches value diff --git a/ibmsecurity/isam/base/ssl_certificates/certificate_databases.py b/ibmsecurity/isam/base/ssl_certificates/certificate_databases.py index 1b29870d..43b9afd3 100644 --- a/ibmsecurity/isam/base/ssl_certificates/certificate_databases.py +++ b/ibmsecurity/isam/base/ssl_certificates/certificate_databases.py @@ -1,8 +1,11 @@ import logging import zipfile import shutil +import json +# from codecs import ignore_errors from ibmsecurity.utilities.tools import get_random_temp_dir +from ibmsecurity.utilities.tools import json_equals logger = logging.getLogger(__name__) requires_model = "Appliance" @@ -49,13 +52,19 @@ def get(isamAppliance, cert_dbase_id, check_mode=False, force=False): """ Retrieving the SSL certificate database details """ - return isamAppliance.invoke_get("Retrieving the SSL certificate database details", + + retObj = isamAppliance.invoke_get("Retrieving the SSL certificate database details", f"/isam/ssl_certificates/{cert_dbase_id}/details", - requires_model=requires_model) + requires_model=requires_model, + ignore_error=True) + if retObj.get('rc', 0) == 404: + return isamAppliance.create_return_object(rc=404, warnings=[f"{cert_dbase_id} does not exist"]) + else: + return retObj def create(isamAppliance, kdb_name, type='kdb', - check_mode=False, force=False + check_mode=False, force=False, **kwargs, ): """ @@ -74,10 +83,9 @@ def create(isamAppliance, kdb_name, type='kdb', """ warnings = [] if force or not _check(isamAppliance, kdb_name): - if check_modee: + if check_mode: return isamAppliance.create_return_object(changed=True) else: - json_data = { "kdb_name": kdb_name, "type": type, @@ -87,19 +95,19 @@ def create(isamAppliance, kdb_name, type='kdb', json_data[k] = v # new in 10.0.5 if k in certificate_database_new_10_05: - if ibmsecurity.utilities.tools.version_compare(isamAppliance.facts['version'], "10.0.8.0") < 0: - warnings.append( f"Appliance at version: {isamAppliance.facts['version']}, load_certificate: {load_certificate} is not supported. Needs 10.0.0.0 or higher. Ignoring load_certificate for this call.") + if ibmsecurity.utilities.tools.version_compare(isamAppliance.facts['version'], "10.0.5.0") < 0: + warnings.append(f"Appliance at version: {isamAppliance.facts['version']}, {k}: {v} is not supported. Needs 10.0.5.0 or higher. Ignoring {k} for this call.") else: json_data[k] = v # new in 10.0.8 if k in certificate_database_new_10_08: if ibmsecurity.utilities.tools.version_compare(isamAppliance.facts['version'], "10.0.8.0") < 0: - warnings.append( f"Appliance at version: {isamAppliance.facts['version']}, load_certificate: {load_certificate} is not supported. Needs 10.0.0.0 or higher. Ignoring load_certificate for this call.") + warnings.append(f"Appliance at version: {isamAppliance.facts['version']}, {k}: {v} is not supported. Needs 10.0.8.0 or higher. Ignoring {k} for this call.") else: json_data[k] = v # Logic if type != 'p11' or json_data.get("hsm_type", None) not in ("safenet","safenet-ha"): - warnings.append("Serial number, safenet_* are only valid for safenet hsm, removed from input") + # warnings.append("Serial number, safenet_* are only valid for safenet hsm, removed from input") json_data.pop("serial_number", None) json_data.pop("safenet_user", None) json_data.pop("safenet_pw", None) @@ -118,12 +126,17 @@ def create(isamAppliance, kdb_name, type='kdb', json_data.pop("rfs_port", None) json_data.pop("rfs_auth", None) - return isamAppliance.invoke_post( - f"Creating certificate database '{kdb_name}'", - "/isam/ssl_certificates", - json_data, - warnings=warnings - ) + retObj = isamAppliance.invoke_post( f"Creating certificate database {kdb_name}", + "/isam/ssl_certificates", + json_data, + warnings=warnings, + ignore_error=True + ) + if retObj.get("rc", 0) == 400: + warnings.append(f"Invalid type (you need to install an extension to support network hsm {type})") + return isamAppliance.create_return_object(warnings=warnings) + else: + return retObj return isamAppliance.create_return_object() @@ -222,31 +235,66 @@ def rename(isamAppliance, cert_id, new_name, check_mode=False, force=False): return isamAppliance.create_return_object() -def set(isamAppliance, cert_id, description, check_mode=False, force=False): +def set(isamAppliance, cert_id, description=None, type="kdb", check_mode=False, force=False, **kwargs): """ - Set description for a certificate database + Set description for a certificate database (type="kdb)" + Update network certificate database (type="p11") """ + warnings = [] desc_match = True # This will remain True even when cert db is not found! - if force is False: - ret_obj = get_all(isamAppliance) - for certdb in ret_obj['data']: - if certdb['id'] == cert_id: - if certdb['description'] != description: - desc_match = False - break - - if force is True or desc_match is False: - if check_mode is True: - return isamAppliance.create_return_object(changed=True) - else: - return isamAppliance.invoke_put( - "Set description for a certificate database", - f"/isam/ssl_certificates/{cert_id}", - { - "description": description - }) - return isamAppliance.create_return_object() + if type == "kdb": + if not force: + if description is None: + desc_match = True + else: + ret_obj = get_all(isamAppliance) + for certdb in ret_obj['data']: + if certdb['id'] == cert_id: + if certdb['description'] != description: + desc_match = False + break + + if force or not desc_match: + if check_mode is True: + return isamAppliance.create_return_object(changed=True) + else: + return isamAppliance.invoke_put( + "Set description for a certificate database", + f"/isam/ssl_certificates/{cert_id}", + { + "description": description + }) + else: + # type = p11 + json_data = {} + for k, v in kwargs.items(): + if k in certificate_database_optional_args: + json_data[k] = v + # new in 10.0.5 + if k in certificate_database_new_10_05: + if ibmsecurity.utilities.tools.version_compare(isamAppliance.facts['version'], "10.0.5.0") < 0: + warnings.append(f"Appliance at version: {isamAppliance.facts['version']}, {k}: {v} is not supported. Needs 10.0.5.0 or higher. Ignoring {k} for this call.") + else: + json_data[k] = v + # new in 10.0.8 + if k in certificate_database_new_10_08: + if ibmsecurity.utilities.tools.version_compare(isamAppliance.facts['version'], "10.0.8.0") < 0: + warnings.append(f"Appliance at version: {isamAppliance.facts['version']}, {k}: {v} is not supported. Needs 10.0.8.0 or higher. Ignoring {k} for this call.") + else: + json_data[k] = v + ret_obj = get(isamAppliance, cert_id) + # Idempotency + if ret_obj.get("rc", 0) == 404: + return isamAppliance.create_return_object(warnings=warnings) + if not json_equals(ret_obj, json_data): + return isamAppliance.invoke_put( + "Updating network certificate database", + f"/isam/ssl_certificates/{cert_id}", + json_data, + warnings=warnings + ) + return isamAppliance.create_return_object(warnings=warnings) def _check(isamAppliance, id): diff --git a/ibmsecurity/isam/statistics.py b/ibmsecurity/isam/statistics.py index c343cbc4..18e9127a 100644 --- a/ibmsecurity/isam/statistics.py +++ b/ibmsecurity/isam/statistics.py @@ -9,9 +9,7 @@ def get_network(isamAppliance, application_interface, statistics_duration, check Retrieving the Application Interface Statistics """ return isamAppliance.invoke_get("Retrieving the Application Interface Statistics", - "/analysis/interface_statistics.json{0}".format( - tools.create_query_string(prefix=application_interface, - timespan=statistics_duration)),requires_model=requires_model) + f"/analysis/interface_statistics.json{tools.create_query_string(prefix=application_interface, timespan=statistics_duration)}",requires_model=requires_model) def get_rp_junction(isamAppliance, instance, date, duration, check_mode=False, force=False): @@ -19,10 +17,7 @@ def get_rp_junction(isamAppliance, instance, date, duration, check_mode=False, f Retrieving junction average response times for a Reverse Proxy instance """ return isamAppliance.invoke_get("Retrieving junction average response times for a Reverse Proxy instance", - "/analysis/reverse_proxy_traffic/reqtime{0}".format( - tools.create_query_string(date=date, - duration=duration, - instance=instance)),requires_model=requires_model) + f"/analysis/reverse_proxy_traffic/reqtime{tools.create_query_string(date=date, duration=duration, instance=instance)}",requires_model=requires_model) def get_rp_health_summary(isamAppliance, check_mode=False, force=False): @@ -39,11 +34,7 @@ def get_rp_throughput_summary(isamAppliance, date, duration, aspect, summary=Non """ headers = {'Accept': 'application/json', 'range': 'items=0-24'} return isamAppliance.invoke_get_with_headers("Retrieving a summary of throughput for all Reverse Proxy instances", - "/analysis/reverse_proxy_traffic/throughput/{0}".format( - tools.create_query_string(summary=summary, - date=date, - duration=duration, - aspect=aspect)), + f"/analysis/reverse_proxy_traffic/throughput/{tools.create_query_string(summary=summary, date=date, duration=duration, aspect=aspect)}", requires_model=requires_model, headers=headers) @@ -53,10 +44,7 @@ def get_rp_throughput(isamAppliance, instance, date, duration, check_mode=False, Retrieving throughput records for a specific Reverse Proxy instance """ return isamAppliance.invoke_get("Retrieving throughput records for a specific Reverse Proxy instance", - "/analysis/reverse_proxy_traffic/throughput/{0}{1}".format(instance, - tools.create_query_string( - date=date, - duration=duration)),requires_model=requires_model) + f"/analysis/reverse_proxy_traffic/throughput/{instance}{tools.create_query_string(date=date, duration=duration)}",requires_model=requires_model) def get_rp_traffic_summary(isamAppliance, instance, date, duration, aspect, summary=True, check_mode=False, @@ -66,11 +54,7 @@ def get_rp_traffic_summary(isamAppliance, instance, date, duration, aspect, summ """ return isamAppliance.invoke_get( "Retrieving a summary of traffic by Junction or User-Agent on a Reverse Proxy instance", - "/analysis/reverse_proxy_traffic/traffic/instance/{0}/{1}".format(instance, - tools.create_query_string(summary=summary, - date=date, - duration=duration, - aspect=aspect)),requires_model=requires_model) + f"/analysis/reverse_proxy_traffic/traffic/instance/{instance}/{tools.create_query_string(summary=summary, date=date, duration=duration, aspect=aspect)}",requires_model=requires_model) def get_rp_traffic(isamAppliance, instance, date, duration, aspect, aspect_identifier, check_mode=False, force=False): @@ -79,9 +63,7 @@ def get_rp_traffic(isamAppliance, instance, date, duration, aspect, aspect_ident """ return isamAppliance.invoke_get( "Retrieving a summary of traffic records for a specific Junction or User-Agent on a Reverse Proxy instance", - "/analysis/reverse_proxy_traffic/traffic/instance/{0}/{1}/{2}{3}".format(instance, aspect, aspect_identifier, - tools.create_query_string(date=date, - duration=duration)),requires_model=requires_model) + f"/analysis/reverse_proxy_traffic/traffic/instance/{instance}/{aspect}/{aspect_identifier}{tools.create_query_string(date=date, duration=duration)}",requires_model=requires_model) def get_rp_traffic_detail(isamAppliance, instance, date, duration, aspect, aspect_identifier, check_mode=False, @@ -91,10 +73,7 @@ def get_rp_traffic_detail(isamAppliance, instance, date, duration, aspect, aspec """ return isamAppliance.invoke_get( "Retrieving detailed traffic records for a specific Junction or User-Agent on a Reverse Proxy instance", - "/analysis/reverse_proxy_traffic/traffic/instance/{0}/{1}/{2}/{3}".format(instance, aspect, aspect_identifier, - tools.create_query_string(date=date, - duration=duration, - aspect=aspect)),requires_model=requires_model) + f"/analysis/reverse_proxy_traffic/traffic/instance/{instance}/{aspect}/{aspect_identifier}/{tools.create_query_string(date=date, duration=duration, aspect=aspect)}",requires_model=requires_model) def get_rp_traffic_detail_aspect(isamAppliance, instance, date, duration, aspect, aspect_identifier, check_mode=False, @@ -104,11 +83,7 @@ def get_rp_traffic_detail_aspect(isamAppliance, instance, date, duration, aspect """ return isamAppliance.invoke_get( "Retrieving detailed traffic records for a specific User-Agent on a specific junction in a Reverse Proxy instance", - "/analysis/reverse_proxy_traffic/traffic/instance/{0}/{1}/{2}/{1}/{2}{3}".format(instance, aspect, - aspect_identifier, - tools.create_query_string( - date=date, - duration=duration)),requires_model=requires_model) + f"/analysis/reverse_proxy_traffic/traffic/instance/{instance}/{aspect}/{aspect_identifier}/{aspect}/{aspect_identifier}{tools.create_query_string(date=date, duration=duration)}",requires_model=requires_model) def get_rp_waf_events(isamAppliance, instance, date, duration, type, check_mode=False, @@ -118,12 +93,7 @@ def get_rp_waf_events(isamAppliance, instance, date, duration, type, check_mode= """ return isamAppliance.invoke_get( "Retrieving security action events for a Reverse Proxy instance", - "/analysis/reverse_proxy_traffic/pam_events{0}".format( - tools.create_query_string( - date=date, - duration=duration, - instance=instance, - type=type)),requires_model=requires_model) + f"/analysis/reverse_proxy_traffic/pam_events{tools.create_query_string(date=date, duration=duration, instance=instance, type=type)}",requires_model=requires_model) def get_cpu(isamAppliance, statistics_duration, check_mode=False, force=False): @@ -132,9 +102,7 @@ def get_cpu(isamAppliance, statistics_duration, check_mode=False, force=False): """ return isamAppliance.invoke_get( "Retrieving the CPU Usage Statistics", - "/statistics/systems/cpu.json{0}".format( - tools.create_query_string( - timespan=statistics_duration)),requires_model=requires_model) + f"/statistics/systems/cpu.json{tools.create_query_string(timespan=statistics_duration)}",requires_model=requires_model) def get_memory(isamAppliance, statistics_duration, check_mode=False, force=False): @@ -143,9 +111,7 @@ def get_memory(isamAppliance, statistics_duration, check_mode=False, force=False """ return isamAppliance.invoke_get( "Retrieving the Memory Usage Statistics", - "/statistics/systems/memory.json{0}".format( - tools.create_query_string( - timespan=statistics_duration)),requires_model=requires_model) + f"/statistics/systems/memory.json{tools.create_query_string(timespan=statistics_duration)}",requires_model=requires_model) def get_storage(isamAppliance, statistics_duration, check_mode=False, force=False): @@ -154,6 +120,4 @@ def get_storage(isamAppliance, statistics_duration, check_mode=False, force=Fals """ return isamAppliance.invoke_get( "Retrieving the Storage Usage Statistics", - "/statistics/systems/storage.json{0}".format( - tools.create_query_string( - timespan=statistics_duration)),requires_model=requires_model) + f"/statistics/systems/storage.json{tools.create_query_string(timespan=statistics_duration)}",requires_model=requires_model) diff --git a/ibmsecurity/isam/web/kerberos_configuration/keyfiles.py b/ibmsecurity/isam/web/kerberos_configuration/keyfiles.py index 1eec2eea..e4fbed9f 100644 --- a/ibmsecurity/isam/web/kerberos_configuration/keyfiles.py +++ b/ibmsecurity/isam/web/kerberos_configuration/keyfiles.py @@ -26,11 +26,11 @@ def _check(isamAppliance, id): """ ret_obj = get(isamAppliance) - logger.debug("Looking for {0} existing keytab files in: {1}".format(id, ret_obj['data'])) + logger.debug(f"Looking for {id} existing keytab files in: {ret_obj['data']}") if ret_obj['data']: for keytab in ret_obj['data']: if keytab['id'] == id: - logger.debug("Found keytab: {0}".format(id)) + logger.debug(f"Found keytab: {id}") return True return False @@ -40,12 +40,12 @@ def import_keytab(isamAppliance, id, file, check_mode=False, force=False): """ Import a keytab file, id is the name of the keytab file (they should be the same) """ - if force is True or _check(isamAppliance, id) is False: - if check_mode is True: + if force or not _check(isamAppliance, id): + if check_mode: return isamAppliance.create_return_object(changed=True) else: return isamAppliance.invoke_post_files(description="Import a keytab file", - uri="{0}".format(uri), + uri=uri, fileinfo=[{ 'file_formfield': 'keytab_file', 'filename': file, @@ -58,15 +58,42 @@ def import_keytab(isamAppliance, id, file, check_mode=False, force=False): return isamAppliance.create_return_object() +def export_keytab(isamAppliance, id, file, check_mode=False, force=False): + """ + Export a keytab file, id is the name of the keytab file (they should be the same) + """ + warnings=[] + if force or _check(isamAppliance, id): + if check_mode: + return isamAppliance.create_return_object(changed=True) + else: + retObj = isamAppliance.invoke_get_file(description="Export keytab file", + uri=f"{uri}/{id}", + filename=file, + requires_modules=requires_modules, + requires_version=requires_version, + ignore_error=True + ) + if retObj.get("rc", 0) == 404: + logger.info(f"No keytab found matching your arguments {id}") + warnings.append(f"No keytab found matching your arguments {id}") + return isamAppliance.create_return_object(warnings=warnings) + else: + return retObj + warnings.append(f"No keytab found matching your arguments {id}") + return isamAppliance.create_return_object(warnings=warnings) + + def delete(isamAppliance, id, check_mode=False, force=False): """ Delete a keytab file, id is the name of the keytab file """ - if force is True or _check(isamAppliance, id) is True: - if check_mode is True: + if force or _check(isamAppliance, id): + if check_mode: return isamAppliance.create_return_object(changed=True) else: - return isamAppliance.invoke_delete(description="Delete a keytab file", uri="{0}/{1}".format(uri, id), + return isamAppliance.invoke_delete(description="Delete a keytab file", + uri=f"{uri}/{id}", requires_modules=requires_modules, requires_version=requires_version) return isamAppliance.create_return_object() @@ -81,11 +108,11 @@ def combine(isamAppliance, newname, keytab_files, check_mode=False, force=False) """ warnings = [] for keytab in keytab_files: - if _check(isamAppliance, keytab) is False: - warnings.append("keytab file to be combined: {0}, not found".format(keytab)) + if not _check(isamAppliance, keytab): + warnings.append(f"keytab file to be combined: {keytab}, not found") - if force is True or _check(isamAppliance, newname) is False: - if check_mode is True: + if force or not _check(isamAppliance, newname): + if check_mode: return isamAppliance.create_return_object(changed=True) else: return isamAppliance.invoke_put(description="Combine keytab files", uri=uri, diff --git a/ibmsecurity/utilities/tools.py b/ibmsecurity/utilities/tools.py index dc070809..e802d4f9 100644 --- a/ibmsecurity/utilities/tools.py +++ b/ibmsecurity/utilities/tools.py @@ -311,3 +311,35 @@ def normalize(v): return 1 elif normalize(version1) < normalize(version2): return -1 + + +def json_equals(curObj, newObj, ignore_keys_not_in_new=True, skipkeys=True, sort_keys=True): + """ + Function to compare input and output (for idempotency) + + + :param curObj: The current object as output from an ISAMAppliance function + :param newObj: The input json data + :param ignore_keys_not_in_new: Filter current values for keys that are in the new object. This is not fully idempotent (because it will keep values that are not defined in the input) + :return: boolean: True if the 2 objects are the same + + TODO: include the custom class + """ + # Verify format + if curObj.get("data", None) is None: + fCurrentEntries = curObj + else: + fCurrentEntries = curObj["data"] + # Remove keys from ret_obj that are not in json_data + if ignore_keys_not_in_new: + fCurrentEntries = {k: v for k, v in fCurrentEntries.items() if k in newObj.keys()} + # + sorted_ret_obj = json.dumps(fCurrentEntries, skipkeys=skipkeys, sort_keys=sort_keys) + sorted_json_data = json.dumps(newObj, skipkeys=skipkeys, sort_keys=sort_keys) + logger.debug(f"Sorted Existing Data:\n\n{sorted_ret_obj}") + logger.debug(f"Sorted Desired Data:\n\n{sorted_json_data}") + if sorted_ret_obj == sorted_json_data: + logger.debug("\n\njson_equals: No changes detected\n\n") + return True + + return False diff --git a/pyproject.toml b/pyproject.toml index 42ed65ba..6361a1d0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ build-backend = "setuptools.build_meta" [project] name = "ibmsecurity" -version = "2025.4.4.0" +version = "2025.6.3.0" authors = [ { name="IBM", email="secorch@wwpdl.vnet.ibm.com" }, ] diff --git a/setup.py b/setup.py index e7d1f812..24049b9a 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ packages=find_packages(), # Date of release used for version - please be sure to use YYYY.MM.DD.seq#, MM and DD should be two digits e.g. 2017.02.05.0 # seq# will be zero unless there are multiple release on a given day - then increment by one for additional release for that date - version="2025.4.4.0", + version="2025.6.3.0", description="Idempotent functions for IBM Security Appliance REST APIs", author="IBM", author_email="secorch@wwpdl.vnet.ibm.com", diff --git a/test/test_aac_serverconnections_smtp.py b/test/test_aac_serverconnections_smtp.py new file mode 100644 index 00000000..b8374c8c --- /dev/null +++ b/test/test_aac_serverconnections_smtp.py @@ -0,0 +1,69 @@ +import logging + +import ibmsecurity.isam.aac.server_connections +import ibmsecurity.isam.aac.server_connections.smtp +import ibmsecurity.isam.appliance + +import pytest + +def getTestData(): + testdata = [ + { + "connection": { + "hostName": "smtp.isam-test.ibm.com", + "hostPort": 587, + "user": "isamUser", + "password": "password", + "ssl": False + }, + "connectionManager": { + "connectTimeout": 30 + }, + "name": "SMTPTestConnection", + "description": "A test connection to a SMTP server", + "locked": False + } + ] + return testdata + + +def test_get_serverconnection_smtp(iviaServer, caplog) -> None: + """Get sms protection""" + caplog.set_level(logging.DEBUG) + arg = {} + + returnValue = ibmsecurity.isam.aac.server_connections.smtp.get_all(iviaServer, + **arg + ) + logging.log(logging.INFO, returnValue) + + + assert not returnValue.failed() + + +@pytest.mark.parametrize("items", getTestData()) +def test_set_serverconnection_smtp(iviaServer, caplog, items) -> None: + """Set api protection""" + caplog.set_level(logging.DEBUG) + # items is a key-value pair + logging.log(logging.INFO, items) + arg = {} + connection = {} + for k, v in items.items(): + if k == 'name': + name = v + continue + if k == 'connection': + connection = v + continue + arg[k] = v + + returnValue = ibmsecurity.isam.aac.server_connections.smtp.set(iviaServer, + name, + connection, + **arg + ) + logging.log(logging.INFO, returnValue) + + if returnValue is not None: + assert not returnValue.failed() diff --git a/test/test_base_audit.py b/test/test_base_audit.py new file mode 100644 index 00000000..dafed65f --- /dev/null +++ b/test/test_base_audit.py @@ -0,0 +1,51 @@ +import logging + +import ibmsecurity.isam.base.audit.configuration +import ibmsecurity.isam.appliance + +import pytest + +def getTestData(): + testdata = [ + { + "id": "1" + } + ] + return testdata + + +def test_current_audit_configuration(iviaServer, caplog) -> None: + """Get sms protection""" + caplog.set_level(logging.DEBUG) + arg = {} + + returnValue = ibmsecurity.isam.base.audit.configuration.get(iviaServer, + **arg + ) + logging.log(logging.INFO, returnValue) + + assert not returnValue.failed() + + +@pytest.mark.parametrize("items", getTestData()) +def test_get_specific_audit_configuration(iviaServer, caplog, items) -> None: + """Set admin ssh keys""" + caplog.set_level(logging.DEBUG) + # items is a key-value pair + logging.log(logging.INFO, items) + arg = {} + for k, v in items.items(): + #if k == 'name': + # name = v + # continue + #if k == 'key': + # key = v + # continue + arg[k] = v + + returnValue = ibmsecurity.isam.base.audit.configuration.get(iviaServer, **arg) + + logging.log(logging.INFO, returnValue) + + if returnValue is not None: + assert not returnValue.failed() diff --git a/test/test_base_certificate_databases.py b/test/test_base_certificate_databases.py new file mode 100644 index 00000000..9bbef7df --- /dev/null +++ b/test/test_base_certificate_databases.py @@ -0,0 +1,91 @@ +import logging + +import ibmsecurity.isam.base.ssl_certificates.certificate_databases +import ibmsecurity.isam.appliance + +import pytest + +def getTestData(): + testdata = [ + { + "kdb_name": "junctionkdb", + "type": "kdb" + }, + { + "kdb_name": "ncipherdb", + "type": "p11", + "token_label": "label", + "passcode": "passcode", + "hsm_type": "ncipher", + "ip": "10.150.25.207", + "rfs": "10.150.25.208" + } + ] + return testdata + + +def test_get_certificate_databases(iviaServer, caplog) -> None: + """Get sms protection""" + caplog.set_level(logging.DEBUG) + arg = {} + + returnValue = ibmsecurity.isam.base.ssl_certificates.certificate_databases.get_all(iviaServer, + **arg + ) + logging.log(logging.INFO, returnValue) + + assert not returnValue.failed() + + +@pytest.mark.parametrize("items", getTestData()) +def test_create_certificate_database(iviaServer, caplog, items) -> None: + """Set admin ssh keys""" + caplog.set_level(logging.DEBUG) + # items is a key-value pair + logging.log(logging.INFO, items) + arg = {} + kdb_name = None + for k, v in items.items(): + if k == 'kdb_name': + kdb_name = v + continue + #if k == 'key': + # key = v + # continue + arg[k] = v + + returnValue = ibmsecurity.isam.base.ssl_certificates.certificate_databases.create(iviaServer, kdb_name, + **arg) + + logging.log(logging.INFO, returnValue) + + if returnValue is not None: + assert not returnValue.failed() + + +@pytest.mark.parametrize("items", getTestData()) +def test_update_certificate_database(iviaServer, caplog, items) -> None: + """Set admin ssh keys""" + caplog.set_level(logging.DEBUG) + # items is a key-value pair + logging.log(logging.INFO, items) + arg = {} + cert_id = None + for k, v in items.items(): + if k == 'cert_id': + cert_id = v + continue + if k == 'kdb_name': + cert_id = v + continue + #if k == 'key': + # key = v + # continue + arg[k] = v + + returnValue = ibmsecurity.isam.base.ssl_certificates.certificate_databases.set(iviaServer, cert_id, **arg) + + logging.log(logging.INFO, returnValue) + + if returnValue is not None: + assert not returnValue.failed() diff --git a/test/test_base_runtime_tuning.py b/test/test_base_runtime_tuning.py new file mode 100644 index 00000000..1ddbd161 --- /dev/null +++ b/test/test_base_runtime_tuning.py @@ -0,0 +1,97 @@ +import logging + +import ibmsecurity.isam.base.runtime.tuning_parameters +import ibmsecurity.isam.appliance + +import pytest + +def getTestData(): + testdata = [ + { + "values": { + "trace_specification": "*=info", + "accept_client_certs": False, + "require_mtls": False, + "enabled_server_protocols": "TLSv1.2", + "enable_sso": False, + "auto_restart": False, + "auto_reload": False, + "console_log_level": "OFF", + "suppress_sensitive_trace": False, + "session_max_count": 1000, + "session_invalidation_timeout": 1800, + "session_reaper_poll_interval": 30, + "max_heap_size": 1024, + "min_heap_size": 512, + "max_threads": 20, + "min_threads": 10, + "max_files": 2, + "max_file_size": 40, + "enable_crldp": False, + "dns_resolution_cache_lifetime": 604800, + "keystore": "rt_profile_keys", + "keystore_label": "server", + "truststore": "rt_profile_keys", + "inbound_keystore":"rt_profile_keys", + "inbound_keystore_label": "server", + "inbound_truststore": "rt_profile_keys" + } + }, + { + "option": "enable_crldp", + "value": True + }, + { + "option": "max_files", + "value": 2 + }, + { + "values": { + "trace_specification": "*=info", + "accept_client_certs": False, + "require_mtls": False, + "enabled_server_protocols": "TLSv1.2", + "enable_sso": False, + "auto_restart": False, + "auto_reload": False, + "console_log_level": "OFF", + "suppress_sensitive_trace": False, + "session_max_count": 1000, + "session_invalidation_timeout": 1800, + "session_reaper_poll_interval": 30, + "enable_crldp": True, + "dns_resolution_cache_lifetime": 604800, + "keystore": "rt_profile_keys", + } + }, + ] + return testdata + + +@pytest.mark.parametrize("items", getTestData()) +def test_set_multiple_tuning_parameeters(iviaServer, caplog, items) -> None: + """Set api protection""" + caplog.set_level(logging.DEBUG) + # items is a key-value pair + logging.log(logging.INFO, items) + arg = {} + option = None + value = None + + for k, v in items.items(): + if k == 'option': + option = v + continue + if k == 'value': + value = v + continue + arg[k] = v + + returnValue = ibmsecurity.isam.base.runtime.tuning_parameters.set(iviaServer, + option, + value, + **arg + ) + logging.log(logging.INFO, returnValue) + + assert not returnValue.failed() diff --git a/test/test_web_kerberos.py b/test/test_web_kerberos.py new file mode 100644 index 00000000..09880bec --- /dev/null +++ b/test/test_web_kerberos.py @@ -0,0 +1,16 @@ +import logging + +import ibmsecurity.isam.web.kerberos_configuration.keyfiles +import ibmsecurity.isam.appliance + + +def test_export_keyfile(iviaServer, caplog) -> None: + """Get all admincfg options.""" + caplog.set_level(logging.DEBUG) + + returnValue = ibmsecurity.isam.web.kerberos_configuration.keyfiles.export_keytab(isamAppliance=iviaServer, + id="env.keytab", + file="/tmp/env.keytab") + logging.log(logging.INFO, returnValue) + + assert not returnValue.failed()