Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ set(ICEBERG_SOURCES
type.cc
update/pending_update.cc
update/snapshot_update.cc
update/set_snapshot.cc
update/update_partition_spec.cc
update/update_properties.cc
update/update_schema.cc
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ iceberg_sources = files(
'transform_function.cc',
'type.cc',
'update/pending_update.cc',
'update/set_snapshot.cc',
'update/snapshot_update.cc',
'update/update_partition_spec.cc',
'update/update_properties.cc',
Expand Down
10 changes: 5 additions & 5 deletions src/iceberg/table_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,9 @@ class TableMetadataBuilder::Impl {
Result<std::vector<SnapshotLogEntry>> UpdateSnapshotLog(
int64_t current_snapshot_id) const;

/// \brief Internal method to set a branch snapshot
/// \param snapshot The snapshot to set
/// \param branch The branch name
Status SetBranchSnapshotInternal(const Snapshot& snapshot, const std::string& branch);

private:
Expand Down Expand Up @@ -1087,11 +1090,8 @@ Status TableMetadataBuilder::Impl::SetBranchSnapshot(int64_t snapshot_id,
// change is a noop
return {};
}

auto snapshot_it = snapshots_by_id_.find(snapshot_id);
ICEBERG_CHECK(snapshot_it != snapshots_by_id_.end(),
"Cannot set {} to unknown snapshot: {}", branch, snapshot_id);
return SetBranchSnapshotInternal(*snapshot_it->second, branch);
ICEBERG_ASSIGN_OR_RAISE(auto snapshot, metadata_.SnapshotById(snapshot_id));
return SetBranchSnapshotInternal(*snapshot, branch);
}

Status TableMetadataBuilder::Impl::SetBranchSnapshot(std::shared_ptr<Snapshot> snapshot,
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ if(ICEBERG_BUILD_BUNDLE)
add_iceberg_test(table_update_test
USE_BUNDLE
SOURCES
set_snapshot_test.cc
transaction_test.cc
update_partition_spec_test.cc
update_properties_test.cc
Expand Down
236 changes: 236 additions & 0 deletions src/iceberg/test/set_snapshot_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/update/set_snapshot.h"

#include <memory>

#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include "iceberg/result.h"
#include "iceberg/snapshot.h"
#include "iceberg/test/matchers.h"
#include "iceberg/test/update_test_base.h"
#include "iceberg/transaction.h"

namespace iceberg {

// Test fixture for SetSnapshot tests
class SetSnapshotTest : public UpdateTestBase {
protected:
// Snapshot IDs from TableMetadataV2Valid.json
static constexpr int64_t kOldestSnapshotId = 3051729675574597004;
static constexpr int64_t kCurrentSnapshotId = 3055729675574597004;

// Timestamps from TableMetadataV2Valid.json
static constexpr int64_t kOldestSnapshotTimestamp = 1515100955770;
static constexpr int64_t kCurrentSnapshotTimestamp = 1555100955770;
};

TEST_F(SetSnapshotTest, SetCurrentSnapshotValid) {
// Create transaction and SetSnapshot
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));

// Set current snapshot to the older snapshot
set_snapshot->SetCurrentSnapshot(kOldestSnapshotId);

// Apply and verify
ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply());
EXPECT_NE(result, nullptr);
EXPECT_EQ(result->snapshot_id, kOldestSnapshotId);
}

TEST_F(SetSnapshotTest, SetCurrentSnapshotToCurrentSnapshot) {
// Create transaction and SetSnapshot
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));

// Set current snapshot to the current snapshot (no-op)
set_snapshot->SetCurrentSnapshot(kCurrentSnapshotId);

// Apply and verify
ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply());
EXPECT_NE(result, nullptr);
EXPECT_EQ(result->snapshot_id, kCurrentSnapshotId);
}

TEST_F(SetSnapshotTest, SetCurrentSnapshotInvalid) {
// Create transaction and SetSnapshot
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));

// Try to set to a non-existent snapshot
int64_t invalid_snapshot_id = 9999999999999999;
set_snapshot->SetCurrentSnapshot(invalid_snapshot_id);

// Should fail during Apply
auto result = set_snapshot->Apply();
EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
EXPECT_THAT(result, HasErrorMessage("unknown snapshot id"));
}

TEST_F(SetSnapshotTest, RollbackToValid) {
// Create transaction and SetSnapshot
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));

// Rollback to the oldest snapshot (which is an ancestor)
set_snapshot->RollbackTo(kOldestSnapshotId);

// Apply and verify
ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply());
EXPECT_NE(result, nullptr);
EXPECT_EQ(result->snapshot_id, kOldestSnapshotId);
}

TEST_F(SetSnapshotTest, RollbackToInvalidSnapshot) {
// Create transaction and SetSnapshot
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));

// Try to rollback to a non-existent snapshot
int64_t invalid_snapshot_id = 9999999999999999;
set_snapshot->RollbackTo(invalid_snapshot_id);

// Should fail during Apply
auto result = set_snapshot->Apply();
EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
EXPECT_THAT(result, HasErrorMessage("unknown snapshot id"));
}

TEST_F(SetSnapshotTest, RollbackToTimeValidOldestSnapshot) {
// Create transaction and SetSnapshot
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));

// Rollback to a time between the two snapshots
// This should select the oldest snapshot
int64_t time_between = (kOldestSnapshotTimestamp + kCurrentSnapshotTimestamp) / 2;
set_snapshot->RollbackToTime(time_between);

// Apply and verify
ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply());
EXPECT_NE(result, nullptr);
EXPECT_EQ(result->snapshot_id, kOldestSnapshotId);
}

TEST_F(SetSnapshotTest, RollbackToTimeBeforeAnySnapshot) {
// Create transaction and SetSnapshot
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));

// Try to rollback to a time before any snapshot
int64_t time_before_all = kOldestSnapshotTimestamp - 1000000;
set_snapshot->RollbackToTime(time_before_all);

// Should fail - no snapshot older than the specified time
auto result = set_snapshot->Apply();
EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
EXPECT_THAT(result, HasErrorMessage("no valid snapshot older than"));
}

TEST_F(SetSnapshotTest, RollbackToTimeExactMatch) {
// Create transaction and SetSnapshot
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));

// Rollback to a timestamp just after the oldest snapshot
// This should return the oldest snapshot (the latest one before this time)
int64_t time_just_after_oldest = kOldestSnapshotTimestamp + 1;
set_snapshot->RollbackToTime(time_just_after_oldest);

// Apply and verify - should return the oldest snapshot
ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply());
EXPECT_NE(result, nullptr);
EXPECT_EQ(result->snapshot_id, kOldestSnapshotId);
}

TEST_F(SetSnapshotTest, ApplyWithoutChanges) {
// Create transaction and SetSnapshot
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));

// Apply without making any changes (NOOP)
ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply());

// Should return current snapshot
EXPECT_NE(result, nullptr);
EXPECT_EQ(result->snapshot_id, kCurrentSnapshotId);
}

TEST_F(SetSnapshotTest, MethodChaining) {
// Create transaction and SetSnapshot
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));

// Test that methods return reference for chaining
// Note: Only the last operation should take effect
auto& result1 = set_snapshot->SetCurrentSnapshot(kOldestSnapshotId);
EXPECT_EQ(&result1, set_snapshot.get());
}

TEST_F(SetSnapshotTest, CommitSuccess) {
// Create transaction and SetSnapshot
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));

// Set to oldest snapshot
set_snapshot->SetCurrentSnapshot(kOldestSnapshotId);

// Commit the change
EXPECT_THAT(set_snapshot->Commit(), IsOk());

// Commit the transaction
ICEBERG_UNWRAP_OR_FAIL(auto updated_table, txn->Commit());

// Verify the current snapshot was changed
ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot());
EXPECT_EQ(current_snapshot->snapshot_id, kOldestSnapshotId);
}

TEST_F(SetSnapshotTest, CommitEmptyUpdate) {
// Create transaction and SetSnapshot
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));

// Commit without making any changes (NOOP)
EXPECT_THAT(set_snapshot->Commit(), IsOk());

// Commit the transaction
ICEBERG_UNWRAP_OR_FAIL(auto updated_table, txn->Commit());

// Verify the current snapshot remained the same
ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot());
EXPECT_EQ(current_snapshot->snapshot_id, kCurrentSnapshotId);
}

TEST_F(SetSnapshotTest, KindReturnsSetSnapshot) {
// Create transaction and SetSnapshot
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));

// Verify the kind is correct
EXPECT_EQ(set_snapshot->kind(), PendingUpdate::Kind::kSetSnapshot);
}

} // namespace iceberg
8 changes: 8 additions & 0 deletions src/iceberg/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@

#include "iceberg/catalog.h"
#include "iceberg/schema.h"
#include "iceberg/snapshot.h"
#include "iceberg/table.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_properties.h"
#include "iceberg/table_requirement.h"
#include "iceberg/table_requirements.h"
#include "iceberg/table_update.h"
#include "iceberg/update/pending_update.h"
#include "iceberg/update/set_snapshot.h"
#include "iceberg/update/snapshot_update.h"
#include "iceberg/update/update_partition_spec.h"
#include "iceberg/update/update_properties.h"
Expand Down Expand Up @@ -163,6 +165,12 @@ Status Transaction::Apply(PendingUpdate& update) {
metadata_builder_->AssignUUID();
}
} break;
case PendingUpdate::Kind::kSetSnapshot: {
auto& set_snapshot = internal::checked_cast<SetSnapshot&>(update);
ICEBERG_ASSIGN_OR_RAISE(auto snapshot, set_snapshot.Apply());
metadata_builder_->SetBranchSnapshot(snapshot->snapshot_id,
std::string(SnapshotRef::kMainBranch));
} break;
default:
return NotSupported("Unsupported pending update: {}",
static_cast<int32_t>(update.kind()));
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/update/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
install_headers(
[
'pending_update.h',
'set_snapshot.h',
'snapshot_update.h',
'update_partition_spec.h',
'update_schema.h',
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/update/pending_update.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
kUpdateSchema,
kUpdateSnapshot,
kUpdateSortOrder,
kSetSnapshot,
};

/// \brief Return the kind of this pending update.
Expand Down
Loading
Loading