-
Notifications
You must be signed in to change notification settings - Fork 619
Schema - Layout decoupling #1327
Changes from all commits
5d93f4e
e909f82
48bfd04
2697ea6
81db08d
f4525b5
bc3834f
0d7dfde
36cdad0
5714572
f9130af
422cf95
0500c9e
cdb9b47
1e00029
2df5b2c
04843e7
2f53ab5
ed3db38
2daeb37
8162337
5410348
8f8bf0e
faedaa1
fe1e6b4
6b36654
1965813
9489f5a
7408c20
4aaee91
429c9d0
f9187cc
5e62206
6ce9fc8
d7ce6cd
c219d7b
2ea5d7e
f256082
0ff874d
ecb95bb
6630805
cc8751a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
#include "catalog/index_catalog.h" | ||
#include "catalog/index_metrics_catalog.h" | ||
#include "catalog/language_catalog.h" | ||
#include "catalog/layout_catalog.h" | ||
#include "catalog/proc_catalog.h" | ||
#include "catalog/query_history_catalog.h" | ||
#include "catalog/query_metrics_catalog.h" | ||
|
@@ -161,7 +162,7 @@ void Catalog::BootstrapSystemCatalogs(storage::Database *database, | |
system_catalogs->GetSchemaCatalog()->InsertSchema( | ||
CATALOG_SCHEMA_OID, CATALOG_SCHEMA_NAME, pool_.get(), txn); | ||
system_catalogs->GetSchemaCatalog()->InsertSchema( | ||
DEFUALT_SCHEMA_OID, DEFUALT_SCHEMA_NAME, pool_.get(), txn); | ||
DEFAULT_SCHEMA_OID, DEFAULT_SCHEMA_NAME, pool_.get(), txn); | ||
|
||
// Insert catalog tables into pg_table | ||
// pg_database record is shared across different databases | ||
|
@@ -180,6 +181,9 @@ void Catalog::BootstrapSystemCatalogs(storage::Database *database, | |
system_catalogs->GetTableCatalog()->InsertTable( | ||
COLUMN_CATALOG_OID, COLUMN_CATALOG_NAME, CATALOG_SCHEMA_NAME, | ||
database_oid, pool_.get(), txn); | ||
system_catalogs->GetTableCatalog()->InsertTable( | ||
LAYOUT_CATALOG_OID, LAYOUT_CATALOG_NAME, CATALOG_SCHEMA_NAME, | ||
database_oid, pool_.get(), txn); | ||
} | ||
|
||
void Catalog::Bootstrap() { | ||
|
@@ -295,7 +299,8 @@ ResultType Catalog::CreateTable(const std::string &database_name, | |
const std::string &table_name, | ||
std::unique_ptr<catalog::Schema> schema, | ||
concurrency::TransactionContext *txn, | ||
bool is_catalog, oid_t tuples_per_tilegroup) { | ||
bool is_catalog, uint32_t tuples_per_tilegroup, | ||
peloton::LayoutType layout_type) { | ||
if (txn == nullptr) | ||
throw CatalogException("Do not have transaction to create table " + | ||
table_name); | ||
|
@@ -348,7 +353,8 @@ ResultType Catalog::CreateTable(const std::string &database_name, | |
bool adapt_table = false; | ||
auto table = storage::TableFactory::GetDataTable( | ||
database_object->GetDatabaseOid(), table_oid, schema.release(), | ||
table_name, tuples_per_tilegroup, own_schema, adapt_table, is_catalog); | ||
table_name, tuples_per_tilegroup, own_schema, adapt_table, is_catalog, | ||
layout_type); | ||
database->AddTable(table, is_catalog); | ||
// put data table object into rw_object_set | ||
txn->RecordCreate(database_object->GetDatabaseOid(), table_oid, INVALID_OID); | ||
|
@@ -555,6 +561,44 @@ ResultType Catalog::CreateIndex( | |
return ResultType::SUCCESS; | ||
} | ||
|
||
std::shared_ptr<const storage::Layout> Catalog::CreateLayout( | ||
oid_t database_oid, oid_t table_oid, const column_map_type &column_map, | ||
concurrency::TransactionContext *txn) { | ||
auto storage_manager = storage::StorageManager::GetInstance(); | ||
auto database = storage_manager->GetDatabaseWithOid(database_oid); | ||
auto table = database->GetTableWithOid(table_oid); | ||
|
||
oid_t layout_oid = table->GetNextLayoutOid(); | ||
// Ensure that the new layout | ||
PELOTON_ASSERT(layout_oid < INVALID_OID); | ||
auto new_layout = std::shared_ptr<const storage::Layout>( | ||
new const storage::Layout(column_map, layout_oid)); | ||
|
||
// Add the layout the pg_layout table | ||
auto pg_layout = catalog_map_[database_oid]->GetLayoutCatalog(); | ||
bool result = | ||
pg_layout->InsertLayout(table_oid, new_layout, pool_.get(), txn); | ||
if (!result) { | ||
LOG_ERROR("Failed to create a new layout for table %u", table_oid); | ||
return nullptr; | ||
} | ||
return new_layout; | ||
} | ||
|
||
std::shared_ptr<const storage::Layout> Catalog::CreateDefaultLayout( | ||
oid_t database_oid, oid_t table_oid, const column_map_type &column_map, | ||
concurrency::TransactionContext *txn) { | ||
auto new_layout = CreateLayout(database_oid, table_oid, column_map, txn); | ||
// If the layout creation was successful, set it as the default | ||
if (new_layout != nullptr) { | ||
auto storage_manager = storage::StorageManager::GetInstance(); | ||
auto database = storage_manager->GetDatabaseWithOid(database_oid); | ||
auto table = database->GetTableWithOid(table_oid); | ||
table->SetDefaultLayout(new_layout); | ||
} | ||
return new_layout; | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just wondered what is the difference between function There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And from my understanding, only HYBRID layout type will be persist in pg_layout catalog table, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the current transformation infrastructure, the layouts are changed in two contexts:
Yes, we only need to persist |
||
//===----------------------------------------------------------------------===// | ||
// DROP FUNCTIONS | ||
//===----------------------------------------------------------------------===// | ||
|
@@ -700,7 +744,6 @@ ResultType Catalog::DropTable(oid_t database_oid, oid_t table_oid, | |
auto table_object = database_object->GetTableObject(table_oid); | ||
auto index_objects = table_object->GetIndexObjects(); | ||
LOG_TRACE("dropping #%d indexes", (int)index_objects.size()); | ||
|
||
// delete trigger and records in pg_trigger | ||
auto pg_trigger = | ||
catalog_map_[database_object->GetDatabaseOid()]->GetTriggerCatalog(); | ||
|
@@ -719,11 +762,15 @@ ResultType Catalog::DropTable(oid_t database_oid, oid_t table_oid, | |
catalog_map_[database_object->GetDatabaseOid()]->GetColumnCatalog(); | ||
pg_attribute->DeleteColumns(table_oid, txn); | ||
|
||
// delete record in pg_layout | ||
auto pg_layout = | ||
catalog_map_[database_object->GetDatabaseOid()]->GetLayoutCatalog(); | ||
pg_layout->DeleteLayouts(table_oid, txn); | ||
|
||
// delete record in pg_table | ||
auto pg_table = | ||
catalog_map_[database_object->GetDatabaseOid()]->GetTableCatalog(); | ||
pg_table->DeleteTable(table_oid, txn); | ||
|
||
database->GetTableWithOid(table_oid); | ||
txn->RecordDrop(database_oid, table_oid, INVALID_OID); | ||
|
||
|
@@ -764,6 +811,30 @@ ResultType Catalog::DropIndex(oid_t database_oid, oid_t index_oid, | |
return ResultType::SUCCESS; | ||
} | ||
|
||
ResultType Catalog::DropLayout(oid_t database_oid, oid_t table_oid, | ||
oid_t layout_oid, | ||
concurrency::TransactionContext *txn) { | ||
// Check if the default_layout of the table is the same. | ||
// If true reset it to a row store. | ||
auto storage_manager = storage::StorageManager::GetInstance(); | ||
auto database = storage_manager->GetDatabaseWithOid(database_oid); | ||
auto table = database->GetTableWithOid(table_oid); | ||
auto default_layout = table->GetDefaultLayout(); | ||
|
||
if (default_layout.GetOid() == layout_oid) { | ||
table->ResetDefaultLayout(); | ||
} | ||
|
||
auto pg_layout = catalog_map_[database_oid]->GetLayoutCatalog(); | ||
if (!pg_layout->DeleteLayout(table_oid, layout_oid, txn)) { | ||
auto layout = table->GetDefaultLayout(); | ||
LOG_DEBUG("Layout delete failed. Default layout id: %u", layout.GetOid()); | ||
return ResultType::FAILURE; | ||
} | ||
|
||
return ResultType::SUCCESS; | ||
} | ||
|
||
//===--------------------------------------------------------------------===// | ||
// GET WITH NAME - CHECK FROM CATALOG TABLES, USING TRANSACTION | ||
//===--------------------------------------------------------------------===// | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,229 @@ | ||
//===----------------------------------------------------------------------===// | ||
// | ||
// Peloton | ||
// | ||
// layout_catalog.cpp | ||
// | ||
// Identification: src/catalog/layout_catalog.cpp | ||
// | ||
// Copyright (c) 2015-2018, Carnegie Mellon University Database Group | ||
// | ||
//===----------------------------------------------------------------------===// | ||
|
||
#include "catalog/layout_catalog.h" | ||
|
||
#include "catalog/catalog.h" | ||
#include "catalog/system_catalogs.h" | ||
#include "concurrency/transaction_context.h" | ||
#include "executor/logical_tile.h" | ||
#include "storage/data_table.h" | ||
#include "storage/layout.h" | ||
|
||
namespace peloton { | ||
namespace catalog { | ||
|
||
/** @brief Constructor invoked by the SystemsCatalog constructor. | ||
* @param pg_catalog The database to which this pg_layout belongs. | ||
*/ | ||
LayoutCatalog::LayoutCatalog( | ||
storage::Database *pg_catalog, UNUSED_ATTRIBUTE type::AbstractPool *pool, | ||
UNUSED_ATTRIBUTE concurrency::TransactionContext *txn) | ||
: AbstractCatalog(LAYOUT_CATALOG_OID, LAYOUT_CATALOG_NAME, | ||
InitializeSchema().release(), pg_catalog) { | ||
// Add indexes for pg_attribute | ||
AddIndex({ColumnId::TABLE_OID, ColumnId::LAYOUT_OID}, LAYOUT_CATALOG_PKEY_OID, | ||
LAYOUT_CATALOG_NAME "_pkey", IndexConstraintType::PRIMARY_KEY); | ||
AddIndex({ColumnId::TABLE_OID}, LAYOUT_CATALOG_SKEY0_OID, | ||
LAYOUT_CATALOG_NAME "_skey0", IndexConstraintType::DEFAULT); | ||
} | ||
/** @brief Destructor. Do nothing. Layouts will be dropped by DropTable. */ | ||
LayoutCatalog::~LayoutCatalog() {} | ||
|
||
/** @brief Initilailizes the schema for the pg_layout table. | ||
* @return unique_ptr of the schema for pg_layout. | ||
*/ | ||
std::unique_ptr<catalog::Schema> LayoutCatalog::InitializeSchema() { | ||
const std::string primary_key_constraint_name = "primary_key"; | ||
const std::string not_null_constraint_name = "not_null"; | ||
|
||
auto table_id_column = catalog::Column( | ||
type::TypeId::INTEGER, type::Type::GetTypeSize(type::TypeId::INTEGER), | ||
"table_oid", true); | ||
table_id_column.AddConstraint(catalog::Constraint( | ||
ConstraintType::PRIMARY, primary_key_constraint_name)); | ||
table_id_column.AddConstraint( | ||
catalog::Constraint(ConstraintType::NOTNULL, not_null_constraint_name)); | ||
|
||
auto layout_oid_column = catalog::Column( | ||
type::TypeId::INTEGER, type::Type::GetTypeSize(type::TypeId::INTEGER), | ||
"layout_oid", true); | ||
layout_oid_column.AddConstraint(catalog::Constraint( | ||
ConstraintType::PRIMARY, primary_key_constraint_name)); | ||
layout_oid_column.AddConstraint( | ||
catalog::Constraint(ConstraintType::NOTNULL, not_null_constraint_name)); | ||
|
||
auto num_columns_column = catalog::Column( | ||
type::TypeId::INTEGER, type::Type::GetTypeSize(type::TypeId::INTEGER), | ||
"num_columns", true); | ||
num_columns_column.AddConstraint( | ||
catalog::Constraint(ConstraintType::NOTNULL, not_null_constraint_name)); | ||
|
||
auto column_map_column = catalog::Column( | ||
type::TypeId::VARCHAR, type::Type::GetTypeSize(type::TypeId::VARCHAR), | ||
"column_map", false); | ||
column_map_column.AddConstraint( | ||
catalog::Constraint(ConstraintType::NOTNULL, not_null_constraint_name)); | ||
|
||
std::unique_ptr<catalog::Schema> column_catalog_schema( | ||
new catalog::Schema({table_id_column, layout_oid_column, | ||
num_columns_column, column_map_column})); | ||
|
||
return column_catalog_schema; | ||
} | ||
|
||
/** @brief Insert a layout into the pg_layout table. | ||
* @param table_oid oid of the table to which the new layout belongs. | ||
* @param layout layout to be added to the pg_layout table. | ||
* @param pool to allocate memory for the column_map column. | ||
* @param txn TransactionContext for adding the layout. | ||
* @return true on success. | ||
*/ | ||
bool LayoutCatalog::InsertLayout(oid_t table_oid, | ||
std::shared_ptr<const storage::Layout> layout, | ||
type::AbstractPool *pool, | ||
concurrency::TransactionContext *txn) { | ||
// Create the tuple first | ||
std::unique_ptr<storage::Tuple> tuple( | ||
new storage::Tuple(catalog_table_->GetSchema(), true)); | ||
|
||
auto val0 = type::ValueFactory::GetIntegerValue(table_oid); | ||
auto val1 = type::ValueFactory::GetIntegerValue(layout->GetOid()); | ||
auto val2 = type::ValueFactory::GetIntegerValue(layout->GetColumnCount()); | ||
auto val3 = type::ValueFactory::GetVarcharValue(layout->SerializeColumnMap(), | ||
nullptr); | ||
|
||
tuple->SetValue(LayoutCatalog::ColumnId::TABLE_OID, val0, pool); | ||
tuple->SetValue(LayoutCatalog::ColumnId::LAYOUT_OID, val1, pool); | ||
tuple->SetValue(LayoutCatalog::ColumnId::NUM_COLUMNS, val2, pool); | ||
tuple->SetValue(LayoutCatalog::ColumnId::COLUMN_MAP, val3, pool); | ||
|
||
// Insert the tuple | ||
return InsertTuple(std::move(tuple), txn); | ||
} | ||
|
||
/** @brief Delete a layout from the pg_layout table. | ||
* @param table_oid oid of the table to which the old layout belongs. | ||
* @param layout_oid oid of the layout to be deleted. | ||
* @param txn TransactionContext for deleting the layout. | ||
* @return true on success. | ||
*/ | ||
bool LayoutCatalog::DeleteLayout(oid_t table_oid, oid_t layout_oid, | ||
concurrency::TransactionContext *txn) { | ||
oid_t index_offset = IndexId::PRIMARY_KEY; // Index of table_oid & layout_oid | ||
|
||
std::vector<type::Value> values; | ||
values.push_back(type::ValueFactory::GetIntegerValue(table_oid).Copy()); | ||
values.push_back(type::ValueFactory::GetIntegerValue(layout_oid).Copy()); | ||
|
||
auto pg_table = Catalog::GetInstance() | ||
->GetSystemCatalogs(database_oid) | ||
->GetTableCatalog(); | ||
|
||
// delete column from cache | ||
auto table_object = pg_table->GetTableObject(table_oid, txn); | ||
table_object->EvictLayout(layout_oid); | ||
|
||
return DeleteWithIndexScan(index_offset, values, txn); | ||
} | ||
|
||
/** @brief Delete all layouts correponding to a table from the pg_layout. | ||
* @param table_oid oid of the table to delete all layouts. | ||
* @param txn TransactionContext for deleting the layouts. | ||
* @return true on success. | ||
*/ | ||
bool LayoutCatalog::DeleteLayouts(oid_t table_oid, | ||
concurrency::TransactionContext *txn) { | ||
oid_t index_offset = IndexId::SKEY_TABLE_OID; // Index of table_oid | ||
std::vector<type::Value> values; | ||
values.push_back(type::ValueFactory::GetIntegerValue(table_oid).Copy()); | ||
|
||
// delete layouts from cache | ||
auto pg_table = Catalog::GetInstance() | ||
->GetSystemCatalogs(database_oid) | ||
->GetTableCatalog(); | ||
auto table_object = pg_table->GetTableObject(table_oid, txn); | ||
table_object->EvictAllLayouts(); | ||
|
||
return DeleteWithIndexScan(index_offset, values, txn); | ||
} | ||
|
||
/** @brief Get all layouts correponding to a table from the pg_layout. | ||
* @param table_oid oid of the table to fetch all layouts. | ||
* @param txn TransactionContext for getting the layouts. | ||
* @return unordered_map containing a layout_oid -> layout mapping. | ||
*/ | ||
const std::unordered_map<oid_t, std::shared_ptr<const storage::Layout>> | ||
LayoutCatalog::GetLayouts(oid_t table_oid, | ||
concurrency::TransactionContext *txn) { | ||
// Try to find the layouts in the cache | ||
auto pg_table = Catalog::GetInstance() | ||
->GetSystemCatalogs(database_oid) | ||
->GetTableCatalog(); | ||
auto table_object = pg_table->GetTableObject(table_oid, txn); | ||
PELOTON_ASSERT(table_object && table_object->GetTableOid() == table_oid); | ||
auto layout_objects = table_object->GetLayouts(true); | ||
if (layout_objects.size() != 0) { | ||
return layout_objects; | ||
} | ||
|
||
// Cache miss, get from pg_catalog | ||
std::vector<oid_t> column_ids(all_column_ids); | ||
oid_t index_offset = IndexId::SKEY_TABLE_OID; // Index of table_oid | ||
std::vector<type::Value> values; | ||
values.push_back(type::ValueFactory::GetIntegerValue(table_oid).Copy()); | ||
|
||
auto result_tiles = | ||
GetResultWithIndexScan(column_ids, index_offset, values, txn); | ||
|
||
for (auto &tile : (*result_tiles)) { // Iterate through the result_tiles | ||
for (auto tuple_id : *tile) { | ||
oid_t layout_oid = | ||
tile->GetValue(tuple_id, LayoutCatalog::ColumnId::LAYOUT_OID) | ||
.GetAs<oid_t>(); | ||
oid_t num_columns = | ||
tile->GetValue(tuple_id, LayoutCatalog::ColumnId::NUM_COLUMNS) | ||
.GetAs<oid_t>(); | ||
std::string column_map_str = | ||
tile->GetValue(tuple_id, LayoutCatalog::ColumnId::COLUMN_MAP) | ||
.ToString(); | ||
auto column_map = | ||
storage::Layout::DeserializeColumnMap(num_columns, column_map_str); | ||
auto layout_object = | ||
std::make_shared<const storage::Layout>(column_map, layout_oid); | ||
table_object->InsertLayout(layout_object); | ||
} | ||
} | ||
|
||
return table_object->GetLayouts(); | ||
} | ||
|
||
/** @brief Get the layout by layout_oid from the pg_layout. | ||
* @param table_oid oid of the table to fetch the layout. | ||
* @param layout_oid oid of the layout being queried. | ||
* @param txn TransactionContext for getting the layout. | ||
* @return shared_ptr corresponding to the layout_oid if found. | ||
* nullptr otherwise. | ||
*/ | ||
std::shared_ptr<const storage::Layout> LayoutCatalog::GetLayoutWithOid( | ||
oid_t table_oid, oid_t layout_oid, concurrency::TransactionContext *txn) { | ||
auto table_layouts = GetLayouts(table_oid, txn); | ||
for (const auto &layout_entry : table_layouts) { | ||
if (layout_entry.second->GetOid() == layout_oid) { | ||
return layout_entry.second; | ||
} | ||
} | ||
return nullptr; | ||
} | ||
|
||
} // namespace catalog | ||
} // namespace peloton |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not related to this PR, but this map seems to be accessed concurrently without a lock in
Catalog::BootstrapSystemCatalogs()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right. There are other small bugs I noticed in #1356 and #1352. I will raise a Catalog cleanup PR once this gets merged in.