From cf967ffdffea30a380838a0377fdce824b7788e2 Mon Sep 17 00:00:00 2001
From: Sebastian Berg
Date: Wed, 12 Feb 2025 13:43:49 +0100
Subject: [PATCH 1/3] WIP: Try using imaging constraints for joins

Signed-off-by: Sebastian Berg
---
 cpp/include/legate_dataframe/core/library.hpp |   2 +
 cpp/include/legate_dataframe/core/table.hpp   |  38 +++++
 .../legate_dataframe/hashpartition.hpp        |  56 ++++++
 cpp/include/legate_dataframe/join.hpp         |  12 +-
 cpp/src/core/library.cpp                      |  27 ++-
 cpp/src/core/table.cpp                        |  28 +++
 cpp/src/hashpartition.cpp                     | 160 ++++++++++++++++++
 cpp/src/join.cpp                              | 129 ++++++++++----
 cpp/src/sort.cpp                              |   2 +-
 python/benchmarks/join.py                     |  18 +-
 python/legate_dataframe/lib/join.pyx          |  14 +-
 11 files changed, 448 insertions(+), 38 deletions(-)
 create mode 100644 cpp/include/legate_dataframe/hashpartition.hpp
 create mode 100644 cpp/src/hashpartition.cpp

diff --git a/cpp/include/legate_dataframe/core/library.hpp b/cpp/include/legate_dataframe/core/library.hpp
index a00a364..c00678a 100644
--- a/cpp/include/legate_dataframe/core/library.hpp
+++ b/cpp/include/legate_dataframe/core/library.hpp
@@ -34,7 +34,9 @@ enum : int {
   BinaryOpColCol,
   BinaryOpColScalar,
   BinaryOpScalarCol,
+  HashPartition,
   Join,
+  JoinLocal,
   ToTimestamps,
   ExtractTimestampComponent,
   Sequence,
diff --git a/cpp/include/legate_dataframe/core/table.hpp b/cpp/include/legate_dataframe/core/table.hpp
index 9a3faa8..9c44127 100644
--- a/cpp/include/legate_dataframe/core/table.hpp
+++ b/cpp/include/legate_dataframe/core/table.hpp
@@ -332,6 +332,16 @@ class PhysicalTable {
    */
  [[nodiscard]] int32_t num_columns() const { return columns_.size(); }

+  /**
+   * @brief Find the global row offset of the table.
+   *
+   * @throws std::out_of_range if there is no column in the table.
+   * @throws std::runtime_error if the first column is unbound (assumes the others are not).
+   *
+   * @return The offset of the table's first row, in number of rows (an inclusive start index).
+   */
+  [[nodiscard]] int64_t global_row_offset() { return columns_.at(0).global_row_offset(); }
+
   /**
    * @brief Return a cudf table view of this physical table
    *
@@ -434,6 +444,25 @@ std::vector<legate::Variable> add_next_input(legate::AutoTask& task,
                                              const LogicalTable& tbl,
                                              bool broadcast = false);

+/**
+ * @brief Add a logical table to the next input task argument
+ *
+ * This adds alignment constraints to all logical columns within the table.
+ * This should match a call to `get_next_input()` by a legate task.
+ *
+ * NB: the order of "add_next_*" calls must match the order of the
+ * corresponding "get_next_*" calls.
+ *
+ * @param task The legate task to add the argument to.
+ * @param tbl The logical table to add as the next task argument.
+ * @param constraints A LogicalArray belonging to tbl as returned by `hashpartition`.
+ *        If passed, these are added as imaging constraints for the table.
+ */
+std::vector<legate::Variable> add_next_input(legate::AutoTask& task,
+                                             const LogicalTable& tbl,
+                                             const legate::LogicalArray& constraints);
+
 /**
  * @brief Add a logical table to the next output task argument
  *
@@ -452,11 +481,20 @@ template <>
 inline task::PhysicalTable get_next_input<task::PhysicalTable>(GPUTaskContext& ctx)
 {
   auto num_columns = get_next_scalar<int32_t>(ctx);
+  bool get_constraints = false;
+  if (num_columns < 0) {
+    num_columns     = -num_columns;
+    get_constraints = true;
+  }
   std::vector<PhysicalColumn> cols;
   cols.reserve(num_columns);
   for (auto i = 0; i < num_columns; ++i) {
     cols.push_back(argument::get_next_input<PhysicalColumn>(ctx));
   }
+  if (get_constraints) {
+    // Imaging constraints were added. We don't need them here, but must skip them.
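+    // (A sketch of the resulting wire format, assuming a table with three
+    // columns added via `add_next_input(task, tbl, constraints)`: the task
+    // first sees the scalar -3, then the three column arguments, and then one
+    // extra constraints-array argument, which is consumed and discarded here.)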
+    ctx.get_next_input_arg();
+  }
   return task::PhysicalTable(std::move(cols));
 }
diff --git a/cpp/include/legate_dataframe/hashpartition.hpp b/cpp/include/legate_dataframe/hashpartition.hpp
new file mode 100644
index 0000000..729138a
--- /dev/null
+++ b/cpp/include/legate_dataframe/hashpartition.hpp
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2025, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <set>
+#include <string>
+
+#include <legate_dataframe/core/table.hpp>
+
+namespace legate::dataframe {
+
+/**
+ * @brief Hash partition a table based on some columns.
+ *
+ * This returns a new table and its partitioning information.
+ *
+ * TODO: Should maybe just attach that to the table. Need a convenient way
+ * (or at least an example) for applying the partitioning constraints.
+ *
+ * If we attach some unique, user-facing "partition" object (maybe?) then the
+ * user could carry that around.
+ * I.e. a partitioning is described by the key column (names); a second step
+ * working on a partitioned table will always have to use those identical
+ * names in some way (e.g. for a local join). (In fact, maybe to the point
+ * that remembering the hashes might often be useful?)
+ *
+ * @param tbl The table to hash partition.
+ * @param keys The column names to partition by.
+ * @param num_parts Number of partitions, or -1, in which case the current
+ *        number of ranks is used. (TODO: to be seen what makes sense here.)
+ * @return A new LogicalTable and a LogicalArray describing its partitioning.
+ */
+std::pair<LogicalTable, legate::LogicalArray> hashpartition(
+  const LogicalTable& tbl, const std::set<std::string>& keys, int num_parts = -1);
+
+
+// TODO: Do we really duplicate docs for this?
+std::pair<LogicalTable, legate::LogicalArray> hashpartition(
+  const LogicalTable& tbl, const std::set<cudf::size_type>& keys, int num_parts = -1);
+
+
+}  // namespace legate::dataframe
diff --git a/cpp/include/legate_dataframe/join.hpp b/cpp/include/legate_dataframe/join.hpp
index 70e1e3c..bf7c647 100644
--- a/cpp/include/legate_dataframe/join.hpp
+++ b/cpp/include/legate_dataframe/join.hpp
@@ -45,6 +45,9 @@ enum class BroadcastInput : int32_t { AUTO = 0, LEFT, RIGHT };
  * @param rhs_out_columns Indices of the right hand table columns to include in the result.
  * @param compare_nulls Controls whether null join-key values should match or not
  * @param broadcast Which, if any, of the inputs should be copied to all workers.
+ * @param _num_partitions TODO: For testing.
+ *        The number of partitions to use. If -1, uses the NCCL approach instead.
+ *        If broadcast is not AUTO, this is ignored (no shuffling is needed).
  * @return The joining result
  */
 LogicalTable join(const LogicalTable& lhs,
@@ -55,7 +58,8 @@ LogicalTable join(const LogicalTable& lhs,
                   const std::vector<cudf::size_type>& lhs_out_columns,
                   const std::vector<cudf::size_type>& rhs_out_columns,
                   cudf::null_equality compare_nulls = cudf::null_equality::EQUAL,
-                  BroadcastInput broadcast          = BroadcastInput::AUTO);
+                  BroadcastInput broadcast          = BroadcastInput::AUTO,
+                  int _num_partitions               = -1);

 /**
  * @brief Perform a join between the specified tables.
@@ -75,6 +79,9 @@ LogicalTable join(const LogicalTable& lhs,
  * @param rhs_out_columns Names of the right hand table columns to include in the result.
  * @param compare_nulls Controls whether null join-key values should match or not
  * @param broadcast Which, if any, of the inputs should be copied to all workers.
+ * @param _num_partitions TODO: For testing.
+ *        The number of partitions to use. If -1, uses the NCCL approach instead.
+ *        If broadcast is not AUTO, this is ignored (no shuffling is needed).
  * @return The joining result
  */
 LogicalTable join(const LogicalTable& lhs,
@@ -85,7 +92,8 @@ LogicalTable join(const LogicalTable& lhs,
                   const std::vector<std::string>& lhs_out_columns,
                   const std::vector<std::string>& rhs_out_columns,
                   cudf::null_equality compare_nulls = cudf::null_equality::EQUAL,
-                  BroadcastInput broadcast          = BroadcastInput::AUTO);
+                  BroadcastInput broadcast          = BroadcastInput::AUTO,
+                  int _num_partitions               = -1);

 /**
  * @brief Perform a join between the specified tables.
diff --git a/cpp/src/core/library.cpp b/cpp/src/core/library.cpp
index 3bb4c0a..7cfa2b2 100644
--- a/cpp/src/core/library.cpp
+++ b/cpp/src/core/library.cpp
@@ -20,6 +20,8 @@
 #include
 #include
+#include
+
 namespace legate::dataframe {
 namespace task {

@@ -48,6 +50,7 @@ class Mapper : public legate::mapping::Mapper {
     const std::vector<legate::mapping::StoreTarget>& options) override
   {
     using legate::mapping::StoreMapping;
+    const auto task_id = static_cast<int>(task.task_id());
     std::vector<StoreMapping> mappings;

     // For now, we set "exact" policy for all Stores
@@ -58,8 +61,22 @@
       mappings.push_back(
         StoreMapping::default_mapping(store, options.front(), /*exact = */ true));
     }
+    // TODO: This is also needed for strings!
+    mappings.back().policy().ordering.set_c_order();
   }
+  bool first = true;
   for (const legate::mapping::Array& ary : task.outputs()) {
+    if (first && task_id == legate::dataframe::task::OpCode::HashPartition) {
+      /* The first output of HashPartition is mapped to ZCMEM */
+      // TODO: We could probably just use sysmem here, but there seems to be
+      // a small issue (as of early 25.03.00.dev) and we should then also set it
+      // for consuming tasks (since we never actually read it there).
+      mappings.push_back(StoreMapping::default_mapping(
+        ary.stores().at(0), legate::mapping::StoreTarget::ZCMEM, /*exact = */ true));
+      first = false;
+      continue;
+    }
+    first = false;
     if (ary.type().variable_size()) { continue; }
     for (const legate::mapping::Store& store : ary.stores()) {
       mappings.push_back(
@@ -105,10 +122,16 @@
         return size_exchange_nbytes + metadata_nbytes;
       }
+      case legate::dataframe::task::OpCode::HashPartition: {
+        // TODO: This could probably just use sysmem here
+        auto nrank     = task.get_launch_domain().get_volume();
+        auto num_parts = task.scalars().at(0).value<int32_t>();
+        num_parts      = num_parts < 0 ? nrank : num_parts;
+        return nrank * num_parts * sizeof(legate::Rect<1>);
+      }
       default: return 0;
     }
   }
-
   // TODO: Returning nullopt prevents other parallel task launches so it would be
   // good to provide estimated usage for most tasks here.
   return std::nullopt;
 }
@@ -128,7 +151,7 @@ legate::Library create_and_registrate_library()
   }
   // Set with_has_allocations globally since currently all tasks allocate (and libcudf may also)
   // Also ensure we can generally work with 1000+ non-string return columns.
-  auto options = legate::VariantOptions{}.with_has_allocations(true).with_return_size(32768);
+  auto options = legate::VariantOptions{}.with_has_allocations(true);
   auto context =
     legate::Runtime::get_runtime()->find_or_create_library(library_name,
                                                            legate::ResourceConfig{},
diff --git a/cpp/src/core/table.cpp b/cpp/src/core/table.cpp
index c0d6b64..37e2810 100644
--- a/cpp/src/core/table.cpp
+++ b/cpp/src/core/table.cpp
@@ -24,6 +24,8 @@
 #include
+#include <iostream>
+
 namespace legate::dataframe {

 LogicalTable::LogicalTable(std::vector<LogicalColumn> columns,
@@ -139,6 +141,7 @@ bool PhysicalTable::is_broadcasted() const
 }  // namespace task

 namespace argument {
+
 std::vector<legate::Variable> add_next_input(legate::AutoTask& task,
                                              const LogicalTable& tbl,
                                              bool broadcast)
@@ -154,6 +157,31 @@ std::vector<legate::Variable> add_next_input(legate::AutoTask& task,
   return ret;
 }

+std::vector<legate::Variable> add_next_input(legate::AutoTask& task,
+                                             const LogicalTable& tbl,
+                                             const legate::LogicalArray& constraints)
+{
+  std::vector<legate::Variable> ret;
+  // First we add the number of columns, using a negative value to signal
+  // that constraints follow.
+  add_next_scalar(task, -tbl.num_columns());
+  // Then we add each column
+  for (const auto& col : tbl.get_columns()) {
+    ret.push_back(add_next_input(task, col, /* broadcast */ false));
+  }
+  auto constraints_var = task.add_input(constraints);
+  // Require a column-wise partition of the constraints (hist)
+  task.add_constraint(legate::broadcast(constraints_var, {0}));
+  //for (auto var : ret) {
+  //  std::cout << " adding imaging constraints" << std::endl;
+  //  task.add_constraint(legate::image(constraints_var, var));
+  //}
+  task.add_constraint(legate::image(constraints_var, ret.at(0)));
+  // TODO(seberg): If it works, it feels nicest to add the alignment constraints
+  // here and just add the image constraint for the first column.
+  add_alignment_constraints(task, ret);
+  return ret;
+}
+
 std::vector<legate::Variable> add_next_output(legate::AutoTask& task, const LogicalTable& tbl)
 {
   std::vector<legate::Variable> ret;
diff --git a/cpp/src/hashpartition.cpp b/cpp/src/hashpartition.cpp
new file mode 100644
index 0000000..4f6c395
--- /dev/null
+++ b/cpp/src/hashpartition.cpp
@@ -0,0 +1,160 @@
+/*
+ * Copyright (c) 2024-2025, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+#include
+#include
+
+#include
+#include
+#include
+
+#include
+
+namespace legate::dataframe {
+namespace task {
+
+class HashPartitionTask : public Task<HashPartitionTask, OpCode::HashPartition> {
+ public:
+  static void gpu_variant(legate::TaskContext context)
+  {
+    GPUTaskContext ctx{context};
+    auto num_parts = argument::get_next_scalar<int32_t>(ctx);
+    auto table     = argument::get_next_input<PhysicalTable>(ctx);
+    auto keys_idx  = argument::get_next_scalar_vector<cudf::size_type>(ctx);
+
+    auto partitions = ctx.get_next_output_arg();
+    auto output     = argument::get_next_output<PhysicalTable>(ctx);
+
+    auto table_view = table.table_view();
+    auto row_offset = table.global_row_offset();
+
+    if (num_parts < 0) {
+      num_parts = ctx.nranks;  // default to the same number as initially launched.
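+      // (Note: with this default every rank emits one range per rank, i.e.
+      // exactly the nranks x num_parts layout that the mapper's ZCMEM pool
+      // estimate, nrank * num_parts * sizeof(legate::Rect<1>), is sized for.)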
+    }
+
+    std::unique_ptr<cudf::table> partition_table;
+    // num_parts + 1 boundaries (the loop below reads entry i + 1); all-zero
+    // boundaries describe empty partitions for the empty-table case.
+    std::vector<cudf::size_type> partition_starts(num_parts + 1, 0);
+    if (table_view.num_rows() == 0) {
+      partition_table = cudf::empty_like(table_view);
+    } else {
+      auto res = cudf::hash_partition(table_view,
+                                      keys_idx,
+                                      num_parts,
+                                      cudf::hash_id::HASH_MURMUR3,
+                                      cudf::DEFAULT_HASH_SEED,
+                                      ctx.stream(),
+                                      ctx.mr());
+      partition_table.swap(res.first);
+      partition_starts.swap(res.second);
+      partition_starts.push_back(table_view.num_rows());
+    }
+
+    auto partitions_buf =
+      partitions.data().create_output_buffer<legate::Rect<1>, 1>({num_parts}, true);
+
+    std::ostringstream info;
+    info << "hash partition setup @" << ctx.rank << "\n";
+    for (int i = 0; i < num_parts; i++) {
+      // legate ranges are inclusive:
+      partitions_buf[{i}].lo = row_offset + partition_starts.at(i);
+      partitions_buf[{i}].hi = row_offset + partition_starts.at(i + 1) - 1;
+
+      info << "    " << row_offset + partition_starts.at(i) << ":"
+           << row_offset + partition_starts.at(i + 1) - 1 << "\n";
+    }
+    std::cout << info.str() << std::endl;
+    // TODO: just a hack to rule things out; I don't think the sync is necessary!
+    cudaStreamSynchronize(ctx.stream());
+
+    output.move_into(std::move(partition_table));
+  }
+};
+
+}  // namespace task
+
+
+std::pair<LogicalTable, legate::LogicalArray> hashpartition(
+  const LogicalTable& table, const std::set<cudf::size_type>& keys, int num_parts)
+{
+  if (keys.size() == 0) {
+    throw std::invalid_argument("keys must have at least one entry.");
+  }
+  if (num_parts < 1 && num_parts != -1) {
+    throw std::invalid_argument("num_parts must be >=1 or -1 to indicate same as input.");
+  }
+
+  LogicalTable output = LogicalTable::empty_like(table);
+
+  // The result of this task is a set of "partitions" described by a
+  // nranks x num_parts array of partition ranges. Use an unbound store so we
+  // can avoid defining the number of parts here.
+  // TODO: is an unbound store really needed?
+  // TODO: Legate has problems with mixed ndims, so this is 1-D and reshaped later.
+  legate::LogicalArray partitions(
+    legate::Runtime::get_runtime()->create_array(legate::rect_type(1), 1)
+  );
+
+  auto runtime = legate::Runtime::get_runtime();
+  legate::AutoTask task =
+    runtime->create_task(get_library(), task::HashPartitionTask::TASK_ID);
+  argument::add_next_scalar(task, num_parts);
+  argument::add_next_input(task, table);
+  argument::add_next_scalar_vector(task, std::vector<cudf::size_type>(keys.begin(), keys.end()));
+
+  // Add the result partitions, broadcasting (not splitting) the rank-specific dimension.
+  auto partitions_var = task.add_output(partitions);
+  // task.add_constraint(legate::broadcast(partitions_var, {1}));
+  // And the result (reordered to be partitioned) dataframe
+  argument::add_next_output(task, output);
+
+  runtime->submit(std::move(task));
+  // TODO: No good of course; this fetches the volume, and num_parts == -1 is possible...
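+  // (Worked example: with 2 producing ranks and num_parts == 3 the unbound
+  // 1-D store has volume 6; the delinearize below reshapes it to {2, 3}, one
+  // row of Rect<1> ranges per producing rank.)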
+  partitions = partitions.delinearize(0, {partitions.volume() / num_parts, num_parts});
+  return std::make_pair(output, partitions);
+}
+
+
+std::pair<LogicalTable, legate::LogicalArray> hashpartition(
+  const LogicalTable& table, const std::set<std::string>& keys, int num_parts)
+{
+  std::set<cudf::size_type> key_idx;
+  auto colname_map = table.get_column_names();
+  for (auto key : keys) {
+    key_idx.insert(colname_map.at(key));
+  }
+
+  return hashpartition(table, key_idx, num_parts);
+}
+
+}  // namespace legate::dataframe
+
+namespace {
+
+void __attribute__((constructor)) register_tasks()
+{
+  legate::dataframe::task::HashPartitionTask::register_variants();
+}
+
+}  // namespace
diff --git a/cpp/src/join.cpp b/cpp/src/join.cpp
index bc7f89f..535a8a7 100644
--- a/cpp/src/join.cpp
+++ b/cpp/src/join.cpp
@@ -15,6 +15,7 @@
 #include
+#include <iostream>
 #include
 #include
 #include
@@ -28,8 +29,11 @@
 #include
 #include
+#include <sstream>
 #include
+#include <legate_dataframe/hashpartition.hpp>
+
 namespace legate::dataframe {
 namespace task {
 namespace {
@@ -257,6 +261,41 @@ class JoinTask : public Task<JoinTask, OpCode::Join> {
   }
 };

+
+class JoinLocalTask : public Task<JoinLocalTask, OpCode::JoinLocal> {
+ public:
+  static void gpu_variant(legate::TaskContext context)
+  {
+    GPUTaskContext ctx{context};
+    const auto lhs = argument::get_next_input<PhysicalTable>(ctx);
+    //const auto rhs = argument::get_next_input<PhysicalTable>(ctx);
+    const auto lhs_keys = argument::get_next_scalar_vector<cudf::size_type>(ctx);
+    const auto rhs_keys = argument::get_next_scalar_vector<cudf::size_type>(ctx);
+    auto join_type      = argument::get_next_scalar<JoinType>(ctx);
+    auto null_equality  = argument::get_next_scalar<cudf::null_equality>(ctx);
+    const auto lhs_out_cols = argument::get_next_scalar_vector<cudf::size_type>(ctx);
+    const auto rhs_out_cols = argument::get_next_scalar_vector<cudf::size_type>(ctx);
+    auto output             = argument::get_next_output<PhysicalTable>(ctx);
+
+    std::ostringstream info;
+    info << "local join @" << ctx.rank << " nrows1: " << lhs.table_view().num_rows() << std::endl;
+    // info << ", nrows2: " << rhs.table_view().num_rows() << std::endl;
+    std::cout << info.str() << std::endl;
+
+    //cudf_join_and_gather(ctx,
+    //                     lhs.table_view(),
+    //                     rhs.table_view(),
+    //                     lhs_keys,
+    //                     rhs_keys,
+    //                     join_type,
+    //                     null_equality,
+    //                     lhs_out_cols,
+    //                     rhs_out_cols,
+    //                     output);
+  }
+};
+
+
 }  // namespace task

 namespace {
@@ -291,7 +330,8 @@ LogicalTable join(const LogicalTable& lhs,
                   const std::vector<cudf::size_type>& lhs_out_columns,
                   const std::vector<cudf::size_type>& rhs_out_columns,
                   cudf::null_equality compare_nulls,
-                  BroadcastInput broadcast)
+                  BroadcastInput broadcast,
+                  int _num_partitions)
 {
   auto runtime = legate::Runtime::get_runtime();
   if (lhs_keys.size() != rhs_keys.size()) {
@@ -330,33 +370,61 @@ LogicalTable join(const LogicalTable& lhs,
   auto ret_names = concat(lhs_out.get_column_name_vector(), rhs_out.get_column_name_vector());
   auto ret       = LogicalTable(std::move(ret_cols), std::move(ret_names));

-  legate::AutoTask task = runtime->create_task(get_library(), task::JoinTask::TASK_ID);
-  // TODO: While legate may broadcast some arrays, it would be good to add
-  //       a heuristic (e.g. based on the fact that we need to do copies
-  //       anyway, so the broadcast may actually copy less).
-  //       That could be done here, in a mapper, or within the task itself.
-  argument::add_next_input(task, lhs, broadcast == BroadcastInput::LEFT);
-  argument::add_next_input(task, rhs, broadcast == BroadcastInput::RIGHT);
-  argument::add_next_scalar_vector(task, std::vector<cudf::size_type>(lhs_keys.begin(), lhs_keys.end()));
-  argument::add_next_scalar_vector(task, std::vector<cudf::size_type>(rhs_keys.begin(), rhs_keys.end()));
-  argument::add_next_scalar(task, static_cast<std::underlying_type_t<JoinType>>(join_type));
-  argument::add_next_scalar(
-    task, static_cast<std::underlying_type_t<cudf::null_equality>>(compare_nulls));
-  argument::add_next_scalar_vector(
-    task, std::vector<cudf::size_type>(lhs_out_columns.begin(), lhs_out_columns.end()));
-  argument::add_next_scalar_vector(
-    task, std::vector<cudf::size_type>(rhs_out_columns.begin(), rhs_out_columns.end()));
-  argument::add_next_output(task, ret);
-  if (broadcast == BroadcastInput::AUTO) {
-    task.add_communicator("nccl");
-  } else if (join_type == JoinType::FULL ||
-             (broadcast == BroadcastInput::LEFT && join_type != JoinType::INNER)) {
-    throw std::runtime_error(
-      "Force broadcast was indicated, but repartitioning is required. "
-      "FULL joins do not support broadcasting and LEFT joins only for the "
-      "right hand side argument.");
+  // TODO(seberg): Should probably also split out the NCCL version this way (unless there is some memory disadvantage?)
+  if (_num_partitions != -1 && broadcast == BroadcastInput::AUTO) {
+    /* Try to use the local partition -> image constraints -> local join method */
+    auto [lhs_part, lhs_constraints] =
+      legate::dataframe::hashpartition(lhs, lhs_keys, _num_partitions);
+    legate::Runtime::get_runtime()->issue_execution_fence(true);
+    auto [rhs_part, rhs_constraints] =
+      legate::dataframe::hashpartition(rhs, rhs_keys, _num_partitions);
+    legate::Runtime::get_runtime()->issue_execution_fence(true);
+
+    legate::AutoTask task = runtime->create_task(get_library(), task::JoinLocalTask::TASK_ID);
+
+    argument::add_next_input(task, lhs, lhs_constraints);
+    // argument::add_next_input(task, rhs, lhs_constraints);
+    argument::add_next_scalar_vector(task, std::vector<cudf::size_type>(lhs_keys.begin(), lhs_keys.end()));
+    argument::add_next_scalar_vector(task, std::vector<cudf::size_type>(rhs_keys.begin(), rhs_keys.end()));
+    argument::add_next_scalar(task, static_cast<std::underlying_type_t<JoinType>>(join_type));
+    argument::add_next_scalar(
+      task, static_cast<std::underlying_type_t<cudf::null_equality>>(compare_nulls));
+    argument::add_next_scalar_vector(
+      task,
+      std::vector<cudf::size_type>(lhs_out_columns.begin(), lhs_out_columns.end()));
+    argument::add_next_scalar_vector(
+      task,
+      std::vector<cudf::size_type>(rhs_out_columns.begin(), rhs_out_columns.end()));
+    argument::add_next_output(task, ret);
+
+    runtime->submit(std::move(task));
+  }
+  else {
+    /* Use the NCCL approach (or no communication necessary) */
+    legate::AutoTask task = runtime->create_task(get_library(), task::JoinTask::TASK_ID);
+    // TODO: While legate may broadcast some arrays, it would be good to add
+    //       a heuristic (e.g. based on the fact that we need to do copies
+    //       anyway, so the broadcast may actually copy less).
+    //       That could be done here, in a mapper, or within the task itself.
+    argument::add_next_input(task, lhs, broadcast == BroadcastInput::LEFT);
+    argument::add_next_input(task, rhs, broadcast == BroadcastInput::RIGHT);
+    argument::add_next_scalar_vector(task, std::vector<cudf::size_type>(lhs_keys.begin(), lhs_keys.end()));
+    argument::add_next_scalar_vector(task, std::vector<cudf::size_type>(rhs_keys.begin(), rhs_keys.end()));
+    argument::add_next_scalar(task, static_cast<std::underlying_type_t<JoinType>>(join_type));
+    argument::add_next_scalar(
+      task, static_cast<std::underlying_type_t<cudf::null_equality>>(compare_nulls));
+    argument::add_next_scalar_vector(
+      task,
+      std::vector<cudf::size_type>(lhs_out_columns.begin(), lhs_out_columns.end()));
+    argument::add_next_scalar_vector(
+      task,
+      std::vector<cudf::size_type>(rhs_out_columns.begin(), rhs_out_columns.end()));
+    argument::add_next_output(task, ret);
+    if (broadcast == BroadcastInput::AUTO) {
+      task.add_communicator("nccl");
+    } else if (join_type == JoinType::FULL ||
+               (broadcast == BroadcastInput::LEFT && join_type != JoinType::INNER)) {
+      throw std::runtime_error(
+        "Force broadcast was indicated, but repartitioning is required. "
+        "FULL joins do not support broadcasting and LEFT joins only for the "
+        "right hand side argument.");
+    }
+    runtime->submit(std::move(task));
   }
-  runtime->submit(std::move(task));
   return ret;
 }

@@ -368,7 +436,8 @@ LogicalTable join(const LogicalTable& lhs,
                   const std::vector<std::string>& lhs_out_columns,
                   const std::vector<std::string>& rhs_out_columns,
                   cudf::null_equality compare_nulls,
-                  BroadcastInput broadcast)
+                  BroadcastInput broadcast,
+                  int _num_partitions)
 {
   // Convert column names to indices
   std::set<cudf::size_type> lhs_keys_idx;
@@ -397,7 +466,8 @@ LogicalTable join(const LogicalTable& lhs,
               lhs_out_columns_idx,
               rhs_out_columns_idx,
               compare_nulls,
-              broadcast);
+              broadcast,
+              _num_partitions);
 }

 LogicalTable join(const LogicalTable& lhs,
@@ -431,6 +501,7 @@ namespace {
 void __attribute__((constructor)) register_tasks()
 {
   legate::dataframe::task::JoinTask::register_variants();
+  legate::dataframe::task::JoinLocalTask::register_variants();
 }

 }  // namespace
diff --git a/cpp/src/sort.cpp b/cpp/src/sort.cpp
index cfab650..390e2bf 100644
--- a/cpp/src/sort.cpp
+++ b/cpp/src/sort.cpp
@@ -37,7 +37,7 @@
 #include
 #include

-#define DEBUG_SPLITS 0
+#define DEBUG_SPLITS 1
 #if DEBUG_SPLITS
 #include
 #include
diff --git a/python/benchmarks/join.py b/python/benchmarks/join.py
index 2755d2f..a59120e 100644
--- a/python/benchmarks/join.py
+++ b/python/benchmarks/join.py
@@ -25,7 +25,8 @@


 def create_key_and_data(args, module):
-    if "dask" in args.api:
+    module.random.seed(0)
+    if True:  # "dask" in args.api: TODO, was wondering if this helps legate split HashPartition, but the problem seems to be a different one...
         # Dask doesn't support argsort and random.permutation is very slow,
         # instead we use a (very good) approximation of --unique-factor.
         key = module.random.random_integers(low=0, high=2**50, size=args.nrows)
@@ -106,6 +107,13 @@ def create_table(args, name: str) -> LogicalTable:
     def f() -> Tuple[float, int]:
         lhs = create_table(args, name="lhs")
         rhs = create_table(args, name="rhs")
+
+        print("Sorting lhs, just to prove a point:")
+        from legate_dataframe.lib.sort import sort
+        sort(lhs, ["lhs-key"])
+        blocking_timing()
+        print("Sort done!")
+
         t0 = blocking_timing()
         res = join(
             lhs,
@@ -114,6 +122,7 @@ def f() -> Tuple[float, int]:
             rhs_keys=["rhs-key"],
             join_type=JoinType.INNER,
             compare_nulls=null_equality.EQUAL,
+            _num_partitions=args.legate_parts,
         )
         t1 = blocking_timing()
         return t1 - t0, res.num_rows()
@@ -155,6 +164,7 @@ def main(args):
     with API[args.api](args) as f:
         for i in range(args.nwarms):
             elapsed, num_out_rows = f()
+            print("   num out:", num_out_rows)
             nbytes = num_out_rows * ncols * itemsize + nbytes_input  # Total size
             print(
                 f"elapsed[warm #{i}]: {elapsed:.4f}s ({nbytes/elapsed/2**30:.3f} GiB/s)"
@@ -247,5 +257,11 @@ def main(args):
         default=None,
         help="The initial RMM pool size e.g. 1GiB (default: disabled).",
     )
+    parser.add_argument(
+        "--legate-parts",
+        default=-1,
+        type=int,
+        help="Use the legate partitioning approach (with this many partitions).",
+    )
     args = parser.parse_args()
     main(args)
diff --git a/python/legate_dataframe/lib/join.pyx b/python/legate_dataframe/lib/join.pyx
index 602868e..51f6e01 100644
--- a/python/legate_dataframe/lib/join.pyx
+++ b/python/legate_dataframe/lib/join.pyx
@@ -6,6 +6,7 @@

 from libc.stdint cimport int32_t
+from libcpp cimport bool as cpp_bool
 from libcpp.set cimport set as cpp_set
 from libcpp.string cimport string
 from libcpp.vector cimport vector
@@ -49,7 +50,8 @@ cdef extern from "<legate_dataframe/join.hpp>" nogil:
         const vector[string]& lhs_out_columns,
         const vector[string]& rhs_out_columns,
         null_equality compare_nulls,
-        BroadcastInput broadcast
+        BroadcastInput broadcast,
+        int _num_partitions
     ) except +


@@ -64,7 +66,8 @@ def join(
     lhs_out_columns: Optional[Iterable[str]] = None,
     rhs_out_columns: Optional[Iterable[str]] = None,
     null_equality compare_nulls = null_equality.EQUAL,
-    BroadcastInput broadcast = BroadcastInput.AUTO
+    BroadcastInput broadcast = BroadcastInput.AUTO,
+    int _num_partitions = -1,
 ):
     """Perform a join between the specified tables.
@@ -100,6 +103,10 @@ def join(
         to all workers (i.e. copied fully). This can be much faster, as it
         avoids more complex all-to-all communication.
         Defaults to ``AUTO`` which may do this based on the data size.
+    _num_partitions : int, default -1
+        TODO(seberg): For testing only. With -1 (default), uses the NCCL
+        approach. Otherwise, uses a legate partitioning approach.
+        *Has no effect for a broadcast join.*

     Returns
     -------
@@ -141,6 +148,7 @@ def join(
             lhs_out_columns_vector,
             rhs_out_columns_vector,
             compare_nulls,
-            broadcast
+            broadcast,
+            _num_partitions,
         )
     )

From 2fcbc247a757712cadd76c861b04234501604943 Mon Sep 17 00:00:00 2001
From: Sebastian Berg
Date: Thu, 13 Feb 2025 15:20:21 +0100
Subject: [PATCH 2/3] Trying around with variants 1, 2, etc. to see what works
Signed-off-by: Sebastian Berg
---
 cpp/include/legate_dataframe/core/table.hpp |  2 +-
 cpp/src/core/table.cpp                      |  8 +++--
 cpp/src/hashpartition.cpp                   |  3 ++
 cpp/src/join.cpp                            | 34 ++++++++++++---------
 python/benchmarks/join.py                   | 13 ++++----
 5 files changed, 34 insertions(+), 26 deletions(-)

diff --git a/cpp/include/legate_dataframe/core/table.hpp b/cpp/include/legate_dataframe/core/table.hpp
index 9c44127..b543427 100644
--- a/cpp/include/legate_dataframe/core/table.hpp
+++ b/cpp/include/legate_dataframe/core/table.hpp
@@ -459,7 +459,7 @@ std::vector<legate::Variable> add_next_input(legate::AutoTask& task,
  * @param constraints A LogicalArray belonging to tbl as returned by `hashpartition`.
  *        If passed, these are added as imaging constraints for the table.
  */
-std::vector<legate::Variable> add_next_input(legate::AutoTask& task,
+legate::Variable add_next_input(legate::AutoTask& task,
                                              const LogicalTable& tbl,
                                              const legate::LogicalArray& constraints);
diff --git a/cpp/src/core/table.cpp b/cpp/src/core/table.cpp
index 37e2810..9e55a9e 100644
--- a/cpp/src/core/table.cpp
+++ b/cpp/src/core/table.cpp
@@ -157,7 +157,8 @@ std::vector<legate::Variable> add_next_input(legate::AutoTask& task,
   return ret;
 }

-std::vector<legate::Variable> add_next_input(legate::AutoTask& task,
+
+legate::Variable add_next_input(legate::AutoTask& task,
                                              const LogicalTable& tbl,
                                              const legate::LogicalArray& constraints)
 {
@@ -170,7 +171,7 @@ std::vector<legate::Variable> add_next_input(legate::AutoTask& task,
   }
   auto constraints_var = task.add_input(constraints);
   // Require a column-wise partition of the constraints (hist)
-  task.add_constraint(legate::broadcast(constraints_var, {0}));
+  task.add_constraint(legate::broadcast(constraints_var, {1}));
   //for (auto var : ret) {
   //  std::cout << " adding imaging constraints" << std::endl;
   //  task.add_constraint(legate::image(constraints_var, var));
   //}
@@ -179,9 +180,10 @@ std::vector<legate::Variable> add_next_input(legate::AutoTask& task,
   task.add_constraint(legate::image(constraints_var, ret.at(0)));
   // TODO(seberg): If it works, it feels nicest to add the alignment constraints
   // here and just add the image constraint for the first column.
   add_alignment_constraints(task, ret);
-  return ret;
+  return constraints_var;
 }

+
 std::vector<legate::Variable> add_next_output(legate::AutoTask& task, const LogicalTable& tbl)
 {
   std::vector<legate::Variable> ret;
diff --git a/cpp/src/hashpartition.cpp b/cpp/src/hashpartition.cpp
index 4f6c395..7c379cc 100644
--- a/cpp/src/hashpartition.cpp
+++ b/cpp/src/hashpartition.cpp
@@ -132,6 +132,9 @@ hashpartition(const LogicalTable& table, const std::set<cudf::size_type>& keys, int num_parts)
   runtime->submit(std::move(task));
   // TODO: No good of course; this fetches the volume, and num_parts == -1 is possible...
   partitions = partitions.delinearize(0, {partitions.volume() / num_parts, num_parts});
+  partitions = partitions.transpose({1, 0});  // Transpose, so that all chunks end up on the same thing even!
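+  // (After the transpose the layout is {num_parts, nranks}: row i holds the
+  // i-th partition range from every producing rank, which is what the
+  // broadcast({1}) change for the constraints in table.cpp pairs with.)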
+ std::cout << "hmmmm, shape: " << partitions.shape().at(0) << ", " << partitions.shape().at(1) << std::endl; + //partitions = partitions.project(0, 0); return std::make_pair(output, partitions); } diff --git a/cpp/src/join.cpp b/cpp/src/join.cpp index 535a8a7..c05c1ea 100644 --- a/cpp/src/join.cpp +++ b/cpp/src/join.cpp @@ -268,7 +268,7 @@ class JoinLocalTask : public Task { { GPUTaskContext ctx{context}; const auto lhs = argument::get_next_input(ctx); - //const auto rhs = argument::get_next_input(ctx); + const auto rhs = argument::get_next_input(ctx); const auto lhs_keys = argument::get_next_scalar_vector(ctx); const auto rhs_keys = argument::get_next_scalar_vector(ctx); auto join_type = argument::get_next_scalar(ctx); @@ -278,20 +278,22 @@ class JoinLocalTask : public Task { auto output = argument::get_next_output(ctx); std::ostringstream info; - info << "local join @" << ctx.rank << " nrows1: " << lhs.table_view().num_rows() << std::endl; - // info << ", nrows2: " << rhs.table_view().num_rows() << std::endl; + //info << "local join @" << ctx.rank << " nrows1: " << lhs.shape<1>().volume() << std::endl; + //info << " nrows2: " << rhs.shape<1>().volume() << std::endl; + info << "local join @" << ctx.rank << " nrows1: " << lhs.table_view().num_rows(); + info << ", nrows2: " << rhs.table_view().num_rows() << std::endl; std::cout << info.str() << std::endl; - //cudf_join_and_gather(ctx, - // lhs.table_view(), - // rhs.table_view(), - // lhs_keys, - // rhs_keys, - // join_type, - // null_equality, - // lhs_out_cols, - // rhs_out_cols, - // output); + cudf_join_and_gather(ctx, + lhs.table_view(), + rhs.table_view(), + lhs_keys, + rhs_keys, + join_type, + null_equality, + lhs_out_cols, + rhs_out_cols, + output); } }; @@ -380,8 +382,10 @@ LogicalTable join(const LogicalTable& lhs, legate::AutoTask task = runtime->create_task(get_library(), task::JoinLocalTask::TASK_ID); - argument::add_next_input(task, lhs, lhs_constraints); - // argument::add_next_input(task, rhs, lhs_constraints); + auto lhs_constr_var = argument::add_next_input(task, lhs_part, lhs_constraints); + auto rhs_constr_var = argument::add_next_input(task, rhs_part, rhs_constraints); + task.add_constraint(legate::align(lhs_constr_var, rhs_constr_var)); + argument::add_next_scalar_vector(task, std::vector(lhs_keys.begin(), lhs_keys.end())); argument::add_next_scalar_vector(task, std::vector(rhs_keys.begin(), rhs_keys.end())); argument::add_next_scalar(task, static_cast>(join_type)); diff --git a/python/benchmarks/join.py b/python/benchmarks/join.py index a59120e..29b2021 100644 --- a/python/benchmarks/join.py +++ b/python/benchmarks/join.py @@ -25,8 +25,7 @@ def create_key_and_data(args, module): - module.random.seed(0) - if True: # "dask" in args.api: TODO, was wondering if this helps legate split HashPartition, but problems seems a different... + if "dask" in args.api: # Dask doesn't support argsort and random.permutation is very slow, # instead we use an (very good) approximation of --unique-factor. 
         key = module.random.random_integers(low=0, high=2**50, size=args.nrows)
@@ -108,11 +107,11 @@ def create_table(args, name: str) -> LogicalTable:
     def f() -> Tuple[float, int]:
         lhs = create_table(args, name="lhs")
         rhs = create_table(args, name="rhs")

-        print("Sorting lhs, just to prove a point:")
-        from legate_dataframe.lib.sort import sort
-        sort(lhs, ["lhs-key"])
-        blocking_timing()
-        print("Sort done!")
+        # print("Sorting lhs, just to prove a point:")
+        # from legate_dataframe.lib.sort import sort
+        # sort(lhs, ["lhs-key"])
+        # blocking_timing()
+        # print("Sort done!")

         t0 = blocking_timing()
         res = join(

From 0af9acd0d9d710fa2d891277f52e7fed75043579 Mon Sep 17 00:00:00 2001
From: Sebastian Berg
Date: Mon, 17 Feb 2025 12:36:35 +0100
Subject: [PATCH 3/3] Experiment with bound arrays/stores for partitions...

Signed-off-by: Sebastian Berg
---
 cpp/include/legate_dataframe/core/column.hpp  | 12 +++++
 cpp/include/legate_dataframe/core/library.hpp |  1 +
 cpp/include/legate_dataframe/core/table.hpp   | 17 ++++++
 cpp/src/core/column.cu                        |  1 +
 cpp/src/core/table.cpp                        | 14 ++---
 cpp/src/hashpartition.cpp                     | 52 ++++++++++++++++---
 cpp/src/join.cpp                              | 34 ++++++------
 7 files changed, 102 insertions(+), 29 deletions(-)

diff --git a/cpp/include/legate_dataframe/core/column.hpp b/cpp/include/legate_dataframe/core/column.hpp
index 40e96b5..9d3e475 100644
--- a/cpp/include/legate_dataframe/core/column.hpp
+++ b/cpp/include/legate_dataframe/core/column.hpp
@@ -78,6 +78,18 @@ class LogicalColumn {
   LogicalColumn(cudf::column_view cudf_col,
                 rmm::cuda_stream_view stream = cudf::get_default_stream());

+  LogicalColumn copy_as_bound() const
+  {
+    auto runtime = legate::Runtime::get_runtime();
+    auto new_col = LogicalColumn(
+      runtime->create_array(array_->shape(), array_->type(), array_->nullable()),
+      cudf_type()
+    );
+    auto new_col_store = new_col.array_->data();
+    runtime->issue_copy(new_col_store, array_->data());
+    return new_col;
+  }
+
   /**
    * @brief Create a new unbounded column from an existing column
    *
diff --git a/cpp/include/legate_dataframe/core/library.hpp b/cpp/include/legate_dataframe/core/library.hpp
index c00678a..0aa3b2c 100644
--- a/cpp/include/legate_dataframe/core/library.hpp
+++ b/cpp/include/legate_dataframe/core/library.hpp
@@ -35,6 +35,7 @@ enum : int {
   BinaryOpColScalar,
   BinaryOpScalarCol,
   HashPartition,
+  HashPartitionCopy,
   Join,
   JoinLocal,
   ToTimestamps,
diff --git a/cpp/include/legate_dataframe/core/table.hpp b/cpp/include/legate_dataframe/core/table.hpp
index b543427..8a811ee 100644
--- a/cpp/include/legate_dataframe/core/table.hpp
+++ b/cpp/include/legate_dataframe/core/table.hpp
@@ -82,6 +82,23 @@ class LogicalTable {
                const std::vector<std::string>& column_names,
                rmm::cuda_stream_view stream = cudf::get_default_stream());

+  /**
+   * @brief Create a new table with new, copied, columns.
+   *
+   * TODO: At least for now this is just a hack to explore legate limitations...
+   *
+   * The result storage will be bound arrays.
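+   *
+   * A minimal usage sketch (with a hypothetical table `tbl`):
+   * @code
+   * auto bound = tbl.copy_as_bound();
+   * // `bound` can now be used where unbound storage is a problem, e.g.
+   * // before adding image constraints in the JoinLocal path below.
+   * @endcode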
+ */ + LogicalTable copy_as_bound() const + { + std::vector columns; + columns.reserve(columns_.size()); + for (const auto& col : columns_) { + columns.emplace_back(col.copy_as_bound()); + } + return LogicalTable(std::move(columns), get_column_names()); + } + /** * @brief Create a new unbounded table from an existing table * diff --git a/cpp/src/core/column.cu b/cpp/src/core/column.cu index 8aab7cb..a325ec2 100644 --- a/cpp/src/core/column.cu +++ b/cpp/src/core/column.cu @@ -430,6 +430,7 @@ namespace argument { legate::Variable add_next_input(legate::AutoTask& task, const LogicalColumn& col, bool broadcast) { add_next_scalar(task, static_cast>(col.cudf_type().id())); + std::cout << " adding columns that is unbound:" << col.unbound() << std::endl; add_next_scalar(task, col.unbound() ? -1 : static_cast(col.num_rows())); auto arr = col.get_logical_array(); auto variable = task.add_input(arr); diff --git a/cpp/src/core/table.cpp b/cpp/src/core/table.cpp index 9e55a9e..290570f 100644 --- a/cpp/src/core/table.cpp +++ b/cpp/src/core/table.cpp @@ -171,15 +171,15 @@ legate::Variable add_next_input(legate::AutoTask& task, } auto constraints_var = task.add_input(constraints); // Require a column-wise partition of the constraints (hist) - task.add_constraint(legate::broadcast(constraints_var, {1})); - //for (auto var : ret) { - // std::cout << " adding imaging constraints" << std::endl; - // task.add_constraint(legate::image(constraints_var, var)); - //} - task.add_constraint(legate::image(constraints_var, ret.at(0))); + task.add_constraint(legate::broadcast(constraints_var, {0})); + for (auto var : ret) { + std::cout << " adding imaging constraints that was unbound:" << constraints.unbound() << std::endl; + task.add_constraint(legate::image(constraints_var, var)); + } + //task.add_constraint(legate::image(constraints_var, ret.at(0))); // TODO(seberg): If it works, it feels nice if we would add this here and // just add image constraints for the first row. - add_alignment_constraints(task, ret); + //add_alignment_constraints(task, ret); return constraints_var; } diff --git a/cpp/src/hashpartition.cpp b/cpp/src/hashpartition.cpp index 7c379cc..456d2e7 100644 --- a/cpp/src/hashpartition.cpp +++ b/cpp/src/hashpartition.cpp @@ -35,6 +35,30 @@ namespace legate::dataframe { namespace task { +// This is a mini helper task to copy the paritions into a bound store +// maybe there is a way to do this, but `issue_copy()` refuses to do this +// on my store as of early 25.03.00.dev. 
+class HashPartitionCopyTask : public Task<HashPartitionCopyTask, OpCode::HashPartitionCopy> {
+ public:
+  static void cpu_variant(legate::TaskContext context)
+  {
+    auto in  = context.input(0);
+    auto out = context.output(0);
+
+    auto in_acc  = in.data().read_accessor<legate::Rect<1>, 2>();
+    auto out_acc = out.data().write_accessor<legate::Rect<1>, 2>();
+
+    auto shape = in.shape<2>();
+
+    for (size_t i = shape.lo[0]; i <= shape.hi[0]; i++) {
+      for (size_t j = shape.lo[1]; j <= shape.hi[1]; j++) {
+        out_acc[{i, j}] = in_acc[{i, j}];
+      }
+    }
+  }
+};
+
+
 class HashPartitionTask : public Task<HashPartitionTask, OpCode::HashPartition> {
  public:
   static void gpu_variant(legate::TaskContext context)
@@ -105,6 +129,8 @@ hashpartition(const LogicalTable& table, const std::set<cudf::size_type>& keys, int num_parts)
     throw std::invalid_argument("num_parts must be >=1 or -1 to indicate same as input.");
   }

+  auto runtime = legate::Runtime::get_runtime();
+
   LogicalTable output = LogicalTable::empty_like(table);

   // The result of this task is a set of "partitions" described by a
   // nranks x num_parts array of partition ranges. Use an unbound store so we
   // can avoid defining the number of parts here.
   // TODO: is an unbound store really needed?
   // TODO: Legate has problems with mixed ndims, so this is 1-D and reshaped later.
-  legate::LogicalArray partitions(
-    legate::Runtime::get_runtime()->create_array(legate::rect_type(1), 1)
-  );
+  legate::LogicalArray partitions{runtime->create_array(legate::rect_type(1), 1)};

-  auto runtime = legate::Runtime::get_runtime();
   legate::AutoTask task =
     runtime->create_task(get_library(), task::HashPartitionTask::TASK_ID);
   argument::add_next_scalar(task, num_parts);
@@ -132,10 +155,26 @@ hashpartition(const LogicalTable& table, const std::set<cudf::size_type>& keys, int num_parts)
   runtime->submit(std::move(task));
   // TODO: No good of course; this fetches the volume, and num_parts == -1 is possible...
   partitions = partitions.delinearize(0, {partitions.volume() / num_parts, num_parts});
-  partitions = partitions.transpose({1, 0});  // Transpose, so that all chunks end up on the same thing even!
+  //partitions = partitions.transpose({1, 0});  // Transpose, so that all chunks end up on the same thing even!
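+  // (The patch-2 transpose is dropped again: together with the table.cpp
+  // change back to broadcast({0}) and per-column image constraints, the
+  // original {nranks, num_parts} layout is what the consumer now expects.)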
std::cout << "hmmmm, shape: " << partitions.shape().at(0) << ", " << partitions.shape().at(1) << std::endl; //partitions = partitions.project(0, 0); - return std::make_pair(output, partitions); + + // TODO: unbound paritions are making problems maybe (currently), so copy to a bound one: + auto part_copy = runtime->create_array( + {partitions.shape().at(0), partitions.shape().at(1)}, legate::rect_type(1) + ); + + task = + runtime->create_task(get_library(), task::HashPartitionCopyTask::TASK_ID); + auto part_copy_var = task.add_output(part_copy); + partitions_var = task.add_input(partitions); + + task.add_constraint(legate::align(part_copy_var, partitions_var)); + task.add_constraint(legate::broadcast(partitions_var, {0, 1})); + + runtime->submit(std::move(task)); + + return std::make_pair(output, part_copy); } @@ -158,6 +197,7 @@ namespace { void __attribute__((constructor)) register_tasks() { legate::dataframe::task::HashPartitionTask::register_variants(); + legate::dataframe::task::HashPartitionCopyTask::register_variants(); } } // namespace diff --git a/cpp/src/join.cpp b/cpp/src/join.cpp index c05c1ea..ecdb0f7 100644 --- a/cpp/src/join.cpp +++ b/cpp/src/join.cpp @@ -268,32 +268,32 @@ class JoinLocalTask : public Task { { GPUTaskContext ctx{context}; const auto lhs = argument::get_next_input(ctx); - const auto rhs = argument::get_next_input(ctx); + //const auto rhs = argument::get_next_input(ctx); const auto lhs_keys = argument::get_next_scalar_vector(ctx); const auto rhs_keys = argument::get_next_scalar_vector(ctx); auto join_type = argument::get_next_scalar(ctx); auto null_equality = argument::get_next_scalar(ctx); const auto lhs_out_cols = argument::get_next_scalar_vector(ctx); const auto rhs_out_cols = argument::get_next_scalar_vector(ctx); - auto output = argument::get_next_output(ctx); + //auto output = argument::get_next_output(ctx); std::ostringstream info; //info << "local join @" << ctx.rank << " nrows1: " << lhs.shape<1>().volume() << std::endl; //info << " nrows2: " << rhs.shape<1>().volume() << std::endl; info << "local join @" << ctx.rank << " nrows1: " << lhs.table_view().num_rows(); - info << ", nrows2: " << rhs.table_view().num_rows() << std::endl; + //info << ", nrows2: " << rhs.table_view().num_rows() << std::endl; std::cout << info.str() << std::endl; - cudf_join_and_gather(ctx, - lhs.table_view(), - rhs.table_view(), - lhs_keys, - rhs_keys, - join_type, - null_equality, - lhs_out_cols, - rhs_out_cols, - output); + //cudf_join_and_gather(ctx, + // lhs.table_view(), + // rhs.table_view(), + // lhs_keys, + // rhs_keys, + // join_type, + // null_equality, + // lhs_out_cols, + // rhs_out_cols, + // output); } }; @@ -382,9 +382,11 @@ LogicalTable join(const LogicalTable& lhs, legate::AutoTask task = runtime->create_task(get_library(), task::JoinLocalTask::TASK_ID); + lhs_part = lhs_part.copy_as_bound(); + rhs_part = rhs_part.copy_as_bound(); auto lhs_constr_var = argument::add_next_input(task, lhs_part, lhs_constraints); - auto rhs_constr_var = argument::add_next_input(task, rhs_part, rhs_constraints); - task.add_constraint(legate::align(lhs_constr_var, rhs_constr_var)); + //auto rhs_constr_var = argument::add_next_input(task, rhs_part, rhs_constraints); + //task.add_constraint(legate::align(lhs_constr_var, rhs_constr_var)); argument::add_next_scalar_vector(task, std::vector(lhs_keys.begin(), lhs_keys.end())); argument::add_next_scalar_vector(task, std::vector(rhs_keys.begin(), rhs_keys.end())); @@ -395,7 +397,7 @@ LogicalTable join(const LogicalTable& lhs, task, 
       std::vector<cudf::size_type>(lhs_out_columns.begin(), lhs_out_columns.end()));
     argument::add_next_scalar_vector(
       task, std::vector<cudf::size_type>(rhs_out_columns.begin(), rhs_out_columns.end()));
-    argument::add_next_output(task, ret);
+    //argument::add_next_output(task, ret);

     runtime->submit(std::move(task));
   }