Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ set(ICEBERG_SOURCES
update/update_properties.cc
update/update_schema.cc
update/update_sort_order.cc
update/update_location.cc
util/bucket_util.cc
util/content_file_util.cc
util/conversions.cc
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ iceberg_sources = files(
'type.cc',
'update/pending_update.cc',
'update/snapshot_update.cc',
'update/update_location.cc',
'update/update_partition_spec.cc',
'update/update_properties.cc',
'update/update_schema.cc',
Expand Down
7 changes: 7 additions & 0 deletions src/iceberg/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,13 @@ Result<std::shared_ptr<UpdateSchema>> Table::NewUpdateSchema() {
return transaction->NewUpdateSchema();
}

Result<std::shared_ptr<UpdateLocation>> Table::NewUpdateLocation() {
ICEBERG_ASSIGN_OR_RAISE(
auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
/*auto_commit=*/true));
return transaction->NewUpdateLocation();
}

Result<std::shared_ptr<StagedTable>> StagedTable::Make(
TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
Expand Down
4 changes: 4 additions & 0 deletions src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
/// changes.
virtual Result<std::shared_ptr<UpdateSchema>> NewUpdateSchema();

/// \brief Create a new UpdateLocation to update the table location and commit the
/// changes.
virtual Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();

protected:
Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
Expand Down
3 changes: 2 additions & 1 deletion src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ if(ICEBERG_BUILD_BUNDLE)
update_partition_spec_test.cc
update_properties_test.cc
update_schema_test.cc
update_sort_order_test.cc)
update_sort_order_test.cc
update_location_test.cc)

add_iceberg_test(data_writer_test USE_BUNDLE SOURCES data_writer_test.cc)

Expand Down
148 changes: 148 additions & 0 deletions src/iceberg/test/update_location_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/update/update_location.h"

#include <memory>
#include <string>

#include <arrow/filesystem/mockfs.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include "iceberg/arrow/arrow_fs_file_io_internal.h"
#include "iceberg/result.h"
#include "iceberg/test/matchers.h"
#include "iceberg/test/update_test_base.h"

namespace iceberg {

class UpdateLocationTest : public UpdateTestBase {};

TEST_F(UpdateLocationTest, SetLocationSuccess) {
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateLocation());
const std::string new_location = "/warehouse/new_location";
update->SetLocation(new_location);

ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
EXPECT_EQ(result, new_location);
}

TEST_F(UpdateLocationTest, SetLocationMultipleTimes) {
// Test that setting location multiple times uses the last value
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateLocation());
update->SetLocation("/warehouse/first_location")
.SetLocation("/warehouse/second_location")
.SetLocation("/warehouse/final_location");

ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
EXPECT_EQ(result, "/warehouse/final_location");
}

TEST_F(UpdateLocationTest, SetEmptyLocation) {
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateLocation());
update->SetLocation("");

auto result = update->Apply();
EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
EXPECT_THAT(result, HasErrorMessage("Location cannot be empty"));
}

TEST_F(UpdateLocationTest, ApplyWithoutSettingLocation) {
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateLocation());

auto result = update->Apply();
EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
EXPECT_THAT(result, HasErrorMessage("Location must be set before applying"));
}

TEST_F(UpdateLocationTest, CommitSuccess) {
// Test empty commit (should fail since location is not set)
ICEBERG_UNWRAP_OR_FAIL(auto empty_update, table_->NewUpdateLocation());
auto empty_commit_result = empty_update->Commit();
EXPECT_THAT(empty_commit_result, IsError(ErrorKind::kInvalidArgument));

// Test commit with location change
// For MockFS, we need to create the metadata directory at the new location
const std::string new_location = "/warehouse/new_table_location";
auto arrow_fs = std::dynamic_pointer_cast<::arrow::fs::internal::MockFileSystem>(
static_cast<arrow::ArrowFileSystemFileIO&>(*file_io_).fs());
ASSERT_TRUE(arrow_fs != nullptr);
ASSERT_TRUE(arrow_fs->CreateDir(new_location + "/metadata").ok());

ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateLocation());
update->SetLocation(new_location);
EXPECT_THAT(update->Commit(), IsOk());

// Verify the location was committed
ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
EXPECT_EQ(reloaded->location(), new_location);
}

TEST_F(UpdateLocationTest, CommitWithRelativePath) {
// Test that relative paths work
const std::string relative_location = "warehouse/relative_location";

// Create metadata directory for the new location
auto arrow_fs = std::dynamic_pointer_cast<::arrow::fs::internal::MockFileSystem>(
static_cast<arrow::ArrowFileSystemFileIO&>(*file_io_).fs());
ASSERT_TRUE(arrow_fs != nullptr);
ASSERT_TRUE(arrow_fs->CreateDir(relative_location + "/metadata").ok());

ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateLocation());
update->SetLocation(relative_location);

EXPECT_THAT(update->Commit(), IsOk());

ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
EXPECT_EQ(reloaded->location(), relative_location);
}

TEST_F(UpdateLocationTest, MultipleUpdatesSequentially) {
// Get arrow_fs for creating directories
auto arrow_fs = std::dynamic_pointer_cast<::arrow::fs::internal::MockFileSystem>(
static_cast<arrow::ArrowFileSystemFileIO&>(*file_io_).fs());
ASSERT_TRUE(arrow_fs != nullptr);

// First update
const std::string first_location = "/warehouse/first";
ASSERT_TRUE(arrow_fs->CreateDir(first_location + "/metadata").ok());

ICEBERG_UNWRAP_OR_FAIL(auto update1, table_->NewUpdateLocation());
update1->SetLocation(first_location);
EXPECT_THAT(update1->Commit(), IsOk());

// Reload and verify
ICEBERG_UNWRAP_OR_FAIL(auto reloaded1, catalog_->LoadTable(table_ident_));
EXPECT_EQ(reloaded1->location(), first_location);

// Second update
const std::string second_location = "/warehouse/second";
ASSERT_TRUE(arrow_fs->CreateDir(second_location + "/metadata").ok());

ICEBERG_UNWRAP_OR_FAIL(auto update2, reloaded1->NewUpdateLocation());
update2->SetLocation(second_location);
EXPECT_THAT(update2->Commit(), IsOk());

// Reload and verify
ICEBERG_UNWRAP_OR_FAIL(auto reloaded2, catalog_->LoadTable(table_ident_));
EXPECT_EQ(reloaded2->location(), second_location);
}

} // namespace iceberg
13 changes: 13 additions & 0 deletions src/iceberg/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "iceberg/table_update.h"
#include "iceberg/update/pending_update.h"
#include "iceberg/update/snapshot_update.h"
#include "iceberg/update/update_location.h"
#include "iceberg/update/update_partition_spec.h"
#include "iceberg/update/update_properties.h"
#include "iceberg/update/update_schema.h"
Expand Down Expand Up @@ -163,6 +164,11 @@ Status Transaction::Apply(PendingUpdate& update) {
metadata_builder_->AssignUUID();
}
} break;
case PendingUpdate::Kind::kUpdateLocation: {
auto& update_location = internal::checked_cast<UpdateLocation&>(update);
ICEBERG_ASSIGN_OR_RAISE(auto location, update_location.Apply());
metadata_builder_->SetLocation(location);
} break;
default:
return NotSupported("Unsupported pending update: {}",
static_cast<int32_t>(update.kind()));
Expand Down Expand Up @@ -253,4 +259,11 @@ Result<std::shared_ptr<UpdateSchema>> Transaction::NewUpdateSchema() {
return update_schema;
}

Result<std::shared_ptr<UpdateLocation>> Transaction::NewUpdateLocation() {
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<UpdateLocation> update_location,
UpdateLocation::Make(shared_from_this()));
ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_location));
return update_location;
}

} // namespace iceberg
4 changes: 4 additions & 0 deletions src/iceberg/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
/// changes.
Result<std::shared_ptr<UpdateSchema>> NewUpdateSchema();

/// \brief Create a new UpdateLocation to update the table location and commit the
/// changes.
Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();

private:
Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit,
std::unique_ptr<TableMetadataBuilder> metadata_builder);
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/type_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ class UpdatePartitionSpec;
class UpdateProperties;
class UpdateSchema;
class UpdateSortOrder;
class UpdateLocation;

/// ----------------------------------------------------------------------------
/// TODO: Forward declarations below are not added yet.
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/update/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ install_headers(
'update_schema.h',
'update_sort_order.h',
'update_properties.h',
'update_location.h',
],
subdir: 'iceberg/update',
)
1 change: 1 addition & 0 deletions src/iceberg/update/pending_update.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
kUpdateSchema,
kUpdateSnapshot,
kUpdateSortOrder,
kUpdateLocation,
};

/// \brief Return the kind of this pending update.
Expand Down
58 changes: 58 additions & 0 deletions src/iceberg/update/update_location.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/update/update_location.h"

#include <memory>
#include <string>
#include <string_view>

#include "iceberg/result.h"
#include "iceberg/transaction.h"
#include "iceberg/util/macros.h"

namespace iceberg {

Result<std::shared_ptr<UpdateLocation>> UpdateLocation::Make(
std::shared_ptr<Transaction> transaction) {
ICEBERG_PRECHECK(transaction != nullptr,
"Cannot create UpdateLocation without a transaction");
return std::shared_ptr<UpdateLocation>(new UpdateLocation(std::move(transaction)));
}

UpdateLocation::UpdateLocation(std::shared_ptr<Transaction> transaction)
: PendingUpdate(std::move(transaction)) {}

UpdateLocation::~UpdateLocation() = default;

UpdateLocation& UpdateLocation::SetLocation(std::string_view location) {
ICEBERG_BUILDER_CHECK(!location.empty(), "Location cannot be empty");
location_ = std::string(location);
return *this;
}

Result<std::string> UpdateLocation::Apply() {
ICEBERG_RETURN_UNEXPECTED(CheckErrors());
if (location_.empty()) {
return InvalidArgument("Location must be set before applying");
}
return location_;
}

} // namespace iceberg
58 changes: 58 additions & 0 deletions src/iceberg/update/update_location.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <memory>
#include <string_view>

#include "iceberg/type_fwd.h"
#include "iceberg/update/pending_update.h"

/// \file iceberg/update/update_location.h
/// \brief Updates the table location.

namespace iceberg {

/// \brief Updating table location with a new base location.
class ICEBERG_EXPORT UpdateLocation : public PendingUpdate {
public:
static Result<std::shared_ptr<UpdateLocation>> Make(
std::shared_ptr<Transaction> transaction);

~UpdateLocation() override;

/// \brief Sets the new location for the table.
///
/// \param location The new table location
/// \return Reference to this for method chaining
UpdateLocation& SetLocation(std::string_view location);

Kind kind() const final { return Kind::kUpdateLocation; }

/// \brief Apply the pending changes and return the new location.
Result<std::string> Apply();

private:
explicit UpdateLocation(std::shared_ptr<Transaction> transaction);

std::string location_;
};

} // namespace iceberg
Loading