Skip to content

Commit 86db749

Browse files
committed
feat: add fuzz timestamp test data structure and related setup functions
1 parent 44b3f30 commit 86db749

File tree

1 file changed

+153
-1
lines changed

1 file changed

+153
-1
lines changed

datafusion/core/tests/sql/aggregates.rs

Lines changed: 153 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1732,14 +1732,15 @@ async fn test_median_distinct_with_fuzz_table_dict_nulls() -> Result<()> {
17321732
async fn run_snapshot_test(
17331733
test_data: &TestData,
17341734
sql: &str,
1735-
use_sorted_output: bool,
1735+
_use_sorted_output: bool,
17361736
) -> Result<Vec<RecordBatch>> {
17371737
let (ctx_single, ctx_multi) = setup_test_contexts(test_data).await?;
17381738
let results = test_query_consistency(&ctx_single, &ctx_multi, sql).await?;
17391739
Ok(results)
17401740
}
17411741

17421742
/// Helper function for simpler snapshot tests that only need single-partition execution
1743+
#[allow(dead_code)]
17431744
async fn run_simple_snapshot_test(
17441745
ctx: &SessionContext,
17451746
sql: &str,
@@ -1751,6 +1752,7 @@ async fn run_simple_snapshot_test(
17511752

17521753
/// Helper function to run a complete snapshot test with TestData
17531754
/// This fully encapsulates the "setup data → SQL → assert_snapshot!" pattern
1755+
#[allow(dead_code)]
17541756
async fn run_complete_snapshot_test(
17551757
test_data: &TestData,
17561758
sql: &str,
@@ -1764,6 +1766,7 @@ async fn run_complete_snapshot_test(
17641766
}
17651767

17661768
/// Helper function to run a complete snapshot test with sorted output
1769+
#[allow(dead_code)]
17671770
async fn run_complete_sorted_snapshot_test(
17681771
test_data: &TestData,
17691772
sql: &str,
@@ -1775,3 +1778,152 @@ async fn run_complete_sorted_snapshot_test(
17751778

17761779
Ok(())
17771780
}
1781+
1782+
/// Test data structure for fuzz table with timestamp and dictionary columns containing nulls
1783+
struct FuzzTimestampTestData {
1784+
schema: Arc<Schema>,
1785+
utf8_low: StringArray,
1786+
u8_low: UInt8Array,
1787+
dictionary_utf8_low: DictionaryArray<UInt32Type>,
1788+
timestamp_us: TimestampMicrosecondArray,
1789+
}
1790+
1791+
impl FuzzTimestampTestData {
1792+
fn new() -> Self {
1793+
// Create dictionary columns with null keys and values
1794+
let dictionary_utf8_low = create_test_dict(
1795+
&[Some("dict_x"), None, Some("dict_y"), Some("dict_z")],
1796+
&[
1797+
Some(0), // dict_x
1798+
Some(1), // null value
1799+
Some(2), // dict_y
1800+
None, // null key
1801+
Some(0), // dict_x
1802+
Some(1), // null value
1803+
Some(3), // dict_z
1804+
None, // null key
1805+
Some(2), // dict_y
1806+
],
1807+
);
1808+
1809+
let utf8_low = StringArray::from(vec![
1810+
Some("alpha"),
1811+
Some("beta"),
1812+
Some("gamma"),
1813+
Some("delta"),
1814+
Some("alpha"),
1815+
Some("epsilon"),
1816+
Some("zeta"),
1817+
Some("delta"),
1818+
Some("gamma"),
1819+
]);
1820+
1821+
let u8_low = UInt8Array::from(vec![
1822+
Some(10),
1823+
Some(20),
1824+
Some(30),
1825+
Some(20),
1826+
Some(10),
1827+
Some(40),
1828+
Some(30),
1829+
Some(20),
1830+
Some(30),
1831+
]);
1832+
1833+
// Create timestamp data with some nulls
1834+
let timestamp_us = TimestampMicrosecondArray::from(vec![
1835+
Some(1000000), // 1970-01-01 00:00:01
1836+
Some(2000000), // 1970-01-01 00:00:02
1837+
Some(3000000), // 1970-01-01 00:00:03
1838+
None, // null timestamp
1839+
Some(1500000), // 1970-01-01 00:00:01.5
1840+
Some(4000000), // 1970-01-01 00:00:04
1841+
Some(2500000), // 1970-01-01 00:00:02.5
1842+
Some(3500000), // 1970-01-01 00:00:03.5
1843+
Some(2800000), // 1970-01-01 00:00:02.8
1844+
]);
1845+
1846+
let schema = Arc::new(Schema::new(vec![
1847+
Field::new("utf8_low", DataType::Utf8, true),
1848+
Field::new("u8_low", DataType::UInt8, true),
1849+
Field::new("dictionary_utf8_low", string_dict_type(), true),
1850+
Field::new(
1851+
"timestamp_us",
1852+
DataType::Timestamp(TimeUnit::Microsecond, None),
1853+
true,
1854+
),
1855+
]));
1856+
1857+
Self {
1858+
schema,
1859+
utf8_low,
1860+
u8_low,
1861+
dictionary_utf8_low,
1862+
timestamp_us,
1863+
}
1864+
}
1865+
}
1866+
1867+
/// Sets up test contexts for fuzz table with timestamps and both single and multiple partitions
1868+
async fn setup_fuzz_timestamp_test_contexts() -> Result<(SessionContext, SessionContext)>
1869+
{
1870+
let test_data = FuzzTimestampTestData::new();
1871+
1872+
// Single partition context
1873+
let ctx_single = create_fuzz_timestamp_context_with_partitions(&test_data, 1).await?;
1874+
1875+
// Multiple partition context
1876+
let ctx_multi = create_fuzz_timestamp_context_with_partitions(&test_data, 3).await?;
1877+
1878+
Ok((ctx_single, ctx_multi))
1879+
}
1880+
1881+
/// Creates a session context with fuzz timestamp table partitioned into specified number of partitions
1882+
async fn create_fuzz_timestamp_context_with_partitions(
1883+
test_data: &FuzzTimestampTestData,
1884+
num_partitions: usize,
1885+
) -> Result<SessionContext> {
1886+
let ctx = SessionContext::new_with_config(
1887+
SessionConfig::new().with_target_partitions(num_partitions),
1888+
);
1889+
1890+
let batches = split_fuzz_timestamp_data_into_batches(test_data, num_partitions)?;
1891+
let provider = MemTable::try_new(test_data.schema.clone(), batches)?;
1892+
ctx.register_table("fuzz_table", Arc::new(provider))?;
1893+
1894+
Ok(ctx)
1895+
}
1896+
1897+
/// Splits fuzz timestamp test data into multiple batches for partitioning
1898+
fn split_fuzz_timestamp_data_into_batches(
1899+
test_data: &FuzzTimestampTestData,
1900+
num_partitions: usize,
1901+
) -> Result<Vec<Vec<RecordBatch>>> {
1902+
debug_assert!(num_partitions > 0, "num_partitions must be greater than 0");
1903+
let total_len = test_data.utf8_low.len();
1904+
let chunk_size = total_len.div_ceil(num_partitions);
1905+
1906+
let mut batches = Vec::new();
1907+
let mut start = 0;
1908+
1909+
while start < total_len {
1910+
let end = min(start + chunk_size, total_len);
1911+
let len = end - start;
1912+
1913+
if len > 0 {
1914+
let batch = RecordBatch::try_new(
1915+
test_data.schema.clone(),
1916+
vec![
1917+
Arc::new(test_data.utf8_low.slice(start, len)),
1918+
Arc::new(test_data.u8_low.slice(start, len)),
1919+
Arc::new(test_data.dictionary_utf8_low.slice(start, len)),
1920+
Arc::new(test_data.timestamp_us.slice(start, len)),
1921+
],
1922+
)?;
1923+
batches.push(vec![batch]);
1924+
}
1925+
start = end;
1926+
}
1927+
1928+
Ok(batches)
1929+
}

0 commit comments

Comments
 (0)