Skip to content

Commit d8843d4

Browse files
committed
add c-major array layout c api.
1 parent 19c47cb commit d8843d4

12 files changed

+581
-52
lines changed

CMakeLists.txt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,10 @@ if (QUESTDB_TESTS_AND_EXAMPLES)
107107
line_sender_c_example_array_elem_strides
108108
examples/concat.c
109109
examples/line_sender_c_example_array_elem_strides.c)
110+
compile_example(
111+
line_sender_c_example_array_c_major
112+
examples/concat.c
113+
examples/line_sender_c_example_array_c_major.c)
110114
compile_example(
111115
line_sender_c_example_auth
112116
examples/concat.c
@@ -141,6 +145,9 @@ if (QUESTDB_TESTS_AND_EXAMPLES)
141145
compile_example(
142146
line_sender_cpp_example_auth
143147
examples/line_sender_cpp_example_auth.cpp)
148+
compile_example(
149+
line_sender_cpp_example_array_c_major
150+
examples/line_sender_cpp_example_array_c_major.cpp)
144151
compile_example(
145152
line_sender_cpp_example_tls_ca
146153
examples/line_sender_cpp_example_tls_ca.cpp)

cpp_test/test_line_sender.cpp

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -199,15 +199,26 @@ TEST_CASE("line_sender c api basics")
199199
reinterpret_cast<uint8_t*>(arr_data.data()),
200200
sizeof(arr_data),
201201
&err));
202+
line_sender_column_name arr_name3 = QDB_COLUMN_NAME_LITERAL("a3");
203+
CHECK(
204+
::line_sender_buffer_column_f64_arr_c_major(
205+
buffer,
206+
arr_name3,
207+
rank,
208+
shape,
209+
reinterpret_cast<uint8_t*>(arr_data.data()),
210+
sizeof(arr_data),
211+
&err));
202212
CHECK(::line_sender_buffer_at_nanos(buffer, 10000000, &err));
203213
CHECK(server.recv() == 0);
204-
CHECK(::line_sender_buffer_size(buffer) == 266);
214+
CHECK(::line_sender_buffer_size(buffer) == 382);
205215
CHECK(::line_sender_flush(sender, buffer, &err));
206216
::line_sender_buffer_free(buffer);
207217
CHECK(server.recv() == 1);
208218
std::string expect{"test,t1=v1 f1=="};
209219
push_double_to_buffer(expect, 0.5).append(",a1==");
210220
push_double_arr_to_buffer(expect, arr_data, 3, shape).append(",a2==");
221+
push_double_arr_to_buffer(expect, arr_data, 3, shape).append(",a3==");
211222
push_double_arr_to_buffer(expect, arr_data, 3, shape).append(" 10000000\n");
212223
CHECK(server.msgs(0) == expect);
213224
}
@@ -282,18 +293,24 @@ TEST_CASE("line_sender c++ api basics")
282293
.symbol("t1", "v1")
283294
.symbol("t2", "")
284295
.column("f1", 0.5)
285-
.column<true>("a1", rank, shape, strides, arr_data)
286-
.column<false>("a2", rank, shape, elem_strides, arr_data)
296+
.column<questdb::ingress::array_strides_mode::bytes>(
297+
"a1", rank, shape, strides, arr_data)
298+
.column<questdb::ingress::array_strides_mode::elems>(
299+
"a2", rank, shape, elem_strides, arr_data)
300+
.column<questdb::ingress::array_strides_mode::c_major>(
301+
"a3", rank, shape, {}, arr_data)
287302
.at(questdb::ingress::timestamp_nanos{10000000});
288303

289304
CHECK(server.recv() == 0);
290-
CHECK(buffer.size() == 270);
305+
CHECK(buffer.size() == 386);
291306
sender.flush(buffer);
292307
CHECK(server.recv() == 1);
293308
std::string expect{"test,t1=v1,t2= f1=="};
294309
push_double_to_buffer(expect, 0.5).append(",a1==");
295310
push_double_arr_to_buffer(expect, arr_data, 3, shape.data())
296311
.append(",a2==");
312+
push_double_arr_to_buffer(expect, arr_data, 3, shape.data())
313+
.append(",a3==");
297314
push_double_arr_to_buffer(expect, arr_data, 3, shape.data())
298315
.append(" 10000000\n");
299316
CHECK(server.msgs(0) == expect);
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
#include <questdb/ingress/line_sender.h>
2+
#include <stdio.h>
3+
#include <stdbool.h>
4+
#include <string.h>
5+
#include "concat.h"
6+
7+
static bool example(const char* host, const char* port)
8+
{
9+
line_sender_error* err = NULL;
10+
line_sender* sender = NULL;
11+
line_sender_buffer* buffer = NULL;
12+
char* conf_str = concat("tcp::addr=", host, ":", port, ";protocol_version=2;");
13+
if (!conf_str)
14+
{
15+
fprintf(stderr, "Could not concatenate configuration string.\n");
16+
return false;
17+
}
18+
19+
line_sender_utf8 conf_str_utf8 = {0, NULL};
20+
if (!line_sender_utf8_init(
21+
&conf_str_utf8, strlen(conf_str), conf_str, &err))
22+
goto on_error;
23+
24+
sender = line_sender_from_conf(conf_str_utf8, &err);
25+
if (!sender)
26+
goto on_error;
27+
28+
free(conf_str);
29+
conf_str = NULL;
30+
31+
buffer = line_sender_buffer_new_for_sender(sender);
32+
line_sender_buffer_reserve(buffer, 64 * 1024);
33+
34+
line_sender_table_name table_name = QDB_TABLE_NAME_LITERAL("market_orders_c_major");
35+
line_sender_column_name symbol_col = QDB_COLUMN_NAME_LITERAL("symbol");
36+
line_sender_column_name book_col = QDB_COLUMN_NAME_LITERAL("order_book");
37+
38+
if (!line_sender_buffer_table(buffer, table_name, &err))
39+
goto on_error;
40+
41+
line_sender_utf8 symbol_val = QDB_UTF8_LITERAL("BTC-USD");
42+
if (!line_sender_buffer_symbol(buffer, symbol_col, symbol_val, &err))
43+
goto on_error;
44+
45+
size_t array_rank = 3;
46+
uintptr_t array_shape[] = {2, 3, 2};
47+
double array_data[] = {
48+
48123.5,
49+
2.4,
50+
48124.0,
51+
1.8,
52+
48124.5,
53+
0.9,
54+
48122.5,
55+
3.1,
56+
48122.0,
57+
2.7,
58+
48121.5,
59+
4.3};
60+
61+
if (!line_sender_buffer_column_f64_arr_c_major(
62+
buffer,
63+
book_col,
64+
array_rank,
65+
array_shape,
66+
(const uint8_t*)array_data,
67+
sizeof(array_data),
68+
&err))
69+
goto on_error;
70+
71+
if (!line_sender_buffer_at_nanos(buffer, line_sender_now_nanos(), &err))
72+
goto on_error;
73+
74+
if (!line_sender_flush(sender, buffer, &err))
75+
goto on_error;
76+
77+
line_sender_close(sender);
78+
return true;
79+
80+
on_error:;
81+
size_t err_len = 0;
82+
const char* err_msg = line_sender_error_msg(err, &err_len);
83+
fprintf(stderr, "Error: %.*s\n", (int)err_len, err_msg);
84+
free(conf_str);
85+
line_sender_error_free(err);
86+
line_sender_buffer_free(buffer);
87+
line_sender_close(sender);
88+
return false;
89+
}
90+
91+
int main(int argc, const char* argv[])
92+
{
93+
const char* host = (argc >= 2) ? argv[1] : "localhost";
94+
const char* port = (argc >= 3) ? argv[2] : "9009";
95+
return !example(host, port);
96+
}

examples/line_sender_cpp_example_array_byte_strides.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ static bool array_example(std::string_view host, std::string_view port)
3636
questdb::ingress::line_sender_buffer buffer = sender.new_buffer();
3737
buffer.table(table_name)
3838
.symbol(symbol_col, "BTC-USD"_utf8)
39-
.column<true>(book_col, 3, shape, strides, arr_data)
39+
.column<questdb::ingress::array_strides_mode::bytes>(
40+
book_col, 3, shape, strides, arr_data)
4041
.at(questdb::ingress::timestamp_nanos::now());
4142
sender.flush(buffer);
4243
return true;
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
#include <questdb/ingress/line_sender.hpp>
2+
#include <iostream>
3+
#include <vector>
4+
5+
using namespace std::literals::string_view_literals;
6+
using namespace questdb::ingress::literals;
7+
8+
static bool array_example(std::string_view host, std::string_view port)
9+
{
10+
try
11+
{
12+
auto sender = questdb::ingress::line_sender::from_conf(
13+
"tcp::addr=" + std::string{host} + ":" + std::string{port} +
14+
";protocol_version=2;");
15+
16+
const auto table_name = "cpp_market_orders_c_major"_tn;
17+
const auto symbol_col = "symbol"_cn;
18+
const auto book_col = "order_book"_cn;
19+
size_t rank = 3;
20+
std::vector<uintptr_t> shape{2, 3, 2};
21+
std::array<double, 12> arr_data = {
22+
48123.5,
23+
2.4,
24+
48124.0,
25+
1.8,
26+
48124.5,
27+
0.9,
28+
48122.5,
29+
3.1,
30+
48122.0,
31+
2.7,
32+
48121.5,
33+
4.3};
34+
35+
questdb::ingress::line_sender_buffer buffer = sender.new_buffer();
36+
buffer.table(table_name)
37+
.symbol(symbol_col, "BTC-USD"_utf8)
38+
.column<questdb::ingress::array_strides_mode::c_major>(
39+
book_col, 3, shape, {}, arr_data)
40+
.at(questdb::ingress::timestamp_nanos::now());
41+
sender.flush(buffer);
42+
return true;
43+
}
44+
catch (const questdb::ingress::line_sender_error& err)
45+
{
46+
std::cerr << "[ERROR] " << err.what() << std::endl;
47+
return false;
48+
}
49+
}
50+
51+
int main(int argc, const char* argv[])
52+
{
53+
auto host = "localhost"sv;
54+
if (argc >= 2)
55+
host = std::string_view{argv[1]};
56+
57+
auto port = "9009"sv;
58+
if (argc >= 3)
59+
port = std::string_view{argv[2]};
60+
61+
return !array_example(host, port);
62+
}

examples/line_sender_cpp_example_array_elem_strides.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ static bool array_example(std::string_view host, std::string_view port)
3636
questdb::ingress::line_sender_buffer buffer = sender.new_buffer();
3737
buffer.table(table_name)
3838
.symbol(symbol_col, "BTC-USD"_utf8)
39-
.column<false>(book_col, 3, shape, strides, arr_data)
39+
.column<questdb::ingress::array_strides_mode::elems>(
40+
book_col, 3, shape, strides, arr_data)
4041
.at(questdb::ingress::timestamp_nanos::now());
4142
sender.flush(buffer);
4243
return true;

include/questdb/ingress/line_sender.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,30 @@ bool line_sender_buffer_column_str(
491491
line_sender_utf8 value,
492492
line_sender_error** err_out);
493493

494+
/**
495+
* Records a multidimensional array of 64-bit floating-point values in C-major
496+
* order.
497+
*
498+
* @param[in] buffer Line buffer object.
499+
* @param[in] name Column name.
500+
* @param[in] rank Number of dimensions of the array.
501+
* @param[in] shape Array of dimension sizes (length = `rank`).
502+
* Each element must be a positive integer.
503+
* @param[in] data_buffer First array element data.
504+
* @param[in] data_buffer_len Bytes length of the array data.
505+
* @param[out] err_out Set to an error object on failure (if non-NULL).
506+
* @return true on success, false on error.
507+
*/
508+
LINESENDER_API
509+
bool line_sender_buffer_column_f64_arr_c_major(
510+
line_sender_buffer* buffer,
511+
line_sender_column_name name,
512+
size_t rank,
513+
const uintptr_t* shape,
514+
const uint8_t* data_buffer,
515+
size_t data_buffer_len,
516+
line_sender_error** err_out);
517+
494518
/**
495519
* Record a multidimensional array of double for the given column.
496520
*

include/questdb/ingress/line_sender.hpp

Lines changed: 55 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,18 @@ enum class protocol_version
109109
v2 = 2,
110110
};
111111

112+
enum class array_strides_mode
113+
{
114+
/** Strides are inferred from C-style row-major memory layout. */
115+
c_major,
116+
117+
/** Strides are provided in bytes */
118+
bytes,
119+
120+
/** Strides are provided in elements */
121+
elems,
122+
};
123+
112124
/* Possible sources of the root certificates used to validate the server's TLS
113125
* certificate. */
114126
enum class ca
@@ -641,11 +653,9 @@ class line_sender_buffer
641653
}
642654

643655
/**
644-
* Record a multidimensional double-precision array for the given column.
656+
* Records a multidimensional array of double-precision values.
645657
*
646-
* @tparam B Strides mode selector:
647-
* - `true` for byte-level strides
648-
* - `false` for element-level strides
658+
* @tparam Layout Memory layout specification (array_strides_mode)
649659
* @tparam T Element type (current only `double` is supported).
650660
* @tparam N Number of elements in the flat data array
651661
*
@@ -654,7 +664,7 @@ class line_sender_buffer
654664
* @param data Array first element data. Size must match product of
655665
* dimensions.
656666
*/
657-
template <bool B, typename T, size_t N>
667+
template <array_strides_mode Layout, typename T, size_t N>
658668
line_sender_buffer& column(
659669
column_name_view name,
660670
const size_t rank,
@@ -666,16 +676,46 @@ class line_sender_buffer
666676
std::is_same_v<T, double>,
667677
"Only double types are supported for arrays");
668678
may_init();
669-
line_sender_error::wrapped_call(
670-
B ? ::line_sender_buffer_column_f64_arr_byte_strides
671-
: ::line_sender_buffer_column_f64_arr_elem_strides,
672-
_impl,
673-
name._impl,
674-
rank,
675-
shape.data(),
676-
strides.data(),
677-
reinterpret_cast<const uint8_t*>(data.data()),
678-
sizeof(double) * N);
679+
switch (Layout)
680+
{
681+
case array_strides_mode::c_major:
682+
if (!strides.empty())
683+
{
684+
throw line_sender_error{
685+
line_sender_error_code::config_error,
686+
"C_Major layout requires empty strides vector"};
687+
}
688+
line_sender_error::wrapped_call(
689+
::line_sender_buffer_column_f64_arr_c_major,
690+
_impl,
691+
name._impl,
692+
rank,
693+
shape.data(),
694+
reinterpret_cast<const uint8_t*>(data.data()),
695+
sizeof(double) * N);
696+
break;
697+
case array_strides_mode::bytes:
698+
line_sender_error::wrapped_call(
699+
::line_sender_buffer_column_f64_arr_byte_strides,
700+
_impl,
701+
name._impl,
702+
rank,
703+
shape.data(),
704+
strides.data(),
705+
reinterpret_cast<const uint8_t*>(data.data()),
706+
sizeof(double) * N);
707+
break;
708+
case array_strides_mode::elems:
709+
line_sender_error::wrapped_call(
710+
::line_sender_buffer_column_f64_arr_elem_strides,
711+
_impl,
712+
name._impl,
713+
rank,
714+
shape.data(),
715+
strides.data(),
716+
reinterpret_cast<const uint8_t*>(data.data()),
717+
sizeof(double) * N);
718+
}
679719
return *this;
680720
}
681721

0 commit comments

Comments
 (0)