"""Package for K8s injector-dev tests."""
from utils import scenarios, features, logger, context
from utils.injector_dev.harness import Harness


@features.k8s_admission_controller
@scenarios.k8s_injector_dev_single_service
class TestK8sLibInjection:
    """Test K8s injector dev tool"""

    def test_k8s_injector_dev(self):
        logger.info("Testing K8s lib injection using the injector-dev tool")

        # The scenario warmup is expected to have written the provision file;
        # fail loudly if it did not.
        provision = context.k8s_scenario_provision
        if provision is None:
            raise ValueError("k8s_scenario_provision is None, the scenario provision file was not created")

        # Harness.new takes a string path; must(new(...)) mirrors the Go-style
        # Must(New(t)) construction the harness was ported from.
        harness = Harness.must(Harness.new(str(provision)))

        # Deployment that should be instrumented: injection env vars present,
        # plus the DD_PROFILING_ENABLED value pushed through ddTraceConfigs.
        instrumented = harness.deployment("app-injection", "application")
        harness.require(instrumented.has_injection())
        harness.require(instrumented.has_env("DD_PROFILING_ENABLED", "true"))

        # Deployment outside the instrumentation target: must stay untouched.
        untouched = harness.deployment("app-no-injection", "application")
        harness.require(untouched.has_no_injection())
scenarios.k8s_injector_dev_single_service, scenarios.lib_injection_validation_unsupported_lang, scenarios.lib_injection_validation, scenarios.local_auto_injection_install_script, diff --git a/utils/_context/_scenarios/__init__.py b/utils/_context/_scenarios/__init__.py index cb4d370683a..420f179cce1 100644 --- a/utils/_context/_scenarios/__init__.py +++ b/utils/_context/_scenarios/__init__.py @@ -16,6 +16,7 @@ from .test_the_test import TestTheTestScenario from .auto_injection import InstallerAutoInjectionScenario from .k8s_lib_injection import WeblogInjectionScenario, K8sScenario, K8sSparkScenario, K8sManualInstrumentationScenario +from .k8s_injector_dev import K8sInjectorDevScenario from .docker_ssi import DockerSSIScenario from .external_processing import ExternalProcessingScenario from .ipv6 import IPV6Scenario @@ -926,6 +927,13 @@ class _Scenarios: ) k8s_lib_injection_spark_djm = K8sSparkScenario("K8S_LIB_INJECTION_SPARK_DJM", doc="Kubernetes lib injection DJM") + # K8s Injector dev scenarios + k8s_injector_dev_single_service = K8sInjectorDevScenario( + "K8S_INJECTOR_DEV_SINGLE_SERVICE", + doc="Kubernetes Injector Dev Scenario", + scenario_provision="single-service.yaml", + ) + docker_ssi = DockerSSIScenario( "DOCKER_SSI", doc="Validates the installer and the ssi on a docker environment", diff --git a/utils/_context/_scenarios/k8s_injector_dev.py b/utils/_context/_scenarios/k8s_injector_dev.py new file mode 100644 index 00000000000..4db6d936478 --- /dev/null +++ b/utils/_context/_scenarios/k8s_injector_dev.py @@ -0,0 +1,150 @@ +from pathlib import Path + +import pytest + +from utils._context.component_version import ComponentVersion, Version + +from utils.k8s.k8s_component_image import ( + K8sComponentImage, + extract_library_version, + extract_injector_version, + extract_cluster_agent_version, +) +from utils._logger import logger +from utils.injector_dev.injector_client import InjectorDevClient +from utils.injector_dev.scenario_provision_updater import 
ScenarioProvisionUpdater +from .core import Scenario, scenario_groups, ScenarioGroup + +# Default scenario groups for K8sInjectorDevScenario +DEFAULT_SCENARIO_GROUPS: list[ScenarioGroup] = [scenario_groups.all, scenario_groups.lib_injection] + + +class K8sInjectorDevScenario(Scenario): + """Scenario that tests kubernetes lib injection using the injector-dev tool""" + + def __init__( + self, + name: str, + doc: str, + scenario_provision: str, + scenario_groups: list[ScenarioGroup] | None = None, + ) -> None: + if scenario_groups is None: + scenario_groups = DEFAULT_SCENARIO_GROUPS + super().__init__(name, doc=doc, github_workflow="libinjection", scenario_groups=scenario_groups) + # provision template + self.scenario_provision = scenario_provision + # Used to store the path to the actual scenario provision file (with injected component images/log folder) + self.current_scenario_provision: Path | None = None + + def configure(self, config: pytest.Config): + # These are the tested components: dd_cluser_agent_version, weblog image, library_init_version, injector version + self.k8s_weblog = config.option.k8s_weblog + + # Get component images: weblog, lib init, cluster agent, injector + self.k8s_weblog_img = K8sComponentImage(config.option.k8s_weblog_img, lambda _: "weblog-version-1.0") + + self.k8s_lib_init_img = K8sComponentImage(config.option.k8s_lib_init_img, extract_library_version) + + self.k8s_cluster_img = K8sComponentImage(config.option.k8s_cluster_img, extract_cluster_agent_version) + + self.k8s_injector_img = K8sComponentImage( + config.option.k8s_injector_img if config.option.k8s_injector_img else "gcr.io/datadoghq/apm-inject:latest", + extract_injector_version, + ) + + # Get component versions: lib init, cluster agent, injector + self._library = ComponentVersion(config.option.k8s_library, self.k8s_lib_init_img.version) + self.components["library"] = self._library.version + self.components["cluster_agent"] = self.k8s_cluster_img.version + 
self._datadog_apm_inject_version = f"v{self.k8s_injector_img.version}" + self.components["datadog-apm-inject"] = self._datadog_apm_inject_version + + # is it on sleep mode? + self._sleep_mode = config.option.sleep + + # Check if the scenario_provision file exists + scenario_provision_path = Path("utils") / "build" / "injector-dev" / self.scenario_provision + if not scenario_provision_path.exists(): + raise FileNotFoundError( + f"Scenario provision file not found at {scenario_provision_path}. Please build it first." + ) + + # Initialize the injector client + self.injector_client = InjectorDevClient() + + def print_context(self): + logger.stdout(".:: K8s Lib injection test components ::.") + logger.stdout(f"Weblog: {self.k8s_weblog}") + logger.stdout(f"Weblog image: {self.k8s_weblog_img.registry_url}") + logger.stdout(f"Library: {self._library}") + logger.stdout(f"Lib init image: {self.k8s_lib_init_img.registry_url}") + logger.stdout(f"Cluster agent version: {self.k8s_cluster_img.version}") + logger.stdout(f"Cluster agent image: {self.k8s_cluster_img.registry_url}") + logger.stdout(f"Injector version: {self._datadog_apm_inject_version}") + logger.stdout(f"Injector image: {self.k8s_injector_img.registry_url}") + + def get_warmups(self): + warmups = super().get_warmups() + warmups.append(self.print_context) + warmups.append(lambda: logger.terminal.write_sep("=", "Starting injector-dev", bold=True)) + warmups.append(self._start_injector_dev) + warmups.append(lambda: logger.terminal.write_sep("=", "Applying injector-dev scenario", bold=True)) + warmups.append(self._apply_scenario_injector_dev) + + return warmups + + def pytest_sessionfinish(self, session: pytest.Session, exitstatus: int): # noqa: ARG002 + self.close_targets() + + def close_targets(self): + logger.info("Destroying cluster") + self._stop_injector_dev() + + def _start_injector_dev(self): + """Start the injector-dev tool""" + self.injector_client.start(debug=True) + + def _stop_injector_dev(self): + """Stop 
the injector-dev tool""" + self.injector_client.stop(clean_k8s=True) + + def _apply_scenario_injector_dev(self): + """Applies the scenario in yaml format to the injector-dev tool.""" + # Create a ScenarioProvisionUpdater instance pointing to the logs directory + logs_dir = Path(f"logs_{self.name}") + updater = ScenarioProvisionUpdater(logs_dir=logs_dir) + + # Update the scenario file with the component images + self.current_scenario_provision = updater.update_scenario( + self.scenario_provision, + self.k8s_cluster_img, + self.k8s_injector_img, + self.k8s_weblog_img, # Include the weblog image + self.k8s_lib_init_img, # Include the library init image + self._library.name, # Specify the language being tested + ) + + logger.info(f"Updated scenario file written to {self.current_scenario_provision}") + + # Apply the updated scenario + if self.current_scenario_provision: + self.injector_client.apply_scenario(self.current_scenario_provision, wait=True, debug=True) + else: + raise RuntimeError("No scenario provision file found. 
Please ensure the scenario is properly configured.") + + @property + def library(self): + return self._library + + @property + def weblog_variant(self): + return self.k8s_weblog + + @property + def k8s_cluster_agent_version(self): + return Version(self.k8s_cluster_img.version) + + @property + def dd_apm_inject_version(self): + return self._datadog_apm_inject_version diff --git a/utils/_context/core.py b/utils/_context/core.py index e43f5d2a297..473d704f905 100644 --- a/utils/_context/core.py +++ b/utils/_context/core.py @@ -119,6 +119,10 @@ def vm_os_cpu(self) -> str: def vm_name(self) -> str: return self.virtual_machine.name + @property + def k8s_scenario_provision(self) -> str: + return self._get_scenario_property("current_scenario_provision", {}) + def serialize(self): result = { "weblog_variant": self.weblog_variant, diff --git a/utils/build/injector-dev/single-service.yaml b/utils/build/injector-dev/single-service.yaml new file mode 100644 index 00000000000..276f9c37c52 --- /dev/null +++ b/utils/build/injector-dev/single-service.yaml @@ -0,0 +1,55 @@ +helm: + apps: + - name: app-injection + namespace: application + values: + service: + port: 18080 + image: + repository: registry.ddbuild.io/ci/injector-dev/python + tag: 2cd78ded + podLabels: + tags.datadoghq.com/env: local + app: "billing-service" + env: + - name: DD_TRACE_DEBUG + value: "true" + - name: DD_APM_INSTRUMENTATION_DEBUG + value: "true" + - name: app-no-injection + namespace: application + values: + service: + port: 18080 + image: + repository: registry.ddbuild.io/ci/injector-dev/python + tag: 2cd78ded + podLabels: + tags.datadoghq.com/env: local + env: + - name: DD_TRACE_DEBUG + value: "true" + - name: DD_APM_INSTRUMENTATION_DEBUG + value: "true" + versions: + agent: "7.64.0" + cluster_agent: "1.0.0" + injector: "0.35.0" + config: + datadog: + apm: + instrumentation: + enabled: true + targets: + - name: "billing-service" + podSelector: + matchLabels: + app: "billing-service" + namespaceSelector: + 
matchNames: + - "application" + ddTraceVersions: + python: "latest" + ddTraceConfigs: + - name: "DD_PROFILING_ENABLED" + value: "true" diff --git a/utils/injector_dev/__init__.py b/utils/injector_dev/__init__.py new file mode 100644 index 00000000000..c35140debc6 --- /dev/null +++ b/utils/injector_dev/__init__.py @@ -0,0 +1,6 @@ +"""Module providing utilities for the injector-dev tool.""" + +from .injector_client import InjectorDevClient +from .scenario_provision_updater import ScenarioProvisionUpdater + +__all__ = ["InjectorDevClient", "ScenarioProvisionUpdater"] diff --git a/utils/injector_dev/harness.py b/utils/injector_dev/harness.py new file mode 100755 index 00000000000..cf803b22a2b --- /dev/null +++ b/utils/injector_dev/harness.py @@ -0,0 +1,337 @@ +#!/usr/bin/env python3 +# filepath: /Users/roberto.montero/Documents/development/system-tests/utils/injector_dev/harness.py +# Package harness provides testing utilities for injector-dev test assertions. + +import logging +import yaml +import kubernetes as k8s +from kubernetes.stream import stream +from typing import Optional, Any +from dataclasses import dataclass +from pathlib import Path + + +@dataclass +class Result: + """Result represents the outcome of a test case.""" + + passed: bool + reason: str + + +class Deployment: + """Deployment represents a Kubernetes deployment in the test harness.""" + + def __init__(self, name: str, namespace: str, harness: "Harness"): + self.name = name + self.namespace = namespace + self.harness = harness + + def has_env(self, key: str, value: str) -> tuple[Result, Exception | None]: + """Check if the deployment has an environment variable with the specified value.""" + return self.harness.deployment_has_env(self.name, self.namespace, key, value) + + def does_not_have_env(self, key: str) -> tuple[Result, Exception | None]: + """Check if the deployment does not have the specified environment variable.""" + return self.harness.deployment_does_not_have_env(self.name, 
self.namespace, key) + + def has_injection(self) -> tuple[Result, Exception | None]: + """Check if the deployment has been injected.""" + return self.harness.deployment_has_injection(self.name, self.namespace) + + def has_no_injection(self) -> tuple[Result, Exception | None]: + """Check if the deployment has not been injected.""" + return self.harness.deployment_has_no_injection(self.name, self.namespace) + + def has_init_containers(self, containers: list[str]) -> tuple[Result, Exception | None]: + """Check if the deployment has the specified init containers.""" + return self.harness.deployment_has_init_containers(self.name, self.namespace, containers) + + def pod_has_env(self, env_name: str, env_value: str) -> tuple[Result, Exception | None]: + """Check if a pod in the deployment has the specified environment variable.""" + return self.harness.pod_has_env(self.name, self.namespace, env_name, env_value) + + +class Harness: + """Harness is a testing harness for injector-dev tests.""" + + def __init__( + self, scenario_provision: str | Path, k8s_client: dict[str, k8s.client.ApiClient], scenario: dict[str, Any] + ): + """Initialize a new Harness instance. + + Args: + scenario_provision: Path to the scenario provision file (can be string or Path object) + k8s_client: Kubernetes client APIs + scenario: The loaded scenario data + + """ + # Convert Path to string if needed + self.t = str(scenario_provision) + self.k8s = k8s_client + self.scenario = scenario + + @staticmethod + def new(scenario_provision: str | Path) -> tuple["Harness", Exception | None]: + """Create a new testing harness. 
+ + Args: + scenario_provision: Path to the scenario provision file (can be string or Path object) + + Returns: + A tuple containing the harness and any error that occurred + + """ + k8s_client = {} # Initialize outside try block to avoid unbound local error + + try: + # Convert Path to string if needed + scenario_provision_str = str(scenario_provision) + + # Load Kubernetes config + k8s.config.load_kube_config() + k8s_client = { + "core": k8s.client.CoreV1Api(), + "apps": k8s.client.AppsV1Api(), + } + + # Load the scenario + with open(scenario_provision_str, "r") as f: + scenario = yaml.safe_load(f) + + # Validate scenario (simplified here) + if not scenario: + return Harness(scenario_provision_str, k8s_client, {}), ValueError("Empty scenario") + + logging.info(f"Loaded scenario at {scenario_provision_str}") + return Harness(scenario_provision_str, k8s_client, scenario), None + + except Exception as e: + # Make sure we convert Path to string here too + scenario_provision_str = str(scenario_provision) + return Harness(scenario_provision_str, k8s_client, {}), e + + @staticmethod + def must(harness_tuple: tuple[Optional["Harness"], Exception | None]) -> "Harness": + """Panic if there's an error, otherwise return the harness.""" + harness, err = harness_tuple + if err: + raise err + if not harness: + raise ValueError("Harness is None") + return harness + + def require(self, result_tuple: tuple[Result, Exception | None]) -> None: + """Require that a test result passes.""" + result, err = result_tuple + assert err is None, f"Expected no error, but got one. 
{err}" + assert result.passed, result.reason + + def deployment(self, name: str, namespace: str) -> Deployment: + """Get a deployment from the harness.""" + return Deployment(name, namespace, self) + + def apps(self) -> list[dict[str, Any]]: + """Get the apps from the scenario.""" + if not self.scenario: + return [] + + if self.scenario.get("helm"): + return self.scenario["helm"].get("apps", []) + + if self.scenario.get("operator"): + return self.scenario["operator"].get("apps", []) + + return [] + + def get_pod_for_scenario_app(self, deployment: str, namespace: str) -> tuple[Any, Exception | None]: + """Get a pod for a deployment that's defined in the scenario.""" + found = False + for app in self.apps(): + if app.get("namespace") == namespace and app.get("name") == deployment: + logging.info(f"Found app {namespace}/{deployment} in scenario") + found = True + break + + if not found: + return None, ValueError(f"Scenario missing app definition {namespace}/{deployment}") + + try: + # Get deployment + deployment_obj = self.k8s["apps"].read_namespaced_deployment(deployment, namespace) + + # Get pod for deployment + label_selector = ",".join([f"{k}={v}" for k, v in deployment_obj.spec.selector.match_labels.items()]) + pods = self.k8s["core"].list_namespaced_pod(namespace, label_selector=label_selector) + + if not pods.items: + return None, ValueError(f"No pods found for deployment {namespace}/{deployment}") + + return pods.items[0], None + + except Exception as e: + return None, e + + def deployment_has_init_containers( + self, deployment: str, namespace: str, containers: list[str] + ) -> tuple[Result, Exception | None]: + """Check if a deployment has the specified init containers.""" + pod, err = self.get_pod_for_scenario_app(deployment, namespace) + if err: + return Result(passed=False, reason=f"Error getting pod: {err}"), err + + init_containers = {} + if pod.spec.init_containers: + for container in pod.spec.init_containers: + init_containers[container.name] = True + 
+ for container in containers: + if container not in init_containers: + return Result( + passed=False, reason=f"Init container {container} not found in {list(init_containers.keys())}" + ), None + + return Result(passed=True, reason="Deployment has all init containers"), None + + def deployment_has_no_injection(self, deployment: str, namespace: str) -> tuple[Result, Exception | None]: + """Check if a deployment has not been injected.""" + pod, err = self.get_pod_for_scenario_app(deployment, namespace) + if err: + return Result(passed=False, reason=f"Error getting pod: {err}"), err + + for container in pod.spec.containers: + result, err = self.container_has_no_injection(container) + if err: + return Result(passed=False, reason=f"Error checking container injection: {err}"), err + if not result.passed: + return result, None + + return Result(passed=True, reason="Deployment has no injection"), None + + def container_has_no_injection( + self, container: k8s.client.models.v1_container.V1Container + ) -> tuple[Result, Exception | None]: + """Check if a container has not been injected.""" + for env in container.env or []: + if env.name == "LD_PRELOAD": + return Result(passed=False, reason=f"{env.name} was set to {env.value}"), None + + return Result(passed=True, reason="Container has no injection"), None + + def deployment_has_injection(self, deployment: str, namespace: str) -> tuple[Result, Exception | None]: + """Check if a deployment has been injected.""" + pod, err = self.get_pod_for_scenario_app(deployment, namespace) + if err: + return Result(passed=False, reason=f"Error getting pod: {err}"), err + + for container in pod.spec.containers: + result, err = self.container_has_injection(container) + if err: + return Result(passed=False, reason=f"Error checking container injection: {err}"), err + if not result.passed: + return result, None + + return Result(passed=True, reason="Deployment has injection"), None + + def container_has_injection( + self, container: 
k8s.client.models.v1_container.V1Container + ) -> tuple[Result, Exception | None]: + """Check if a container has been injected.""" + envs = self.env_map(container.env or []) + + if "LD_PRELOAD" not in envs: + return Result(passed=False, reason="LD_PRELOAD was not set"), None + + if "DD_TRACE_AGENT_URL" not in envs: + return Result(passed=False, reason="DD_TRACE_AGENT_URL was not set"), None + + if "DD_DOGSTATSD_URL" not in envs: + return Result(passed=False, reason="DD_DOGSTATSD_URL was not set"), None + + return Result(passed=True, reason=f"Container {container.name} has been injected"), None + + def deployment_has_env( + self, deployment: str, namespace: str, env_name: str, env_value: str + ) -> tuple[Result, Exception | None]: + """Check if a deployment has an environment variable with the specified value.""" + pod, err = self.get_pod_for_scenario_app(deployment, namespace) + if err: + return Result(passed=False, reason=f"Error getting pod: {err}"), err + + for container in pod.spec.containers: + envs = self.env_map(container.env or []) + if env_name not in envs: + return Result(passed=False, reason=f"{env_name} was not set in {container.name}"), None + + if envs[env_name] != env_value: + return Result( + passed=False, + reason=f"{env_name} was set to {envs[env_name]} in {container.name}, expected {env_value}", + ), None + + return Result(passed=True, reason=f"{env_name} was set in all containers"), None + + def pod_has_env( + self, deployment: str, namespace: str, env_name: str, env_value: str + ) -> tuple[Result, Exception | None]: + """Check if a pod has an environment variable with the specified value.""" + pod, err = self.get_pod_for_scenario_app(deployment, namespace) + if err: + return Result(passed=False, reason=f"Error getting pod: {err}"), err + + try: + exec_command = ["sh", "-c", f"echo ${env_name}"] + result = stream( + self.k8s["core"].connect_get_namespaced_pod_exec, + pod.metadata.name, + namespace, + command=exec_command, + stderr=True, + 
stdin=False, + stdout=True, + tty=False, + _preload_content=False, + ) + + stdout = "" + while result.is_open(): + if result.peek_stdout(): + stdout += result.read_stdout() + if result.peek_stderr(): + # Just consuming stderr + result.read_stderr() + if not result.peek_stdout() and not result.peek_stderr(): + break + + result.close() + value = stdout.strip() + + if value == env_value: + return Result(passed=True, reason=f"Found expected env var {env_value}"), None + + return Result( + passed=False, reason=f"Found unexpected env var {env_name}={value}, expected {env_value}" + ), None + + except Exception as e: + return Result(passed=False, reason=f"Error checking pod environment: {e}"), e + + def deployment_does_not_have_env( + self, deployment: str, namespace: str, env_name: str + ) -> tuple[Result, Exception | None]: + """Check if a deployment does not have the specified environment variable.""" + pod, err = self.get_pod_for_scenario_app(deployment, namespace) + if err: + return Result(passed=False, reason=f"Error getting pod: {err}"), err + + for container in pod.spec.containers: + for env in container.env or []: + if env.name == env_name: + return Result(passed=False, reason=f"{env_name} was set in {container.name}"), None + + return Result(passed=True, reason=f"{env_name} was not set in any containers"), None + + @staticmethod + def env_map(env_vars: list[Any]) -> dict[str, str]: + """Convert a list of environment variables to a map.""" + return {env.name: env.value for env in env_vars} diff --git a/utils/injector_dev/injector_client.py b/utils/injector_dev/injector_client.py new file mode 100755 index 00000000000..21bf0d1d22c --- /dev/null +++ b/utils/injector_dev/injector_client.py @@ -0,0 +1,132 @@ +#!/usr/bin/env python3 +"""Module providing client for interacting with the injector-dev tool.""" + +import subprocess +from pathlib import Path + +from utils._logger import logger + + +class InjectorDevClient: + """Client for interacting with the injector-dev 
command-line tool. + + This class provides a clean interface to run operations with the injector-dev tool, + handling command execution, logging, and error management. + + Attributes: + injector_path: Path to the injector-dev executable + + """ + + def __init__(self, injector_path: Path | None = None): + """Initialize the injector client. + + Args: + injector_path: Path to the injector-dev executable. + Defaults to 'binaries/injector-dev' if not provided. + + """ + self.injector_path = injector_path or Path("binaries") / "injector-dev" + + # Verify that the injector-dev tool exists + if not self.injector_path.exists(): + raise FileNotFoundError(f"Injector-dev tool not found at {self.injector_path}. Please build it first.") + + def _run_command_with_logging(self, command: list[str], success_message: str, error_prefix: str) -> None: + """Run a command with real-time logging of output. + + Args: + command: The command to execute as a list of strings + success_message: Message to log on successful execution + error_prefix: Prefix for error message if execution fails + + Raises: + RuntimeError: If the command execution fails + + """ + process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) + + # Output real-time and log simultaneously + if process.stdout is not None: + for line in process.stdout: + logger.stdout(line.strip()) + else: + logger.warning("Unable to capture output from command (stdout is None)") + + process.wait() + + if process.returncode == 0: + logger.info(success_message) + else: + error_message = f"{error_prefix} (exit code {process.returncode})" + logger.error(error_message) + raise RuntimeError(error_message) + + def start(self, *, debug: bool = False) -> None: + """Start the injector-dev tool. 
+ + Args: + debug: Whether to start in debug mode + + Raises: + RuntimeError: If the injector-dev tool fails to start + + """ + logger.info("Starting injector-dev tool") + logger.info(f"Injector-dev tool found at {self.injector_path}. Starting it...") + + command = [str(self.injector_path), "start"] + if debug: + command.append("--debug") + + self._run_command_with_logging( + command, "Injector-dev tool started successfully", "Failed to start injector-dev tool" + ) + + def stop(self, *, clean_k8s: bool = True) -> None: + """Stop the injector-dev tool. + + Args: + clean_k8s: Whether to also clean up Kubernetes resources (minikube, colima) + + """ + logger.info("Stopping injector-dev tool") + logger.info(f"Injector-dev tool found at {self.injector_path}. Stopping it...") + + # Stop the injector-dev tool + subprocess.run([str(self.injector_path), "stop"], check=False) + + # Clean up Kubernetes resources if requested + if clean_k8s: + subprocess.run(["minikube", "delete"], check=False) + subprocess.run(["colima", "delete", "-f"], check=False) + + logger.info("Injector-dev tool stopped successfully") + + def apply_scenario(self, scenario_path: Path, *, wait: bool = True, debug: bool = False) -> None: + """Apply a scenario to the injector-dev tool. 
+ + Args: + scenario_path: Path to the scenario YAML file + wait: Whether to wait for the scenario to be applied + debug: Whether to enable debug mode + + Raises: + FileNotFoundError: If the scenario file doesn't exist + RuntimeError: If the scenario application fails + + """ + if not scenario_path.exists(): + raise FileNotFoundError(f"Scenario file not found at {scenario_path}") + + logger.stdout(f"Applying scenario [{scenario_path.name}]...") + + command = [str(self.injector_path), "apply", "-f", str(scenario_path)] + if wait: + command.append("--wait") + if debug: + command.append("--debug") + + self._run_command_with_logging( + command, "Scenario applied successfully", "Failed to apply scenario with injector-dev" + ) diff --git a/utils/injector_dev/scenario_provision_updater.py b/utils/injector_dev/scenario_provision_updater.py new file mode 100644 index 00000000000..9698786bb45 --- /dev/null +++ b/utils/injector_dev/scenario_provision_updater.py @@ -0,0 +1,179 @@ +import shutil +from pathlib import Path + +import yaml + +from utils._logger import logger +from utils.k8s.k8s_component_image import K8sComponentImage + + +class ScenarioProvisionUpdater: + """A class for updating YAML scenario files with K8s component image information. + + This class reads a YAML scenario file from the utils/build/injector-dev/ directory, + updates the component image information in the 'versions' section, and writes + the updated file to the scenario logs directory. + + Attributes: + source_dir (Path): Path to the directory containing source YAML files + logs_dir (Path): Path to the directory where updated YAML files will be saved + + """ + + def __init__(self, logs_dir: Path | None = None): + """Initialize a ScenarioProvisionUpdater instance. + + Args: + logs_dir: Optional path to the logs directory. If not provided, + the current working directory will be used. 
+ + """ + self.source_dir = Path("utils/build/injector-dev") + self.logs_dir = logs_dir if logs_dir else Path.cwd() + + # Ensure the source directory exists + if not self.source_dir.exists(): + raise FileNotFoundError(f"Source directory not found: {self.source_dir}") + + # Ensure the logs directory exists + if not self.logs_dir.exists(): + self.logs_dir.mkdir(parents=True, exist_ok=True) + + def update_scenario( + self, + scenario_filename: str, + cluster_agent_image: K8sComponentImage, + injector_image: K8sComponentImage, + weblog_image: K8sComponentImage | None = None, + k8s_lib_init_img: K8sComponentImage | None = None, + language: str | None = None, + ) -> Path: + """Update a scenario YAML file with K8s component image information. + + Args: + scenario_filename: Name of the scenario YAML file + cluster_agent_image: K8sComponentImage object for the cluster agent + injector_image: K8sComponentImage object for the injector + weblog_image: Optional K8sComponentImage object for the weblog/application images + k8s_lib_init_img: Optional K8sComponentImage object for the library init image + language: Optional language to update in ddTraceVersions (e.g., 'python', 'java') + + Returns: + Path to the updated YAML file + + Raises: + FileNotFoundError: If the scenario file doesn't exist + TypeError: If the YAML file doesn't have the expected structure + + """ + source_path = self.source_dir / scenario_filename + dest_path = self.logs_dir / scenario_filename + + # Check if the source file exists + if not source_path.exists(): + raise FileNotFoundError(f"Scenario file not found: {source_path}") + + # Load the YAML file + try: + with open(source_path, "r") as f: + scenario_yaml = yaml.safe_load(f) + except Exception as e: + raise ValueError(f"Failed to load YAML file {source_path}: {e}") from e + + # Check if the YAML file has the expected structure + if not isinstance(scenario_yaml, dict): + raise TypeError(f"Invalid YAML structure in {source_path}, expected dictionary") + + 
# Make sure 'helm' key exists + if "helm" not in scenario_yaml: + scenario_yaml["helm"] = {} + + # Make sure 'versions' key exists under 'helm' + if "versions" not in scenario_yaml["helm"]: + scenario_yaml["helm"]["versions"] = {} + + # Update the cluster_agent version + scenario_yaml["helm"]["versions"]["cluster_agent"] = { + "repository": cluster_agent_image.main_url, + "tag": cluster_agent_image.tag, + } + + # Update the injector version to use the repository/tag format + scenario_yaml["helm"]["versions"]["injector"] = { + "repository": injector_image.main_url, + "tag": injector_image.tag, + } + + # Update app images if weblog_image is provided + if weblog_image and "apps" in scenario_yaml.get("helm", {}): + for app in scenario_yaml["helm"]["apps"]: + if "values" in app and "image" in app["values"]: + # Update image repository and tag for all apps + app["values"]["image"]["repository"] = weblog_image.main_url + app["values"]["image"]["tag"] = weblog_image.tag + logger.info(f"Updated image for app {app.get('name', 'unknown')}") + + # Update ddTraceVersions nodes if language and k8s_lib_init_img are provided + if language and k8s_lib_init_img and "config" in scenario_yaml.get("helm", {}): + # Check if the datadog.apm.instrumentation.targets structure exists + datadog_config = scenario_yaml["helm"]["config"].get("datadog", {}) + apm_config = datadog_config.get("apm", {}) + instrumentation_config = apm_config.get("instrumentation", {}) + targets = instrumentation_config.get("targets", []) + + if targets: + logger.info(f"Found {len(targets)} instrumentation targets") + + # Update ddTraceVersions for each target + for target in targets: + # Check if ddTraceVersions exists, if not create it + if "ddTraceVersions" not in target: + target["ddTraceVersions"] = {} + + # Update the specific language version + target["ddTraceVersions"][language] = k8s_lib_init_img.tag + logger.info( + f"Updated ddTraceVersions.{language} = {k8s_lib_init_img.tag} " + f"for target 
{target.get('name', 'unknown')}" + ) + else: + logger.warning("No instrumentation targets found in the scenario YAML") + + # Write the updated YAML to the destination file + try: + with open(dest_path, "w") as f: + yaml.dump(scenario_yaml, f, default_flow_style=False) + logger.info(f"Updated scenario file written to {dest_path}") + except Exception as e: + raise OSError(f"Failed to write updated YAML file {dest_path}: {e}") from e + + return dest_path + + def copy_scenario(self, scenario_filename: str) -> Path: + """Copy a scenario YAML file without modifying it. + + Args: + scenario_filename: Name of the scenario YAML file + + Returns: + Path to the copied YAML file + + Raises: + FileNotFoundError: If the scenario file doesn't exist + + """ + source_path = self.source_dir / scenario_filename + dest_path = self.logs_dir / scenario_filename + + # Check if the source file exists + if not source_path.exists(): + raise FileNotFoundError(f"Scenario file not found: {source_path}") + + # Copy the file + try: + shutil.copy2(source_path, dest_path) + logger.info(f"Copied scenario file to {dest_path}") + except Exception as e: + raise OSError(f"Failed to copy YAML file {dest_path}: {e}") from e + + return dest_path