Skip to content

Fix API Security feature tests for thread local sampler #4779

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def _collect_item_metadata(item: pytest.Item):
# Case of a test with no parameters. Onboarding: we removed the parameter/machine with excludedBranches
logger.info(f"No parameters found for ${item.nodeid}")
else:
raise ValueError(f"Unexpected test declaration for {item.nodeid} : {details}")
pytest.exit(f"Unexpected test declaration for {item.nodeid} : {details}", 1)

return {
"details": details,
Expand Down
28 changes: 16 additions & 12 deletions tests/appsec/api_security/test_apisec_sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ class Test_API_Security_Sampling_Different_Endpoints:
"""Test API Security - with different endpoints"""

def setup_sampling_delay(self):
self.request1 = weblog.get("/api_security/sampling/200")
self.request2 = weblog.get("/api_security_sampling/1")
self.all_requests = [weblog.get("/api_security/sampling/200") for _ in range(10)]
with weblog.get_session() as session:
self.request1 = session.get("/api_security/sampling/200")
self.request2 = session.get("/api_security_sampling/1")
self.all_requests = [session.get("/api_security/sampling/200") for _ in range(10)]

def test_sampling_delay(self):
assert self.request1.status_code == 200
Expand All @@ -92,8 +93,9 @@ class Test_API_Security_Sampling_Different_Paths:
def setup_sampling_delay(self):
# Wait for 10s to avoid other tests calling same endpoints
time.sleep(10)
self.request1 = weblog.get("/api_security_sampling/11")
self.all_requests = [weblog.get(f"/api_security_sampling/{i}") for i in range(10)]
with weblog.get_session() as session:
self.request1 = session.get("/api_security_sampling/11")
self.all_requests = [session.get(f"/api_security_sampling/{i}") for i in range(10)]

def test_sampling_delay(self):
assert self.request1.status_code == 200
Expand All @@ -111,9 +113,10 @@ class Test_API_Security_Sampling_Different_Status:
"""Test API Security - Same endpoint and different status"""

def setup_sampling_delay(self):
self.request1 = weblog.get("/api_security/sampling/200")
self.request2 = weblog.get("/api_security/sampling/201")
self.all_requests = [weblog.get("/api_security/sampling/201") for _ in range(10)]
with weblog.get_session() as session:
self.request1 = session.get("/api_security/sampling/200")
self.request2 = session.get("/api_security/sampling/201")
self.all_requests = [session.get("/api_security/sampling/201") for _ in range(10)]

def test_sampling_delay(self):
"""Can provide request header schema"""
Expand All @@ -139,10 +142,11 @@ class Test_API_Security_Sampling_With_Delay:
def setup_sampling_delay(self):
# Wait for 15s to avoid other tests calling same endpoints
time.sleep(15)
self.request1 = weblog.get("/api_security_sampling/30")
self.request2 = weblog.get("/api_security_sampling/30")
time.sleep(4) # Delay is set to 3s via the env var DD_API_SECURITY_SAMPLE_DELAY
self.request3 = weblog.get("/api_security_sampling/30")
with weblog.get_session() as session:
self.request1 = session.get("/api_security_sampling/30")
self.request2 = session.get("/api_security_sampling/30")
time.sleep(4) # Delay is set to 3s via the env var DD_API_SECURITY_SAMPLE_DELAY
self.request3 = session.get("/api_security_sampling/30")

def test_sampling_delay(self):
"""Can provide request header schema"""
Expand Down
221 changes: 116 additions & 105 deletions tests/appsec/test_asm_standalone.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,28 +96,29 @@ def propagated_tag_value(self):
def propagated_tag_and_value(self):
return self.propagated_tag() + "=" + self.propagated_tag_value()

def setup_product_is_enabled(self):
def setup_product_is_enabled(self, session=None):
headers = {}
if self.tested_product == "appsec":
headers = {
"User-Agent": "Arachni/v1", # attack if APPSEC enabled
}
self.check_r = weblog.get(self.request_downstream_url, headers=headers)
self.check_r = session.get(self.request_downstream_url, headers=headers)

def setup_no_appsec_upstream__no_asm_event__is_kept_with_priority_1__from_minus_1(self):
self.setup_product_is_enabled()
trace_id = 1212121212121212121
parent_id = 34343434
self.r = weblog.get(
"/requestdownstream",
headers={
"x-datadog-trace-id": str(trace_id),
"x-datadog-parent-id": str(parent_id),
"x-datadog-sampling-priority": "-1",
"x-datadog-origin": "rum",
"x-datadog-tags": "_dd.p.other=1",
},
)
with weblog.get_session() as session:
self.setup_product_is_enabled(session)
trace_id = 1212121212121212121
parent_id = 34343434
self.r = session.get(
"/requestdownstream",
headers={
"x-datadog-trace-id": str(trace_id),
"x-datadog-parent-id": str(parent_id),
"x-datadog-sampling-priority": "-1",
"x-datadog-origin": "rum",
"x-datadog-tags": "_dd.p.other=1",
},
)

@bug(
condition=(
Expand Down Expand Up @@ -157,19 +158,20 @@ def test_no_appsec_upstream__no_asm_event__is_kept_with_priority_1__from_minus_1
assert "X-Datadog-Trace-Id" not in downstream_headers

def setup_no_appsec_upstream__no_asm_event__is_kept_with_priority_1__from_0(self):
self.setup_product_is_enabled()
trace_id = 1212121212121212121
parent_id = 34343434
self.r = weblog.get(
"/requestdownstream",
headers={
"x-datadog-trace-id": str(trace_id),
"x-datadog-parent-id": str(parent_id),
"x-datadog-sampling-priority": "0",
"x-datadog-origin": "rum",
"x-datadog-tags": "_dd.p.other=1",
},
)
with weblog.get_session() as session:
self.setup_product_is_enabled(session)
trace_id = 1212121212121212121
parent_id = 34343434
self.r = session.get(
"/requestdownstream",
headers={
"x-datadog-trace-id": str(trace_id),
"x-datadog-parent-id": str(parent_id),
"x-datadog-sampling-priority": "0",
"x-datadog-origin": "rum",
"x-datadog-tags": "_dd.p.other=1",
},
)

def test_no_appsec_upstream__no_asm_event__is_kept_with_priority_1__from_0(self):
self.assert_product_is_enabled(self.check_r, self.tested_product)
Expand Down Expand Up @@ -202,19 +204,20 @@ def test_no_appsec_upstream__no_asm_event__is_kept_with_priority_1__from_0(self)
assert "X-Datadog-Trace-Id" not in downstream_headers

def setup_no_appsec_upstream__no_asm_event__is_kept_with_priority_1__from_1(self):
self.setup_product_is_enabled()
trace_id = 1212121212121212121
parent_id = 34343434
self.r = weblog.get(
"/requestdownstream",
headers={
"x-datadog-trace-id": str(trace_id),
"x-datadog-parent-id": str(parent_id),
"x-datadog-sampling-priority": "1",
"x-datadog-origin": "rum",
"x-datadog-tags": "_dd.p.other=1",
},
)
with weblog.get_session() as session:
self.setup_product_is_enabled(session)
trace_id = 1212121212121212121
parent_id = 34343434
self.r = session.get(
"/requestdownstream",
headers={
"x-datadog-trace-id": str(trace_id),
"x-datadog-parent-id": str(parent_id),
"x-datadog-sampling-priority": "1",
"x-datadog-origin": "rum",
"x-datadog-tags": "_dd.p.other=1",
},
)

def test_no_appsec_upstream__no_asm_event__is_kept_with_priority_1__from_1(self):
self.assert_product_is_enabled(self.check_r, self.tested_product)
Expand Down Expand Up @@ -247,19 +250,20 @@ def test_no_appsec_upstream__no_asm_event__is_kept_with_priority_1__from_1(self)
assert "X-Datadog-Trace-Id" not in downstream_headers

def setup_no_appsec_upstream__no_asm_event__is_kept_with_priority_1__from_2(self):
self.setup_product_is_enabled()
trace_id = 1212121212121212121
parent_id = 34343434
self.r = weblog.get(
"/requestdownstream",
headers={
"x-datadog-trace-id": str(trace_id),
"x-datadog-parent-id": str(parent_id),
"x-datadog-sampling-priority": "2",
"x-datadog-origin": "rum",
"x-datadog-tags": "_dd.p.other=1",
},
)
with weblog.get_session() as session:
self.setup_product_is_enabled(session)
trace_id = 1212121212121212121
parent_id = 34343434
self.r = session.get(
"/requestdownstream",
headers={
"x-datadog-trace-id": str(trace_id),
"x-datadog-parent-id": str(parent_id),
"x-datadog-sampling-priority": "2",
"x-datadog-origin": "rum",
"x-datadog-tags": "_dd.p.other=1",
},
)

def test_no_appsec_upstream__no_asm_event__is_kept_with_priority_1__from_2(self):
self.assert_product_is_enabled(self.check_r, self.tested_product)
Expand Down Expand Up @@ -380,19 +384,20 @@ def test_no_upstream_appsec_propagation__with_asm_event__is_kept_with_priority_2
assert downstream_headers["X-Datadog-Trace-Id"] == "1212121212121212121"

def setup_upstream_appsec_propagation__no_asm_event__is_propagated_as_is__being_0(self):
self.setup_product_is_enabled()
trace_id = 1212121212121212121
parent_id = 34343434
self.r = weblog.get(
"/requestdownstream",
headers={
"x-datadog-trace-id": str(trace_id),
"x-datadog-parent-id": str(parent_id),
"x-datadog-origin": "rum",
"x-datadog-sampling-priority": "0",
"x-datadog-tags": self.propagated_tag_and_value(),
},
)
with weblog.get_session() as session:
self.setup_product_is_enabled(session)
trace_id = 1212121212121212121
parent_id = 34343434
self.r = session.get(
"/requestdownstream",
headers={
"x-datadog-trace-id": str(trace_id),
"x-datadog-parent-id": str(parent_id),
"x-datadog-origin": "rum",
"x-datadog-sampling-priority": "0",
"x-datadog-tags": self.propagated_tag_and_value(),
},
)

def test_upstream_appsec_propagation__no_asm_event__is_propagated_as_is__being_0(self):
self.assert_product_is_enabled(self.check_r, self.tested_product)
Expand Down Expand Up @@ -424,19 +429,20 @@ def test_upstream_appsec_propagation__no_asm_event__is_propagated_as_is__being_0
assert downstream_headers["X-Datadog-Trace-Id"] == "1212121212121212121"

def setup_upstream_appsec_propagation__no_asm_event__is_propagated_as_is__being_1(self):
self.setup_product_is_enabled()
trace_id = 1212121212121212121
parent_id = 34343434
self.r = weblog.get(
"/requestdownstream",
headers={
"x-datadog-trace-id": str(trace_id),
"x-datadog-parent-id": str(parent_id),
"x-datadog-origin": "rum",
"x-datadog-sampling-priority": "1",
"x-datadog-tags": self.propagated_tag_and_value(),
},
)
with weblog.get_session() as session:
self.setup_product_is_enabled(session)
trace_id = 1212121212121212121
parent_id = 34343434
self.r = session.get(
"/requestdownstream",
headers={
"x-datadog-trace-id": str(trace_id),
"x-datadog-parent-id": str(parent_id),
"x-datadog-origin": "rum",
"x-datadog-sampling-priority": "1",
"x-datadog-tags": self.propagated_tag_and_value(),
},
)

def test_upstream_appsec_propagation__no_asm_event__is_propagated_as_is__being_1(self):
self.assert_product_is_enabled(self.check_r, self.tested_product)
Expand Down Expand Up @@ -468,19 +474,20 @@ def test_upstream_appsec_propagation__no_asm_event__is_propagated_as_is__being_1
assert downstream_headers["X-Datadog-Trace-Id"] == "1212121212121212121"

def setup_upstream_appsec_propagation__no_asm_event__is_propagated_as_is__being_2(self):
self.setup_product_is_enabled()
trace_id = 1212121212121212121
parent_id = 34343434
self.r = weblog.get(
"/requestdownstream",
headers={
"x-datadog-trace-id": str(trace_id),
"x-datadog-parent-id": str(parent_id),
"x-datadog-origin": "rum",
"x-datadog-sampling-priority": "2",
"x-datadog-tags": self.propagated_tag_and_value(),
},
)
with weblog.get_session() as session:
self.setup_product_is_enabled(session)
trace_id = 1212121212121212121
parent_id = 34343434
self.r = session.get(
"/requestdownstream",
headers={
"x-datadog-trace-id": str(trace_id),
"x-datadog-parent-id": str(parent_id),
"x-datadog-origin": "rum",
"x-datadog-sampling-priority": "2",
"x-datadog-tags": self.propagated_tag_and_value(),
},
)

def test_upstream_appsec_propagation__no_asm_event__is_propagated_as_is__being_2(self):
self.assert_product_is_enabled(self.check_r, self.tested_product)
Expand Down Expand Up @@ -964,11 +971,12 @@ def test_first_request_retained(self):
self.verify_trace_sampling(self.first_request, should_be_retained=True, should_have_schema=True)

def setup_different_endpoints(self):
self.request1 = weblog.get("/api_security/sampling/200", headers=self._get_headers())
self.request2 = weblog.get("/api_security_sampling/1", headers=self._get_headers())
self.subsequent_requests = [
weblog.get("/api_security/sampling/200", headers=self._get_headers()) for _ in range(5)
]
with weblog.get_session() as session:
self.request1 = session.get("/api_security/sampling/200", headers=self._get_headers())
self.request2 = session.get("/api_security_sampling/1", headers=self._get_headers())
self.subsequent_requests = [
session.get("/api_security/sampling/200", headers=self._get_headers()) for _ in range(5)
]

def test_different_endpoints(self):
# First requests to different endpoints retained with schema
Expand All @@ -987,10 +995,12 @@ def setup_sampling_window_renewal(self):
time.sleep(4) # Wait for the sampling window to expire

self.endpoint = "/api_security/sampling/200"
self.window1_request1 = weblog.get(self.endpoint, headers=self._get_headers())
self.window1_request2 = weblog.get(self.endpoint, headers=self._get_headers())
time.sleep(4) # Delay is set to 3s via the env var DD_API_SECURITY_SAMPLE_DELAY
self.window2_request1 = weblog.get(self.endpoint, headers=self._get_headers())

with weblog.get_session() as session:
self.window1_request1 = session.get(self.endpoint, headers=self._get_headers())
self.window1_request2 = session.get(self.endpoint, headers=self._get_headers())
time.sleep(4) # Delay is set to 3s via the env var DD_API_SECURITY_SAMPLE_DELAY
self.window2_request1 = session.get(self.endpoint, headers=self._get_headers())

def test_sampling_window_renewal(self):
"""Verify that endpoint sampling resets after the sampling window expires"""
Expand Down Expand Up @@ -1020,9 +1030,10 @@ def setup_appsec_propagation_does_not_force_schema_collection(self):
headers["x-datadog-tags"] = f"{self.propagated_tag()}={self.propagated_tag_value()}"

# Make multiple requests to same endpoint that would normally be sampled out
self.request1 = weblog.get(self.endpoint, headers=headers)
self.request2 = weblog.get(self.endpoint, headers=headers)
self.request3 = weblog.get(self.endpoint, headers=headers)
with weblog.get_session() as session:
self.request1 = session.get(self.endpoint, headers=headers)
self.request2 = session.get(self.endpoint, headers=headers)
self.request3 = session.get(self.endpoint, headers=headers)

def test_appsec_propagation_does_not_force_schema_collection(self):
"""Test that spans with USER_KEEP priority do not force schema collection"""
Expand Down
Loading