Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions cpp/include/legate_dataframe/core/column.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,18 @@ class LogicalColumn {
LogicalColumn(cudf::column_view cudf_col,
rmm::cuda_stream_view stream = cudf::get_default_stream());

/**
 * @brief Make a deep, bound copy of this column.
 *
 * Allocates a bound array with the same shape, type and nullability as
 * this column's storage and issues a runtime copy of the data into it.
 *
 * @return A new LogicalColumn backed by a bound array.
 */
LogicalColumn copy_as_bound() const
{
  auto* rt = legate::Runtime::get_runtime();
  auto bound = LogicalColumn(
    rt->create_array(array_->shape(), array_->type(), array_->nullable()), cudf_type());
  rt->issue_copy(bound.array_->data(), array_->data());
  return bound;
}

/**
* @brief Create a new unbounded column from an existing column
*
Expand Down
3 changes: 3 additions & 0 deletions cpp/include/legate_dataframe/core/library.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ enum : int {
BinaryOpColCol,
BinaryOpColScalar,
BinaryOpScalarCol,
HashPartition,
HashPartitionCopy,
Join,
JoinLocal,
ToTimestamps,
ExtractTimestampComponent,
Sequence,
Expand Down
55 changes: 55 additions & 0 deletions cpp/include/legate_dataframe/core/table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,23 @@ class LogicalTable {
const std::vector<std::string>& column_names,
rmm::cuda_stream_view stream = cudf::get_default_stream());

/**
 * @brief Create a new table with new, copied, columns.
 *
 * TODO: At least for now just a hack to explore legate limitations...
 *
 * The result storage will be bound arrays.
 *
 * @return A new LogicalTable with the same column names, whose columns are
 *         bound, per-column copies of this table's columns
 *         (see `LogicalColumn::copy_as_bound`).
 */
LogicalTable copy_as_bound() const
{
  std::vector<LogicalColumn> columns;
  columns.reserve(columns_.size());
  for (const auto& col : columns_) {
    columns.emplace_back(col.copy_as_bound());
  }
  return LogicalTable(std::move(columns), get_column_names());
}

/**
* @brief Create a new unbounded table from an existing table
*
Expand Down Expand Up @@ -332,6 +349,16 @@ class PhysicalTable {
*/
[[nodiscard]] int32_t num_columns() const { return columns_.size(); }

/**
 * @brief Finds the global row offset of the table.
 *
 * Only the first column is consulted; this presumes all columns of the
 * table share the same row offset (columns are expected to be aligned).
 *
 * @throws std::out_of_range if there is no column in the table.
 * @throws std::runtime_error if the first column is unbound (assumes others are not)
 *
 * @return The row offset in number of rows (inclusive).
 */
[[nodiscard]] int64_t global_row_offset() { return columns_.at(0).global_row_offset(); }

/**
* @brief Return a cudf table view of this physical table
*
Expand Down Expand Up @@ -434,6 +461,25 @@ std::vector<legate::Variable> add_next_input(legate::AutoTask& task,
const LogicalTable& tbl,
bool broadcast = false);


/**
* @brief Add a logical table to the next input task argument
*
* This adds alignment constraints to all logical columns within the table.
* This should match a call to `get_next_input<PhysicalTable>()` by a legate task.
*
* NB: the order of "add_next_*" calls must match the order of the
* corresponding "get_next_*" calls.
*
* @param task The legate task to add the argument.
* @param tbl The logical table to add as the next task argument.
* @param constraints A LogicalArray belonging to tbl as returned by `hashpartition`.
* If passed, these are added as imaging constraints for the table.
*/
legate::Variable add_next_input(legate::AutoTask& task,
const LogicalTable& tbl,
const legate::LogicalArray& constraints);

/**
* @brief Add a logical table to the next output task argument
*
Expand All @@ -452,11 +498,20 @@ template <>
// Deserializes a table added by `add_next_input(task, tbl, ...)`:
// first a column-count scalar, then the columns themselves.  A *negative*
// count signals that the overload taking a constraints array was used and
// an extra (imaging-constraint) input argument follows the columns.
inline task::PhysicalTable get_next_input<task::PhysicalTable>(GPUTaskContext& ctx)
{
  auto num_columns = get_next_scalar<int32_t>(ctx);
  bool get_constraints = false;
  if (num_columns < 0) {
    // Negative count: constraints were appended after the columns.
    num_columns = -num_columns;
    get_constraints = true;
  }
  std::vector<task::PhysicalColumn> cols;
  cols.reserve(num_columns);
  for (auto i = 0; i < num_columns; ++i) {
    cols.push_back(argument::get_next_input<task::PhysicalColumn>(ctx));
  }
  if (get_constraints) {
    // Imaging constraints were added.  We don't need their content here, but
    // must consume the argument so later get_next_* calls stay aligned.
    ctx.get_next_input_arg();
  }
  return task::PhysicalTable(std::move(cols));
}

Expand Down
56 changes: 56 additions & 0 deletions cpp/include/legate_dataframe/hashpartition.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <set>
#include <string>
#include <utility>
#include <vector>

#include <legate_dataframe/core/table.hpp>

namespace legate::dataframe {

/**
 * @brief Hash partition a table based on some columns.
 *
 * This returns a new table and partitioning information
 *
 * TODO: Should maybe just attach that to the table. Need a convenient way
 * (or at least an example) for applying the partitioning constraints.
 *
 * If we attach some user unique "partition" object (maybe?) then the
 * user could drag that around.
 * I.e. a partition is described by the column (names) a second step working
 * on a partitioned table will always have to use those identical names in some way
 * (e.g. for a local join). (in fact, maybe to the point that remembering the
 * hashes might be useful often?)
 *
 * @param tbl The table to hash partition.
 * @param keys The column names to partition by.
 * @param num_parts Number of partitions or -1 in which case the current number
 * of ranks will be used. (TODO: To be seen what makes sense here.)
 * @return A new LogicalTable and a LogicalArray describing its partitioning.
 */
std::pair<LogicalTable, legate::LogicalArray> hashpartition(const LogicalTable& tbl,
                                                            const std::set<std::string>& keys,
                                                            int num_parts = -1);

/**
 * @brief Hash partition a table based on some columns (by column index).
 *
 * Identical to the name-based overload above, except the key columns are
 * selected by their positional index.
 *
 * @param tbl The table to hash partition.
 * @param keys The column indices to partition by.
 * @param num_parts Number of partitions or -1 in which case the current number
 * of ranks will be used. (TODO: To be seen what makes sense here.)
 * @return A new LogicalTable and a LogicalArray describing its partitioning.
 */
std::pair<LogicalTable, legate::LogicalArray> hashpartition(const LogicalTable& tbl,
                                                            const std::set<size_t>& keys,
                                                            int num_parts = -1);

}  // namespace legate::dataframe
12 changes: 10 additions & 2 deletions cpp/include/legate_dataframe/join.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ enum class BroadcastInput : int32_t { AUTO = 0, LEFT, RIGHT };
* @param rhs_out_columns Indices of the right hand table columns to include in the result.
* @param compare_nulls Controls whether null join-key values should match or not
* @param broadcast Which, if any, of the inputs should be copied to all workers.
* @param _num_partitions TODO: For testing.
* The number of partitions to use. If -1 uses the nccl approach instead. If
* broadcast is not AUTO, this is ignored (no shuffling is needed).
* @return The joining result
*/
LogicalTable join(const LogicalTable& lhs,
Expand All @@ -55,7 +58,8 @@ LogicalTable join(const LogicalTable& lhs,
const std::vector<size_t>& lhs_out_columns,
const std::vector<size_t>& rhs_out_columns,
cudf::null_equality compare_nulls = cudf::null_equality::EQUAL,
BroadcastInput broadcast = BroadcastInput::AUTO);
BroadcastInput broadcast = BroadcastInput::AUTO,
int _num_partitions = -1);

/**
* @brief Perform a join between the specified tables.
Expand All @@ -75,6 +79,9 @@ LogicalTable join(const LogicalTable& lhs,
* @param rhs_out_columns Names of the right hand table columns to include in the result.
* @param compare_nulls Controls whether null join-key values should match or not
* @param broadcast Which, if any, of the inputs should be copied to all workers.
* @param _num_partitions TODO: For testing.
* The number of partitions to use. If -1 uses the nccl approach instead. If
* broadcast is not AUTO, this is ignored (no shuffling is needed).
* @return The joining result
*/
LogicalTable join(const LogicalTable& lhs,
Expand All @@ -85,7 +92,8 @@ LogicalTable join(const LogicalTable& lhs,
const std::vector<std::string>& lhs_out_columns,
const std::vector<std::string>& rhs_out_columns,
cudf::null_equality compare_nulls = cudf::null_equality::EQUAL,
BroadcastInput broadcast = BroadcastInput::AUTO);
BroadcastInput broadcast = BroadcastInput::AUTO,
int _num_partitions = -1);

/**
* @brief Perform a join between the specified tables.
Expand Down
1 change: 1 addition & 0 deletions cpp/src/core/column.cu
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ namespace argument {
legate::Variable add_next_input(legate::AutoTask& task, const LogicalColumn& col, bool broadcast)
{
add_next_scalar(task, static_cast<std::underlying_type_t<cudf::type_id>>(col.cudf_type().id()));
std::cout << " adding columns that is unbound:" << col.unbound() << std::endl;
add_next_scalar(task, col.unbound() ? -1 : static_cast<int64_t>(col.num_rows()));
auto arr = col.get_logical_array();
auto variable = task.add_input(arr);
Expand Down
27 changes: 25 additions & 2 deletions cpp/src/core/library.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <legate_dataframe/core/allocator.hpp>
#include <legate_dataframe/core/library.hpp>

#include <iostream>

namespace legate::dataframe {
namespace task {

Expand Down Expand Up @@ -48,6 +50,7 @@ class Mapper : public legate::mapping::Mapper {
const std::vector<legate::mapping::StoreTarget>& options) override
{
using legate::mapping::StoreMapping;
const auto task_id = static_cast<int>(task.task_id());
std::vector<StoreMapping> mappings;

// For now, we set "exact" policy for all Stores
Expand All @@ -58,8 +61,22 @@ class Mapper : public legate::mapping::Mapper {
mappings.push_back(
StoreMapping::default_mapping(store, options.front(), /*exact = */ true));
}
// TODO: This is also needed for strings!
mappings.back().policy().ordering.set_c_order();
}
bool first = true;
for (const legate::mapping::Array& ary : task.outputs()) {
if (first && task_id == legate::dataframe::task::OpCode::HashPartition) {
/* The first output of HashPartition is mapped to the CZMEM */
// TODO: We could probably just use sysmem here, but there seems to be
// a small issue (as of early 25.03.00.dev) and we should then also set it
// for consuming tasks (since we never actually read it there).
mappings.push_back(
StoreMapping::default_mapping(ary.stores().at(0), legate::mapping::StoreTarget::ZCMEM, /*exact */ true));
first = false;
continue;
}
first = false;
if (ary.type().variable_size()) { continue; }
for (const legate::mapping::Store& store : ary.stores()) {
mappings.push_back(
Expand Down Expand Up @@ -105,10 +122,16 @@ class Mapper : public legate::mapping::Mapper {

return size_exchange_nbytes + metadata_nbytes;
}
case legate::dataframe::task::OpCode::HashPartition: {
// TODO: This could probably just use sysmem here
auto nrank = task.get_launch_domain().get_volume();
auto num_parts = task.scalars().at(0).value<int>();
num_parts = num_parts < 0 ? nrank : num_parts;
return nrank * num_parts * sizeof(legate::Rect<1>);
}
default: return 0;
}
}

// TODO: Returning nullopt prevents other parallel task launches so it would be
// good to provide estimated usage for most tasks here.
return std::nullopt;
Expand All @@ -128,7 +151,7 @@ legate::Library create_and_registrate_library()
}
// Set with_has_allocations globally since currently all tasks allocate (and libcudf may also)
// Also ensure we can generally work with 1000+ non-string return columns.
auto options = legate::VariantOptions{}.with_has_allocations(true).with_return_size(32768);
auto options = legate::VariantOptions{}.with_has_allocations(true);
auto context =
legate::Runtime::get_runtime()->find_or_create_library(library_name,
legate::ResourceConfig{},
Expand Down
30 changes: 30 additions & 0 deletions cpp/src/core/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

#include <legate_dataframe/core/table.hpp>

#include <iostream>

namespace legate::dataframe {

LogicalTable::LogicalTable(std::vector<LogicalColumn> columns,
Expand Down Expand Up @@ -139,6 +141,7 @@ bool PhysicalTable::is_broadcasted() const
} // namespace task

namespace argument {

std::vector<legate::Variable> add_next_input(legate::AutoTask& task,
const LogicalTable& tbl,
bool broadcast)
Expand All @@ -154,6 +157,33 @@ std::vector<legate::Variable> add_next_input(legate::AutoTask& task,
return ret;
}


/**
 * @brief Add a logical table as the next input task argument, partitioned
 * via imaging constraints.
 *
 * Matches `get_next_input<PhysicalTable>()`: the column count is negated to
 * signal to the task that a constraints array follows the columns and must
 * be skipped on the consuming side.
 *
 * @param task The legate task to add the argument to.
 * @param tbl The logical table to add as the next task argument.
 * @param constraints A LogicalArray describing the table's partitioning
 * (as returned by `hashpartition`), added as imaging constraints.
 * @return The task variable of the constraints input.
 */
legate::Variable add_next_input(legate::AutoTask& task,
                                const LogicalTable& tbl,
                                const legate::LogicalArray& constraints)
{
  std::vector<legate::Variable> column_vars;
  column_vars.reserve(tbl.num_columns());
  // First we add the number of columns, negated to signal that constraints follow.
  add_next_scalar(task, -tbl.num_columns());
  // Then we add each column (never broadcast: partitioning comes from the constraints).
  for (const auto& col : tbl.get_columns()) {
    column_vars.push_back(add_next_input(task, col, /* broadcast */ false));
  }
  auto constraints_var = task.add_input(constraints);
  // Require a column-wise partition of the constraints (hist)
  task.add_constraint(legate::broadcast(constraints_var, {0}));
  // Tie each column's partition to the image of the constraints array.
  for (const auto& var : column_vars) {
    task.add_constraint(legate::image(constraints_var, var));
  }
  // TODO(seberg): It may be nicer to add the image constraint only for the
  // first column and alignment constraints between the columns here.
  return constraints_var;
}


std::vector<legate::Variable> add_next_output(legate::AutoTask& task, const LogicalTable& tbl)
{
std::vector<legate::Variable> ret;
Expand Down
Loading
Loading