From f4496297c319a6c4d43bd8100f6c7d90947faad6 Mon Sep 17 00:00:00 2001 From: Feiyang Li Date: Tue, 13 Jan 2026 15:04:06 +0800 Subject: [PATCH] ver1 --- src/iceberg/CMakeLists.txt | 1 + src/iceberg/meson.build | 1 + src/iceberg/table_metadata.cc | 10 +- src/iceberg/test/CMakeLists.txt | 1 + src/iceberg/test/set_snapshot_test.cc | 236 ++++++++++++++++++++++++++ src/iceberg/transaction.cc | 8 + src/iceberg/update/meson.build | 1 + src/iceberg/update/pending_update.h | 1 + src/iceberg/update/set_snapshot.cc | 146 ++++++++++++++++ src/iceberg/update/set_snapshot.h | 97 +++++++++++ 10 files changed, 497 insertions(+), 5 deletions(-) create mode 100644 src/iceberg/test/set_snapshot_test.cc create mode 100644 src/iceberg/update/set_snapshot.cc create mode 100644 src/iceberg/update/set_snapshot.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 23d2e4cdf..0b3ffdad2 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -83,6 +83,7 @@ set(ICEBERG_SOURCES type.cc update/pending_update.cc update/snapshot_update.cc + update/set_snapshot.cc update/update_partition_spec.cc update/update_properties.cc update/update_schema.cc diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index ead2ef2c8..0176890cf 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -103,6 +103,7 @@ iceberg_sources = files( 'transform_function.cc', 'type.cc', 'update/pending_update.cc', + 'update/set_snapshot.cc', 'update/snapshot_update.cc', 'update/update_partition_spec.cc', 'update/update_properties.cc', diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index 22eb739bd..723920ff2 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -665,6 +665,9 @@ class TableMetadataBuilder::Impl { Result> UpdateSnapshotLog( int64_t current_snapshot_id) const; + /// \brief Internal method to set a branch snapshot + /// \param snapshot The snapshot to set + /// \param branch The branch name Status SetBranchSnapshotInternal(const Snapshot& snapshot, const std::string& branch); private: @@ -1087,11 +1090,8 @@ Status TableMetadataBuilder::Impl::SetBranchSnapshot(int64_t snapshot_id, // change is a noop return {}; } - - auto snapshot_it = snapshots_by_id_.find(snapshot_id); - ICEBERG_CHECK(snapshot_it != snapshots_by_id_.end(), - "Cannot set {} to unknown snapshot: {}", branch, snapshot_id); - return SetBranchSnapshotInternal(*snapshot_it->second, branch); + ICEBERG_ASSIGN_OR_RAISE(auto snapshot, metadata_.SnapshotById(snapshot_id)); + return SetBranchSnapshotInternal(*snapshot, branch); } Status TableMetadataBuilder::Impl::SetBranchSnapshot(std::shared_ptr snapshot, diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 6124b6bcf..509a32cc0 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -169,6 +169,7 @@ if(ICEBERG_BUILD_BUNDLE) add_iceberg_test(table_update_test USE_BUNDLE SOURCES + set_snapshot_test.cc transaction_test.cc update_partition_spec_test.cc update_properties_test.cc diff --git a/src/iceberg/test/set_snapshot_test.cc b/src/iceberg/test/set_snapshot_test.cc new file mode 100644 index 000000000..75a8fe2c0 --- /dev/null +++ b/src/iceberg/test/set_snapshot_test.cc @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/set_snapshot.h" + +#include + +#include +#include + +#include "iceberg/result.h" +#include "iceberg/snapshot.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/update_test_base.h" +#include "iceberg/transaction.h" + +namespace iceberg { + +// Test fixture for SetSnapshot tests +class SetSnapshotTest : public UpdateTestBase { + protected: + // Snapshot IDs from TableMetadataV2Valid.json + static constexpr int64_t kOldestSnapshotId = 3051729675574597004; + static constexpr int64_t kCurrentSnapshotId = 3055729675574597004; + + // Timestamps from TableMetadataV2Valid.json + static constexpr int64_t kOldestSnapshotTimestamp = 1515100955770; + static constexpr int64_t kCurrentSnapshotTimestamp = 1555100955770; +}; + +TEST_F(SetSnapshotTest, SetCurrentSnapshotValid) { + // Create transaction and SetSnapshot + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); + + // Set current snapshot to the older snapshot + set_snapshot->SetCurrentSnapshot(kOldestSnapshotId); + + // Apply and verify + ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply()); + EXPECT_NE(result, nullptr); + EXPECT_EQ(result->snapshot_id, kOldestSnapshotId); +} + +TEST_F(SetSnapshotTest, SetCurrentSnapshotToCurrentSnapshot) { + // Create transaction and SetSnapshot + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); + + // Set current snapshot to the current snapshot (no-op) + set_snapshot->SetCurrentSnapshot(kCurrentSnapshotId); + + // Apply and verify + ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply()); + EXPECT_NE(result, nullptr); + EXPECT_EQ(result->snapshot_id, kCurrentSnapshotId); +} + +TEST_F(SetSnapshotTest, SetCurrentSnapshotInvalid) { + // Create transaction and SetSnapshot + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); + + // Try to set to a non-existent snapshot + int64_t invalid_snapshot_id = 9999999999999999; + set_snapshot->SetCurrentSnapshot(invalid_snapshot_id); + + // Should fail during Apply + auto result = set_snapshot->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("unknown snapshot id")); +} + +TEST_F(SetSnapshotTest, RollbackToValid) { + // Create transaction and SetSnapshot + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); + + // Rollback to the oldest snapshot (which is an ancestor) + set_snapshot->RollbackTo(kOldestSnapshotId); + + // Apply and verify + ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply()); + EXPECT_NE(result, nullptr); + EXPECT_EQ(result->snapshot_id, kOldestSnapshotId); +} + +TEST_F(SetSnapshotTest, RollbackToInvalidSnapshot) { + // Create transaction and SetSnapshot + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); + + // Try to rollback to a non-existent snapshot + int64_t invalid_snapshot_id = 9999999999999999; + set_snapshot->RollbackTo(invalid_snapshot_id); + + // Should fail during Apply + auto result = set_snapshot->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("unknown snapshot id")); +} + +TEST_F(SetSnapshotTest, RollbackToTimeValidOldestSnapshot) { + // Create transaction and SetSnapshot + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); + + // Rollback to a time between the two snapshots + // This should select the oldest snapshot + int64_t time_between = (kOldestSnapshotTimestamp + kCurrentSnapshotTimestamp) / 2; + set_snapshot->RollbackToTime(time_between); + + // Apply and verify + ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply()); + EXPECT_NE(result, nullptr); + EXPECT_EQ(result->snapshot_id, kOldestSnapshotId); +} + +TEST_F(SetSnapshotTest, RollbackToTimeBeforeAnySnapshot) { + // Create transaction and SetSnapshot + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); + + // Try to rollback to a time before any snapshot + int64_t time_before_all = kOldestSnapshotTimestamp - 1000000; + set_snapshot->RollbackToTime(time_before_all); + + // Should fail - no snapshot older than the specified time + auto result = set_snapshot->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("no valid snapshot older than")); +} + +TEST_F(SetSnapshotTest, RollbackToTimeExactMatch) { + // Create transaction and SetSnapshot + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); + + // Rollback to a timestamp just after the oldest snapshot + // This should return the oldest snapshot (the latest one before this time) + int64_t time_just_after_oldest = kOldestSnapshotTimestamp + 1; + set_snapshot->RollbackToTime(time_just_after_oldest); + + // Apply and verify - should return the oldest snapshot + ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply()); + EXPECT_NE(result, nullptr); + EXPECT_EQ(result->snapshot_id, kOldestSnapshotId); +} + +TEST_F(SetSnapshotTest, ApplyWithoutChanges) { + // Create transaction and SetSnapshot + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); + + // Apply without making any changes (NOOP) + ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply()); + + // Should return current snapshot + EXPECT_NE(result, nullptr); + EXPECT_EQ(result->snapshot_id, kCurrentSnapshotId); +} + +TEST_F(SetSnapshotTest, MethodChaining) { + // Create transaction and SetSnapshot + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); + + // Test that methods return reference for chaining + // Note: Only the last operation should take effect + auto& result1 = set_snapshot->SetCurrentSnapshot(kOldestSnapshotId); + EXPECT_EQ(&result1, set_snapshot.get()); +} + +TEST_F(SetSnapshotTest, CommitSuccess) { + // Create transaction and SetSnapshot + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); + + // Set to oldest snapshot + set_snapshot->SetCurrentSnapshot(kOldestSnapshotId); + + // Commit the change + EXPECT_THAT(set_snapshot->Commit(), IsOk()); + + // Commit the transaction + ICEBERG_UNWRAP_OR_FAIL(auto updated_table, txn->Commit()); + + // Verify the current snapshot was changed + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); + EXPECT_EQ(current_snapshot->snapshot_id, kOldestSnapshotId); +} + +TEST_F(SetSnapshotTest, CommitEmptyUpdate) { + // Create transaction and SetSnapshot + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); + + // Commit without making any changes (NOOP) + EXPECT_THAT(set_snapshot->Commit(), IsOk()); + + // Commit the transaction + ICEBERG_UNWRAP_OR_FAIL(auto updated_table, txn->Commit()); + + // Verify the current snapshot remained the same + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); + EXPECT_EQ(current_snapshot->snapshot_id, kCurrentSnapshotId); +} + +TEST_F(SetSnapshotTest, KindReturnsSetSnapshot) { + // Create transaction and SetSnapshot + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); + + // Verify the kind is correct + EXPECT_EQ(set_snapshot->kind(), PendingUpdate::Kind::kSetSnapshot); +} + +} // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 6ef942dbb..51d6d4fab 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -24,6 +24,7 @@ #include "iceberg/catalog.h" #include "iceberg/schema.h" +#include "iceberg/snapshot.h" #include "iceberg/table.h" #include "iceberg/table_metadata.h" #include "iceberg/table_properties.h" @@ -31,6 +32,7 @@ #include "iceberg/table_requirements.h" #include "iceberg/table_update.h" #include "iceberg/update/pending_update.h" +#include "iceberg/update/set_snapshot.h" #include "iceberg/update/snapshot_update.h" #include "iceberg/update/update_partition_spec.h" #include "iceberg/update/update_properties.h" @@ -163,6 +165,12 @@ Status Transaction::Apply(PendingUpdate& update) { metadata_builder_->AssignUUID(); } } break; + case PendingUpdate::Kind::kSetSnapshot: { + auto& set_snapshot = internal::checked_cast(update); + ICEBERG_ASSIGN_OR_RAISE(auto snapshot, set_snapshot.Apply()); + metadata_builder_->SetBranchSnapshot(snapshot->snapshot_id, + std::string(SnapshotRef::kMainBranch)); + } break; default: return NotSupported("Unsupported pending update: {}", static_cast(update.kind())); diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build index 4238e0222..4ce209d76 100644 --- a/src/iceberg/update/meson.build +++ b/src/iceberg/update/meson.build @@ -18,6 +18,7 @@ install_headers( [ 'pending_update.h', + 'set_snapshot.h', 'snapshot_update.h', 'update_partition_spec.h', 'update_schema.h', diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h index 2124d7e12..fb81a868e 100644 --- a/src/iceberg/update/pending_update.h +++ b/src/iceberg/update/pending_update.h @@ -47,6 +47,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { kUpdateSchema, kUpdateSnapshot, kUpdateSortOrder, + kSetSnapshot, }; /// \brief Return the kind of this pending update. diff --git a/src/iceberg/update/set_snapshot.cc b/src/iceberg/update/set_snapshot.cc new file mode 100644 index 000000000..f1a173872 --- /dev/null +++ b/src/iceberg/update/set_snapshot.cc @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/set_snapshot.h" + +#include +#include +#include + +#include "iceberg/snapshot.h" +#include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" +#include "iceberg/util/error_collector.h" +#include "iceberg/util/macros.h" +#include "iceberg/util/snapshot_util_internal.h" +#include "iceberg/util/timepoint.h" + +namespace iceberg { + +Result> SetSnapshot::Make( + std::shared_ptr transaction) { + ICEBERG_PRECHECK(transaction != nullptr, + "Cannot create SetSnapshot without a transaction"); + return std::shared_ptr(new SetSnapshot(std::move(transaction))); +} + +SetSnapshot::SetSnapshot(std::shared_ptr transaction) + : PendingUpdate(std::move(transaction)) {} + +SetSnapshot::~SetSnapshot() = default; + +SetSnapshot& SetSnapshot::SetCurrentSnapshot(int64_t snapshot_id) { + const TableMetadata& base_metadata = transaction_->current(); + + // Validate that the snapshot exists + auto snapshot_result = base_metadata.SnapshotById(snapshot_id); + ICEBERG_BUILDER_CHECK(snapshot_result.has_value(), + "Cannot roll back to unknown snapshot id: {}", snapshot_id); + + target_snapshot_id_ = snapshot_id; + + return *this; +} + +SetSnapshot& SetSnapshot::RollbackToTime(int64_t timestamp_ms) { + // Find the latest snapshot by timestamp older than timestamp_ms + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto snapshot_opt, + FindLatestAncestorOlderThan(timestamp_ms)); + + ICEBERG_BUILDER_CHECK(snapshot_opt.has_value(), + "Cannot roll back, no valid snapshot older than: {}", + timestamp_ms); + + target_snapshot_id_ = snapshot_opt.value()->snapshot_id; + is_rollback_ = true; + + return *this; +} + +SetSnapshot& SetSnapshot::RollbackTo(int64_t snapshot_id) { + const TableMetadata& current = transaction_->current(); + + // Validate that the snapshot exists + auto snapshot_result = current.SnapshotById(snapshot_id); + ICEBERG_BUILDER_CHECK(snapshot_result.has_value(), + "Cannot roll back to unknown snapshot id: {}", snapshot_id); + + // Validate that the snapshot is an ancestor of the current state + ICEBERG_BUILDER_ASSIGN_OR_RETURN( + bool is_ancestor, SnapshotUtil::IsAncestorOf(*transaction_->table(), snapshot_id)); + ICEBERG_BUILDER_CHECK( + is_ancestor, + "Cannot roll back to snapshot, not an ancestor of the current state: {}", + snapshot_id); + + return SetCurrentSnapshot(snapshot_id); +} + +Result> SetSnapshot::Apply() { + ICEBERG_RETURN_UNEXPECTED(CheckErrors()); + + const TableMetadata& base_metadata = transaction_->current(); + + // If no target snapshot was configured, return current state (NOOP) + if (!target_snapshot_id_.has_value()) { + return base_metadata.Snapshot(); + } + + // If this is a rollback, validate that the target is still an ancestor + if (is_rollback_) { + ICEBERG_ASSIGN_OR_RAISE( + bool is_ancestor, + SnapshotUtil::IsAncestorOf(*transaction_->table(), target_snapshot_id_.value())); + ICEBERG_CHECK(is_ancestor, + "Cannot roll back to {}: not an ancestor of the current table state", + target_snapshot_id_.value()); + } + + return base_metadata.SnapshotById(target_snapshot_id_.value()); +} + +Result>> SetSnapshot::FindLatestAncestorOlderThan( + int64_t timestamp_ms) const { + ICEBERG_ASSIGN_OR_RAISE(auto ancestors, + SnapshotUtil::CurrentAncestors(*transaction_->table())); + + int64_t snapshot_timestamp = 0; + std::shared_ptr result = nullptr; + + for (const auto& snapshot : ancestors) { + if (snapshot == nullptr) { + continue; + } + + int64_t snap_timestamp_ms = UnixMsFromTimePointMs(snapshot->timestamp_ms); + + if (snap_timestamp_ms < timestamp_ms && snap_timestamp_ms > snapshot_timestamp) { + result = snapshot; + snapshot_timestamp = snap_timestamp_ms; + } + } + + if (result == nullptr) { + return std::nullopt; + } + + return result; +} + +} // namespace iceberg diff --git a/src/iceberg/update/set_snapshot.h b/src/iceberg/update/set_snapshot.h new file mode 100644 index 000000000..b2ac41fec --- /dev/null +++ b/src/iceberg/update/set_snapshot.h @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/pending_update.h" + +/// \file iceberg/update/set_snapshot.h +/// \brief Sets the current snapshot directly or by rolling back. + +namespace iceberg { + +/// \brief Sets the current snapshot directly or by rolling back. +/// +/// This update is not exposed through the Table API. Instead, it is a package-private +/// part of the Transaction API intended for use in ManageSnapshots. +/// +/// When committing, these changes will be applied to the current table metadata. +/// Commit conflicts will not be resolved and will result in a CommitFailed error. +class ICEBERG_EXPORT SetSnapshot : public PendingUpdate { + public: + static Result> Make( + std::shared_ptr transaction); + + ~SetSnapshot() override; + + /// \brief Sets the table's current state to a specific Snapshot identified by id. + /// + /// This method allows setting the current snapshot to any valid snapshot defined + /// in the table metadata, regardless of its relationship to the current state. + /// + /// \param snapshot_id The snapshot ID to set as current + /// \return Reference to this for method chaining + SetSnapshot& SetCurrentSnapshot(int64_t snapshot_id); + + /// \brief Rolls back the table's state to the last Snapshot before the given timestamp. + /// + /// This method traverses the history of the current snapshot to find the most recent + /// ancestor that happened before the specified time. + /// + /// \param timestamp_ms A timestamp in milliseconds since the Unix epoch + /// \return Reference to this for method chaining + SetSnapshot& RollbackToTime(int64_t timestamp_ms); + + /// \brief Rollback table's state to a specific Snapshot identified by id. + /// + /// This method strictly validates that the target snapshot is an ancestor of the + /// current table state. + /// + /// \param snapshot_id The snapshot ID to roll back to. Must be an ancestor of the + /// current snapshot + /// \return Reference to this for method chaining + SetSnapshot& RollbackTo(int64_t snapshot_id); + + Kind kind() const final { return Kind::kSetSnapshot; } + + /// \brief Apply the pending changes and return the target snapshot. + Result> Apply(); + + private: + explicit SetSnapshot(std::shared_ptr transaction); + + /// \brief Find the latest snapshot whose timestamp is before the provided timestamp. + /// + /// \param timestamp_ms Lookup snapshots before this timestamp + /// \return The snapshot that was current at the given timestamp, or nullopt + Result>> FindLatestAncestorOlderThan( + int64_t timestamp_ms) const; + + std::optional target_snapshot_id_; + bool is_rollback_{false}; +}; + +} // namespace iceberg