From e8bc801925f3592936bbe5545d4b0a4002f3455c Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 16 Jan 2026 16:13:00 +0800 Subject: [PATCH 01/10] refresh token --- paimon-python/pypaimon/api/rest_util.py | 14 ++--- .../catalog/rest/rest_token_file_io.py | 17 ++++++- .../pypaimon/read/reader/lance_utils.py | 51 +++++++++++-------- .../pypaimon/tests/reader_base_test.py | 26 +++++++--- .../pypaimon/tests/rest/rest_base_test.py | 38 +------------- .../pypaimon/tests/rest/rest_server.py | 48 ++++++++++++++++- .../tests/rest/rest_token_file_io_test.py | 49 ++++++++++++++++-- .../pypaimon/write/file_store_commit.py | 2 +- 8 files changed, 164 insertions(+), 81 deletions(-) diff --git a/paimon-python/pypaimon/api/rest_util.py b/paimon-python/pypaimon/api/rest_util.py index 97a709ecc34c..33ed29dce710 100644 --- a/paimon-python/pypaimon/api/rest_util.py +++ b/paimon-python/pypaimon/api/rest_util.py @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -from typing import Dict +from typing import Dict, Optional from urllib.parse import unquote from pypaimon.common.options import Options @@ -46,21 +46,21 @@ def extract_prefix_map( @staticmethod def merge( - base_properties: Dict[str, str], - override_properties: Dict[str, str]) -> Dict[str, str]: + base_properties: Optional[Dict[str, str]], + override_properties: Optional[Dict[str, str]]) -> Dict[str, str]: if override_properties is None: override_properties = {} if base_properties is None: base_properties = {} - + result = {} - + for key, value in base_properties.items(): if value is not None and key not in override_properties: result[key] = value - + for key, value in override_properties.items(): if value is not None: result[key] = value - + return result diff --git a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py index f686dc66ea37..991a983bc402 100644 --- a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py +++ b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py @@ -18,19 +18,22 @@ import logging import threading import time +from pathlib import Path from typing import Optional from cachetools import TTLCache from pypaimon.api.rest_api import RESTApi -from pypaimon.api.rest_util import RESTUtil from pypaimon.catalog.rest.rest_token import RESTToken from pypaimon.common.file_io import FileIO from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO from pypaimon.common.identifier import Identifier +<<<<<<< HEAD from pypaimon.common.options import Options from pypaimon.common.options.config import CatalogOptions, OssOptions from pypaimon.common.uri_reader import UriReaderFactory +======= +>>>>>>> 5b8c6e7f3 (refresh rest token) class RESTTokenFileIO(FileIO): @@ -42,7 +45,7 @@ class RESTTokenFileIO(FileIO): _FILE_IO_CACHE_TTL = 36000 # 10 hours in seconds def __init__(self, identifier: Identifier, path: str, - catalog_options: Optional[Options] = None): + catalog_options: Optional[dict] = None): self.identifier = identifier self.path = path self.catalog_options = catalog_options @@ -57,6 +60,7 @@ def __init__(self, identifier: Identifier, path: str, ttl=self._FILE_IO_CACHE_TTL ) +<<<<<<< HEAD def __getstate__(self): state = self.__dict__.copy() # Remove non-serializable objects @@ -178,6 +182,15 @@ def uri_reader_factory(self): @property def filesystem(self): return self.file_io().filesystem +======= + def _initialize_oss_fs(self, path) -> FileSystem: + self.try_to_refresh_token() + 
self.properties.update(self.token.token) + return super()._initialize_oss_fs(path) + + def new_output_stream(self, path: Path): + return self.filesystem.open_output_stream(str(path)) +>>>>>>> 5b8c6e7f3 (refresh rest token) def try_to_refresh_token(self): if self.should_refresh(): diff --git a/paimon-python/pypaimon/read/reader/lance_utils.py b/paimon-python/pypaimon/read/reader/lance_utils.py index c219dc67043f..8a3015146791 100644 --- a/paimon-python/pypaimon/read/reader/lance_utils.py +++ b/paimon-python/pypaimon/read/reader/lance_utils.py @@ -26,9 +26,17 @@ def to_lance_specified(file_io: FileIO, file_path: str) -> Tuple[str, Optional[Dict[str, str]]]: """Convert path and extract storage options for Lance format.""" +<<<<<<< HEAD if hasattr(file_io, 'file_io'): file_io = file_io.file_io() +======= + if hasattr(file_io, 'get_merged_properties'): + properties = file_io.get_merged_properties() + else: + properties = file_io.properties if hasattr(file_io, 'properties') and file_io.properties else None + +>>>>>>> 5b8c6e7f3 (refresh rest token) scheme, _, _ = file_io.parse_location(file_path) storage_options = None file_path_for_lance = file_io.to_filesystem_path(file_path) @@ -40,37 +48,40 @@ def to_lance_specified(file_io: FileIO, file_path: str) -> Tuple[str, Optional[D file_path_for_lance = file_path if scheme == 'oss': - storage_options = {} - if hasattr(file_io, 'properties'): - for key, value in file_io.properties.data.items(): + parsed = urlparse(file_path) + bucket = parsed.netloc + path = parsed.path.lstrip('/') + + if properties: + storage_options = {} + for key, value in properties.to_map().items(): if str(key).startswith('fs.'): storage_options[key] = value - parsed = urlparse(file_path) - bucket = parsed.netloc - path = parsed.path.lstrip('/') - - endpoint = file_io.properties.get(OssOptions.OSS_ENDPOINT) + endpoint = properties.get(OssOptions.OSS_ENDPOINT) if endpoint: endpoint_clean = endpoint.replace('http://', '').replace('https://', '') storage_options['endpoint'] = f"https://{bucket}.{endpoint_clean}" - if file_io.properties.contains(OssOptions.OSS_ACCESS_KEY_ID): - storage_options['access_key_id'] = file_io.properties.get(OssOptions.OSS_ACCESS_KEY_ID) - storage_options['oss_access_key_id'] = file_io.properties.get(OssOptions.OSS_ACCESS_KEY_ID) - if file_io.properties.contains(OssOptions.OSS_ACCESS_KEY_SECRET): - storage_options['secret_access_key'] = file_io.properties.get(OssOptions.OSS_ACCESS_KEY_SECRET) - storage_options['oss_secret_access_key'] = file_io.properties.get(OssOptions.OSS_ACCESS_KEY_SECRET) - if file_io.properties.contains(OssOptions.OSS_SECURITY_TOKEN): - storage_options['session_token'] = file_io.properties.get(OssOptions.OSS_SECURITY_TOKEN) - storage_options['oss_session_token'] = file_io.properties.get(OssOptions.OSS_SECURITY_TOKEN) - if file_io.properties.contains(OssOptions.OSS_ENDPOINT): - storage_options['oss_endpoint'] = file_io.properties.get(OssOptions.OSS_ENDPOINT) + if properties.contains(OssOptions.OSS_ACCESS_KEY_ID): + storage_options['access_key_id'] = properties.get(OssOptions.OSS_ACCESS_KEY_ID) + storage_options['oss_access_key_id'] = properties.get(OssOptions.OSS_ACCESS_KEY_ID) + if properties.contains(OssOptions.OSS_ACCESS_KEY_SECRET): + storage_options['secret_access_key'] = properties.get(OssOptions.OSS_ACCESS_KEY_SECRET) + storage_options['oss_secret_access_key'] = properties.get(OssOptions.OSS_ACCESS_KEY_SECRET) + if properties.contains(OssOptions.OSS_SECURITY_TOKEN): + storage_options['session_token'] = 
properties.get(OssOptions.OSS_SECURITY_TOKEN) + storage_options['oss_session_token'] = properties.get(OssOptions.OSS_SECURITY_TOKEN) + if properties.contains(OssOptions.OSS_ENDPOINT): + storage_options['oss_endpoint'] = properties.get(OssOptions.OSS_ENDPOINT) + storage_options['virtual_hosted_style_request'] = 'true' - + if bucket and path: file_path_for_lance = f"oss://{bucket}/{path}" elif bucket: file_path_for_lance = f"oss://{bucket}" + else: + storage_options = None return file_path_for_lance, storage_options diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index 3d9ed7f8743b..a69b9a4b48db 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -1111,7 +1111,7 @@ def test_primary_key_value_stats(self): value_stats = file_meta.value_stats self.assertIsNotNone(value_stats, "value_stats should not be None") - + if file_meta.value_stats_cols is None: expected_value_fields = ['name', 'price', 'category'] self.assertGreaterEqual(value_stats.min_values.arity, len(expected_value_fields), @@ -1119,12 +1119,12 @@ def test_primary_key_value_stats(self): else: self.assertNotIn('id', file_meta.value_stats_cols, "Key field 'id' should NOT be in value_stats_cols") - + expected_value_fields = ['name', 'price', 'category'] self.assertTrue(set(expected_value_fields).issubset(set(file_meta.value_stats_cols)), f"value_stats_cols should contain value fields: {expected_value_fields}, " f"but got: {file_meta.value_stats_cols}") - + expected_arity = len(file_meta.value_stats_cols) self.assertEqual(value_stats.min_values.arity, expected_arity, f"value_stats should contain {expected_arity} fields (matching value_stats_cols), " @@ -1135,17 +1135,17 @@ def test_primary_key_value_stats(self): self.assertEqual(len(value_stats.null_counts), expected_arity, f"value_stats null_counts should have {expected_arity} elements, " f"but got {len(value_stats.null_counts)}") - + self.assertEqual(value_stats.min_values.arity, len(file_meta.value_stats_cols), f"value_stats.min_values.arity ({value_stats.min_values.arity}) must match " f"value_stats_cols length ({len(file_meta.value_stats_cols)})") - + for field_name in file_meta.value_stats_cols: is_system_field = (field_name.startswith('_KEY_') or field_name in ['_SEQUENCE_NUMBER', '_VALUE_KIND', '_ROW_ID']) self.assertFalse(is_system_field, f"value_stats_cols should not contain system field: {field_name}") - + value_stats_fields = table_scan.starting_scanner.manifest_file_manager._get_value_stats_fields( {'_VALUE_STATS_COLS': file_meta.value_stats_cols}, table.fields @@ -1161,7 +1161,7 @@ def test_primary_key_value_stats(self): self.assertEqual(len(min_value_stats), 3, "min_value_stats should have 3 values") self.assertEqual(len(max_value_stats), 3, "max_value_stats should have 3 values") - + actual_data = read_builder.new_read().to_arrow(table_scan.plan().splits()) self.assertEqual(actual_data.num_rows, 5, "Should have 5 rows") actual_ids = sorted(actual_data.column('id').to_pylist()) @@ -1365,7 +1365,11 @@ def test_read_batch_size_config(self): read_builder = table.new_read_builder() table_read = read_builder.new_read() splits = read_builder.new_scan().plan().splits() +<<<<<<< HEAD +======= + +>>>>>>> 5b8c6e7f3 (refresh rest token) if splits: # Use _create_split_read to create reader split_read = table_read._create_split_read(splits[0]) @@ -1373,7 +1377,11 @@ def test_read_batch_size_config(self): batch_count = 0 total_rows = 0 max_batch_size = 0 
+<<<<<<< HEAD +======= + +>>>>>>> 5b8c6e7f3 (refresh rest token) try: while True: batch = reader.read_arrow_batch() @@ -1385,7 +1393,11 @@ def test_read_batch_size_config(self): max_batch_size = max(max_batch_size, batch_rows) finally: reader.close() +<<<<<<< HEAD +======= + +>>>>>>> 5b8c6e7f3 (refresh rest token) self.assertGreater(batch_count, 1, f"With batch_size=10, should get multiple batches, got {batch_count}") self.assertEqual(total_rows, 50, "Should read all 50 rows") diff --git a/paimon-python/pypaimon/tests/rest/rest_base_test.py b/paimon-python/pypaimon/tests/rest/rest_base_test.py index 2fc14eb1118d..e45fac5cde61 100644 --- a/paimon-python/pypaimon/tests/rest/rest_base_test.py +++ b/paimon-python/pypaimon/tests/rest/rest_base_test.py @@ -28,7 +28,7 @@ from pypaimon import CatalogFactory, Schema from pypaimon.api.api_response import ConfigResponse from pypaimon.api.auth import BearTokenAuthProvider -from pypaimon.common.options import Options +from pypaimon.api.options import Options from pypaimon.catalog.catalog_context import CatalogContext from pypaimon.catalog.rest.rest_catalog import RESTCatalog from pypaimon.catalog.rest.table_metadata import TableMetadata @@ -227,39 +227,3 @@ def _read_test_table(self, read_builder): table_read = read_builder.new_read() splits = read_builder.new_scan().plan().splits() return table_read.to_arrow(splits) - - def _write_test_table_with_schema(self, table, pa_schema): - """Write test data using the specified PyArrow schema.""" - write_builder = table.new_batch_write_builder() - - # first write - table_write = write_builder.new_write() - table_commit = write_builder.new_commit() - data1 = { - 'user_id': [1, 2, 3, 4], - 'item_id': [1001, 1002, 1003, 1004], - 'behavior': ['a', 'b', 'c', None], - 'dt': ['p1', 'p1', 'p2', 'p1'], - 'long-dt': ['2024-10-10', '2024-10-10', '2024-10-10', '2024-01-01'], - } - pa_table = pa.Table.from_pydict(data1, schema=pa_schema) - table_write.write_arrow(pa_table) - table_commit.commit(table_write.prepare_commit()) - table_write.close() - table_commit.close() - - # second write - table_write = write_builder.new_write() - table_commit = write_builder.new_commit() - data2 = { - 'user_id': [5, 6, 7, 8], - 'item_id': [1005, 1006, 1007, 1008], - 'behavior': ['e', 'f', 'g', 'h'], - 'dt': ['p2', 'p1', 'p2', 'p2'], - 'long-dt': ['2024-10-10', '2025-01-23', 'abcdefghijklmnopk', '2025-08-08'], - } - pa_table = pa.Table.from_pydict(data2, schema=pa_schema) - table_write.write_arrow(pa_table) - table_commit.commit(table_write.prepare_commit()) - table_write.close() - table_commit.close() diff --git a/paimon-python/pypaimon/tests/rest/rest_server.py b/paimon-python/pypaimon/tests/rest/rest_server.py index d556f7f55c99..cb7b32108352 100755 --- a/paimon-python/pypaimon/tests/rest/rest_server.py +++ b/paimon-python/pypaimon/tests/rest/rest_server.py @@ -24,9 +24,12 @@ from dataclasses import dataclass from http.server import BaseHTTPRequestHandler, HTTPServer from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Dict, List, Optional, Tuple, Union, TYPE_CHECKING from urllib.parse import urlparse +if TYPE_CHECKING: + from pypaimon.catalog.rest.rest_token import RESTToken + from pypaimon.api.api_request import (AlterTableRequest, CreateDatabaseRequest, CreateTableRequest, RenameTableRequest) from pypaimon.api.api_response import (ConfigResponse, GetDatabaseResponse, @@ -213,6 +216,7 @@ def __init__(self, data_path: str, auth_provider, config: ConfigResponse, wareho 
self.table_partitions_store: Dict[str, List] = {} self.no_permission_databases: List[str] = [] self.no_permission_tables: List[str] = [] + self.table_token_store: Dict[str, "RESTToken"] = {} # Initialize mock catalog (simplified) self.data_path = data_path @@ -469,10 +473,12 @@ def _handle_table_resource(self, method: str, path_parts: List[str], # Basic table operations (GET, DELETE, etc.) return self._table_handle(method, data, lookup_identifier) elif len(path_parts) == 4: - # Extended operations (e.g., commit) + # Extended operations (e.g., commit, token) operation = path_parts[3] if operation == "commit": return self._table_commit_handle(method, data, lookup_identifier, branch_part) + elif operation == "token": + return self._table_token_handle(method, lookup_identifier) else: return self._mock_response(ErrorResponse(None, None, "Not Found", 404), 404) return self._mock_response(ErrorResponse(None, None, "Not Found", 404), 404) @@ -574,6 +580,44 @@ def _table_handle(self, method: str, data: str, identifier: Identifier) -> Tuple return self._mock_response(ErrorResponse(None, None, "Method Not Allowed", 405), 405) + def _table_token_handle(self, method: str, identifier: Identifier) -> Tuple[str, int]: + if method != "GET": + return self._mock_response(ErrorResponse(None, None, "Method Not Allowed", 405), 405) + + if identifier.get_full_name() not in self.table_metadata_store: + raise TableNotExistException(identifier) + + from pypaimon.api.api_response import GetTableTokenResponse + + token_key = identifier.get_full_name() + if token_key in self.table_token_store: + rest_token = self.table_token_store[token_key] + response = GetTableTokenResponse( + token=rest_token.token, + expires_at_millis=rest_token.expire_at_millis + ) + else: + default_token = { + "akId": "akId" + str(int(time.time() * 1000)), + "akSecret": "akSecret" + str(int(time.time() * 1000)) + } + response = GetTableTokenResponse( + token=default_token, + expires_at_millis=int(time.time() * 1000) + 3600_000 # 1 hour from now + ) + + return self._mock_response(response, 200) + + def set_table_token(self, identifier: Identifier, token: "RESTToken") -> None: + self.table_token_store[identifier.get_full_name()] = token + + def get_table_token(self, identifier: Identifier) -> Optional["RESTToken"]: + return self.table_token_store.get(identifier.get_full_name()) + + def reset_table_token(self, identifier: Identifier) -> None: + if identifier.get_full_name() in self.table_token_store: + del self.table_token_store[identifier.get_full_name()] + def _table_commit_handle(self, method: str, data: str, identifier: Identifier, branch: str = None) -> Tuple[str, int]: """Handle table commit operations""" diff --git a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py index 47ea8e6cb626..f4c1eb7cedee 100644 --- a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py +++ b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py @@ -101,6 +101,7 @@ def test_try_to_write_atomic_directory_check(self): target_dir = os.path.join(self.temp_dir, "target_dir") os.makedirs(target_dir) +<<<<<<< HEAD result = file_io.try_to_write_atomic(f"file://{target_dir}", "test content") self.assertFalse(result, "try_to_write_atomic should return False when target is a directory") @@ -108,6 +109,15 @@ def test_try_to_write_atomic_directory_check(self): self.assertTrue(os.path.isdir(target_dir)) self.assertEqual(len(os.listdir(target_dir)), 0, "No file should be created inside the 
directory") +======= + + result = file_io.try_to_write_atomic(f"file://{target_dir}", "test content") + self.assertFalse(result, "try_to_write_atomic should return False when target is a directory") + + self.assertTrue(os.path.isdir(target_dir)) + self.assertEqual(len(os.listdir(target_dir)), 0, "No file should be created inside the directory") + +>>>>>>> 5b8c6e7f3 (refresh rest token) normal_file = os.path.join(self.temp_dir, "normal_file.txt") result = file_io.try_to_write_atomic(f"file://{normal_file}", "test content") self.assertTrue(result, "try_to_write_atomic should succeed for a normal file path") @@ -223,35 +233,41 @@ def test_catalog_options_not_modified(self): CatalogOptions.URI.key(): "http://test-uri", "custom.key": "custom.value" }) - + catalog_options_copy = Options(original_catalog_options.to_map()) - + with patch.object(RESTTokenFileIO, 'try_to_refresh_token'): file_io = RESTTokenFileIO( self.identifier, self.warehouse_path, original_catalog_options ) - + token_dict = { OssOptions.OSS_ACCESS_KEY_ID.key(): "token-access-key", OssOptions.OSS_ACCESS_KEY_SECRET.key(): "token-secret-key", OssOptions.OSS_ENDPOINT.key(): "token-endpoint" } +<<<<<<< HEAD merged_token = file_io._merge_token_with_catalog_options(token_dict) +======= + + merged_token = file_io._merge_token_with_catalog_options(token_dict) + +>>>>>>> 5b8c6e7f3 (refresh rest token) self.assertEqual( original_catalog_options.to_map(), catalog_options_copy.to_map(), "Original catalog_options should not be modified" ) - + merged_properties = RESTUtil.merge( original_catalog_options.to_map(), merged_token ) - + self.assertIn("custom.key", merged_properties) self.assertEqual(merged_properties["custom.key"], "custom.value") self.assertIn(OssOptions.OSS_ACCESS_KEY_ID.key(), merged_properties) @@ -264,11 +280,19 @@ def test_filesystem_property(self): self.warehouse_path, self.catalog_options ) +<<<<<<< HEAD self.assertTrue(hasattr(file_io, 'filesystem'), "RESTTokenFileIO should have filesystem property") filesystem = file_io.filesystem self.assertIsNotNone(filesystem, "filesystem should not be None") +======= + + self.assertTrue(hasattr(file_io, 'filesystem'), "RESTTokenFileIO should have filesystem property") + filesystem = file_io.filesystem + self.assertIsNotNone(filesystem, "filesystem should not be None") + +>>>>>>> 5b8c6e7f3 (refresh rest token) self.assertTrue(hasattr(filesystem, 'open_input_file'), "filesystem should support open_input_file method") @@ -279,12 +303,20 @@ def test_uri_reader_factory_property(self): self.warehouse_path, self.catalog_options ) +<<<<<<< HEAD +======= + +>>>>>>> 5b8c6e7f3 (refresh rest token) self.assertTrue(hasattr(file_io, 'uri_reader_factory'), "RESTTokenFileIO should have uri_reader_factory property") uri_reader_factory = file_io.uri_reader_factory self.assertIsNotNone(uri_reader_factory, "uri_reader_factory should not be None") +<<<<<<< HEAD +======= + +>>>>>>> 5b8c6e7f3 (refresh rest token) self.assertTrue(hasattr(uri_reader_factory, 'create'), "uri_reader_factory should support create method") @@ -295,10 +327,17 @@ def test_filesystem_and_uri_reader_factory_after_serialization(self): self.warehouse_path, self.catalog_options ) +<<<<<<< HEAD pickled = pickle.dumps(original_file_io) restored_file_io = pickle.loads(pickled) +======= + + pickled = pickle.dumps(original_file_io) + restored_file_io = pickle.loads(pickled) + +>>>>>>> 5b8c6e7f3 (refresh rest token) self.assertIsNotNone(restored_file_io.filesystem, "filesystem should work after deserialization") 
self.assertIsNotNone(restored_file_io.uri_reader_factory, diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index da27c499e827..1a546c492bca 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -188,7 +188,7 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str latest_snapshot: Optional[Snapshot]) -> CommitResult: if self._is_duplicate_commit(retry_result, latest_snapshot, commit_identifier, commit_kind): return SuccessResult() - + unique_id = uuid.uuid4() base_manifest_list = f"manifest-list-{unique_id}-0" delta_manifest_list = f"manifest-list-{unique_id}-1" From 9e45ba0e9041287a46f2a3213abe77d4962af108 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 16 Jan 2026 16:29:25 +0800 Subject: [PATCH 02/10] fix conflict issue during merge --- .../catalog/rest/rest_token_file_io.py | 9 ++-- .../pypaimon/read/reader/lance_utils.py | 5 +-- .../pypaimon/tests/reader_base_test.py | 15 ------- .../pypaimon/tests/rest/rest_base_test.py | 37 ++++++++++++++++ .../tests/rest/rest_token_file_io_test.py | 42 +------------------ 5 files changed, 42 insertions(+), 66 deletions(-) diff --git a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py index 991a983bc402..a76db7490c37 100644 --- a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py +++ b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py @@ -22,18 +22,17 @@ from typing import Optional from cachetools import TTLCache +from pyarrow._fs import FileSystem +from api.rest_util import RESTUtil from pypaimon.api.rest_api import RESTApi from pypaimon.catalog.rest.rest_token import RESTToken from pypaimon.common.file_io import FileIO from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO from pypaimon.common.identifier import Identifier -<<<<<<< HEAD from pypaimon.common.options import Options from pypaimon.common.options.config import CatalogOptions, OssOptions from pypaimon.common.uri_reader import UriReaderFactory -======= ->>>>>>> 5b8c6e7f3 (refresh rest token) class RESTTokenFileIO(FileIO): @@ -60,7 +59,6 @@ def __init__(self, identifier: Identifier, path: str, ttl=self._FILE_IO_CACHE_TTL ) -<<<<<<< HEAD def __getstate__(self): state = self.__dict__.copy() # Remove non-serializable objects @@ -182,7 +180,7 @@ def uri_reader_factory(self): @property def filesystem(self): return self.file_io().filesystem -======= + def _initialize_oss_fs(self, path) -> FileSystem: self.try_to_refresh_token() self.properties.update(self.token.token) @@ -190,7 +188,6 @@ def _initialize_oss_fs(self, path) -> FileSystem: def new_output_stream(self, path: Path): return self.filesystem.open_output_stream(str(path)) ->>>>>>> 5b8c6e7f3 (refresh rest token) def try_to_refresh_token(self): if self.should_refresh(): diff --git a/paimon-python/pypaimon/read/reader/lance_utils.py b/paimon-python/pypaimon/read/reader/lance_utils.py index 8a3015146791..e973cf5ce6e4 100644 --- a/paimon-python/pypaimon/read/reader/lance_utils.py +++ b/paimon-python/pypaimon/read/reader/lance_utils.py @@ -26,17 +26,14 @@ def to_lance_specified(file_io: FileIO, file_path: str) -> Tuple[str, Optional[Dict[str, str]]]: """Convert path and extract storage options for Lance format.""" -<<<<<<< HEAD if hasattr(file_io, 'file_io'): file_io = file_io.file_io() -======= if hasattr(file_io, 'get_merged_properties'): properties = file_io.get_merged_properties() else: 
properties = file_io.properties if hasattr(file_io, 'properties') and file_io.properties else None ->>>>>>> 5b8c6e7f3 (refresh rest token) scheme, _, _ = file_io.parse_location(file_path) storage_options = None file_path_for_lance = file_io.to_filesystem_path(file_path) @@ -76,7 +73,7 @@ def to_lance_specified(file_io: FileIO, file_path: str) -> Tuple[str, Optional[D storage_options['oss_endpoint'] = properties.get(OssOptions.OSS_ENDPOINT) storage_options['virtual_hosted_style_request'] = 'true' - + if bucket and path: file_path_for_lance = f"oss://{bucket}/{path}" elif bucket: diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index a69b9a4b48db..beedfb5feac2 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -1365,23 +1365,13 @@ def test_read_batch_size_config(self): read_builder = table.new_read_builder() table_read = read_builder.new_read() splits = read_builder.new_scan().plan().splits() -<<<<<<< HEAD - -======= - ->>>>>>> 5b8c6e7f3 (refresh rest token) if splits: - # Use _create_split_read to create reader split_read = table_read._create_split_read(splits[0]) reader = split_read.create_reader() batch_count = 0 total_rows = 0 max_batch_size = 0 -<<<<<<< HEAD - -======= ->>>>>>> 5b8c6e7f3 (refresh rest token) try: while True: batch = reader.read_arrow_batch() @@ -1393,11 +1383,6 @@ def test_read_batch_size_config(self): max_batch_size = max(max_batch_size, batch_rows) finally: reader.close() -<<<<<<< HEAD - -======= - ->>>>>>> 5b8c6e7f3 (refresh rest token) self.assertGreater(batch_count, 1, f"With batch_size=10, should get multiple batches, got {batch_count}") self.assertEqual(total_rows, 50, "Should read all 50 rows") diff --git a/paimon-python/pypaimon/tests/rest/rest_base_test.py b/paimon-python/pypaimon/tests/rest/rest_base_test.py index e45fac5cde61..9a4727ad7241 100644 --- a/paimon-python/pypaimon/tests/rest/rest_base_test.py +++ b/paimon-python/pypaimon/tests/rest/rest_base_test.py @@ -227,3 +227,40 @@ def _read_test_table(self, read_builder): table_read = read_builder.new_read() splits = read_builder.new_scan().plan().splits() return table_read.to_arrow(splits) + + + def _write_test_table_with_schema(self, table, pa_schema): + """Write test data using the specified PyArrow schema.""" + write_builder = table.new_batch_write_builder() + + # first write + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + data1 = { + 'user_id': [1, 2, 3, 4], + 'item_id': [1001, 1002, 1003, 1004], + 'behavior': ['a', 'b', 'c', None], + 'dt': ['p1', 'p1', 'p2', 'p1'], + 'long-dt': ['2024-10-10', '2024-10-10', '2024-10-10', '2024-01-01'], + } + pa_table = pa.Table.from_pydict(data1, schema=pa_schema) + table_write.write_arrow(pa_table) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + # second write + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + data2 = { + 'user_id': [5, 6, 7, 8], + 'item_id': [1005, 1006, 1007, 1008], + 'behavior': ['e', 'f', 'g', 'h'], + 'dt': ['p2', 'p1', 'p2', 'p2'], + 'long-dt': ['2024-10-10', '2025-01-23', 'abcdefghijklmnopk', '2025-08-08'], + } + pa_table = pa.Table.from_pydict(data2, schema=pa_schema) + table_write.write_arrow(pa_table) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() \ No newline at end of file diff --git 
a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py index f4c1eb7cedee..cb70437aba75 100644 --- a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py +++ b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py @@ -101,15 +101,13 @@ def test_try_to_write_atomic_directory_check(self): target_dir = os.path.join(self.temp_dir, "target_dir") os.makedirs(target_dir) -<<<<<<< HEAD - + result = file_io.try_to_write_atomic(f"file://{target_dir}", "test content") self.assertFalse(result, "try_to_write_atomic should return False when target is a directory") self.assertTrue(os.path.isdir(target_dir)) self.assertEqual(len(os.listdir(target_dir)), 0, "No file should be created inside the directory") -======= result = file_io.try_to_write_atomic(f"file://{target_dir}", "test content") self.assertFalse(result, "try_to_write_atomic should return False when target is a directory") @@ -117,7 +115,6 @@ def test_try_to_write_atomic_directory_check(self): self.assertTrue(os.path.isdir(target_dir)) self.assertEqual(len(os.listdir(target_dir)), 0, "No file should be created inside the directory") ->>>>>>> 5b8c6e7f3 (refresh rest token) normal_file = os.path.join(self.temp_dir, "normal_file.txt") result = file_io.try_to_write_atomic(f"file://{normal_file}", "test content") self.assertTrue(result, "try_to_write_atomic should succeed for a normal file path") @@ -248,15 +245,7 @@ def test_catalog_options_not_modified(self): OssOptions.OSS_ACCESS_KEY_SECRET.key(): "token-secret-key", OssOptions.OSS_ENDPOINT.key(): "token-endpoint" } -<<<<<<< HEAD - - merged_token = file_io._merge_token_with_catalog_options(token_dict) - -======= - merged_token = file_io._merge_token_with_catalog_options(token_dict) - ->>>>>>> 5b8c6e7f3 (refresh rest token) self.assertEqual( original_catalog_options.to_map(), catalog_options_copy.to_map(), @@ -280,19 +269,9 @@ def test_filesystem_property(self): self.warehouse_path, self.catalog_options ) -<<<<<<< HEAD - - self.assertTrue(hasattr(file_io, 'filesystem'), "RESTTokenFileIO should have filesystem property") - filesystem = file_io.filesystem - self.assertIsNotNone(filesystem, "filesystem should not be None") - -======= - self.assertTrue(hasattr(file_io, 'filesystem'), "RESTTokenFileIO should have filesystem property") filesystem = file_io.filesystem self.assertIsNotNone(filesystem, "filesystem should not be None") - ->>>>>>> 5b8c6e7f3 (refresh rest token) self.assertTrue(hasattr(filesystem, 'open_input_file'), "filesystem should support open_input_file method") @@ -303,20 +282,10 @@ def test_uri_reader_factory_property(self): self.warehouse_path, self.catalog_options ) -<<<<<<< HEAD - -======= - ->>>>>>> 5b8c6e7f3 (refresh rest token) self.assertTrue(hasattr(file_io, 'uri_reader_factory'), "RESTTokenFileIO should have uri_reader_factory property") uri_reader_factory = file_io.uri_reader_factory self.assertIsNotNone(uri_reader_factory, "uri_reader_factory should not be None") -<<<<<<< HEAD - -======= - ->>>>>>> 5b8c6e7f3 (refresh rest token) self.assertTrue(hasattr(uri_reader_factory, 'create'), "uri_reader_factory should support create method") @@ -327,17 +296,8 @@ def test_filesystem_and_uri_reader_factory_after_serialization(self): self.warehouse_path, self.catalog_options ) -<<<<<<< HEAD - - pickled = pickle.dumps(original_file_io) - restored_file_io = pickle.loads(pickled) - -======= - pickled = pickle.dumps(original_file_io) restored_file_io = pickle.loads(pickled) - ->>>>>>> 5b8c6e7f3 (refresh 
rest token) self.assertIsNotNone(restored_file_io.filesystem, "filesystem should work after deserialization") self.assertIsNotNone(restored_file_io.uri_reader_factory, From 20bc04f302d628b132a017900219be4c5cb76b61 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 17 Jan 2026 10:13:26 +0800 Subject: [PATCH 03/10] fix --- .../catalog/rest/rest_token_file_io.py | 25 ++++++++----------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py index a76db7490c37..ebfaff53d8ad 100644 --- a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py +++ b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py @@ -18,11 +18,9 @@ import logging import threading import time -from pathlib import Path -from typing import Optional +from typing import Optional, Union from cachetools import TTLCache -from pyarrow._fs import FileSystem from api.rest_util import RESTUtil from pypaimon.api.rest_api import RESTApi @@ -44,11 +42,18 @@ class RESTTokenFileIO(FileIO): _FILE_IO_CACHE_TTL = 36000 # 10 hours in seconds def __init__(self, identifier: Identifier, path: str, - catalog_options: Optional[dict] = None): + catalog_options: Optional[Union[dict, Options]] = None): self.identifier = identifier self.path = path - self.catalog_options = catalog_options - self.properties = catalog_options or Options({}) # For compatibility with refresh_token() + # Convert dict to Options if needed for consistent API usage + if catalog_options is None: + self.catalog_options = None + elif isinstance(catalog_options, dict): + self.catalog_options = Options(catalog_options) + else: + # Assume it's already an Options object + self.catalog_options = catalog_options + self.properties = self.catalog_options or Options({}) # For compatibility with refresh_token() self.token: Optional[RESTToken] = None self.api_instance: Optional[RESTApi] = None self.lock = threading.Lock() @@ -181,14 +186,6 @@ def uri_reader_factory(self): def filesystem(self): return self.file_io().filesystem - def _initialize_oss_fs(self, path) -> FileSystem: - self.try_to_refresh_token() - self.properties.update(self.token.token) - return super()._initialize_oss_fs(path) - - def new_output_stream(self, path: Path): - return self.filesystem.open_output_stream(str(path)) - def try_to_refresh_token(self): if self.should_refresh(): with self.lock: From e56fed6e5d37c5ad66f666addca3765341d2911e Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 17 Jan 2026 10:40:58 +0800 Subject: [PATCH 04/10] fix code diff --- paimon-python/pypaimon/api/rest_util.py | 8 +++--- .../catalog/rest/rest_token_file_io.py | 2 +- .../pypaimon/tests/reader_base_test.py | 15 ++++++----- .../pypaimon/tests/rest/rest_base_test.py | 5 ++-- .../tests/rest/rest_token_file_io_test.py | 26 ++++++++++--------- .../pypaimon/write/file_store_commit.py | 2 +- 6 files changed, 31 insertions(+), 27 deletions(-) diff --git a/paimon-python/pypaimon/api/rest_util.py b/paimon-python/pypaimon/api/rest_util.py index 33ed29dce710..fd4d1da040a2 100644 --- a/paimon-python/pypaimon/api/rest_util.py +++ b/paimon-python/pypaimon/api/rest_util.py @@ -52,15 +52,15 @@ def merge( override_properties = {} if base_properties is None: base_properties = {} - + result = {} - + for key, value in base_properties.items(): if value is not None and key not in override_properties: result[key] = value - + for key, value in override_properties.items(): if value is not None: result[key] = value - + return result 
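
The merge contract cleaned up just above is what the rest of the series relies on: properties passed as override_properties (the data token) win over base_properties (the catalog options), and entries whose value is None are dropped from both sides. As a small illustrative call — the option keys and values below are made-up placeholders, only the RESTUtil.merge signature comes from this repository:

    from pypaimon.api.rest_util import RESTUtil

    catalog_options = {"fs.oss.endpoint": "oss-cn-hangzhou.aliyuncs.com",
                       "fs.oss.accessKeyId": "stale-ak"}
    refreshed_token = {"fs.oss.accessKeyId": "fresh-ak",
                       "fs.oss.securityToken": None}

    merged = RESTUtil.merge(catalog_options, refreshed_token)
    # -> {'fs.oss.endpoint': 'oss-cn-hangzhou.aliyuncs.com', 'fs.oss.accessKeyId': 'fresh-ak'}
    # the refreshed credential overrides the stale one, and the None entry is dropped
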
diff --git a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py index ebfaff53d8ad..55aad02a88e8 100644 --- a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py +++ b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py @@ -22,8 +22,8 @@ from cachetools import TTLCache -from api.rest_util import RESTUtil from pypaimon.api.rest_api import RESTApi +from pypaimon.api.rest_util import RESTUtil from pypaimon.catalog.rest.rest_token import RESTToken from pypaimon.common.file_io import FileIO from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index beedfb5feac2..69b6de574a4f 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -1111,7 +1111,7 @@ def test_primary_key_value_stats(self): value_stats = file_meta.value_stats self.assertIsNotNone(value_stats, "value_stats should not be None") - + if file_meta.value_stats_cols is None: expected_value_fields = ['name', 'price', 'category'] self.assertGreaterEqual(value_stats.min_values.arity, len(expected_value_fields), @@ -1119,7 +1119,7 @@ def test_primary_key_value_stats(self): else: self.assertNotIn('id', file_meta.value_stats_cols, "Key field 'id' should NOT be in value_stats_cols") - + expected_value_fields = ['name', 'price', 'category'] self.assertTrue(set(expected_value_fields).issubset(set(file_meta.value_stats_cols)), f"value_stats_cols should contain value fields: {expected_value_fields}, " @@ -1135,7 +1135,7 @@ def test_primary_key_value_stats(self): self.assertEqual(len(value_stats.null_counts), expected_arity, f"value_stats null_counts should have {expected_arity} elements, " f"but got {len(value_stats.null_counts)}") - + self.assertEqual(value_stats.min_values.arity, len(file_meta.value_stats_cols), f"value_stats.min_values.arity ({value_stats.min_values.arity}) must match " f"value_stats_cols length ({len(file_meta.value_stats_cols)})") @@ -1145,7 +1145,7 @@ def test_primary_key_value_stats(self): field_name in ['_SEQUENCE_NUMBER', '_VALUE_KIND', '_ROW_ID']) self.assertFalse(is_system_field, f"value_stats_cols should not contain system field: {field_name}") - + value_stats_fields = table_scan.starting_scanner.manifest_file_manager._get_value_stats_fields( {'_VALUE_STATS_COLS': file_meta.value_stats_cols}, table.fields @@ -1161,7 +1161,7 @@ def test_primary_key_value_stats(self): self.assertEqual(len(min_value_stats), 3, "min_value_stats should have 3 values") self.assertEqual(len(max_value_stats), 3, "max_value_stats should have 3 values") - + actual_data = read_builder.new_read().to_arrow(table_scan.plan().splits()) self.assertEqual(actual_data.num_rows, 5, "Should have 5 rows") actual_ids = sorted(actual_data.column('id').to_pylist()) @@ -1365,13 +1365,15 @@ def test_read_batch_size_config(self): read_builder = table.new_read_builder() table_read = read_builder.new_read() splits = read_builder.new_scan().plan().splits() + if splits: + # Use _create_split_read to create reader split_read = table_read._create_split_read(splits[0]) reader = split_read.create_reader() batch_count = 0 total_rows = 0 max_batch_size = 0 - + try: while True: batch = reader.read_arrow_batch() @@ -1383,6 +1385,7 @@ def test_read_batch_size_config(self): max_batch_size = max(max_batch_size, batch_rows) finally: reader.close() + self.assertGreater(batch_count, 1, f"With batch_size=10, should get 
multiple batches, got {batch_count}") self.assertEqual(total_rows, 50, "Should read all 50 rows") diff --git a/paimon-python/pypaimon/tests/rest/rest_base_test.py b/paimon-python/pypaimon/tests/rest/rest_base_test.py index 9a4727ad7241..2fc14eb1118d 100644 --- a/paimon-python/pypaimon/tests/rest/rest_base_test.py +++ b/paimon-python/pypaimon/tests/rest/rest_base_test.py @@ -28,7 +28,7 @@ from pypaimon import CatalogFactory, Schema from pypaimon.api.api_response import ConfigResponse from pypaimon.api.auth import BearTokenAuthProvider -from pypaimon.api.options import Options +from pypaimon.common.options import Options from pypaimon.catalog.catalog_context import CatalogContext from pypaimon.catalog.rest.rest_catalog import RESTCatalog from pypaimon.catalog.rest.table_metadata import TableMetadata @@ -228,7 +228,6 @@ def _read_test_table(self, read_builder): splits = read_builder.new_scan().plan().splits() return table_read.to_arrow(splits) - def _write_test_table_with_schema(self, table, pa_schema): """Write test data using the specified PyArrow schema.""" write_builder = table.new_batch_write_builder() @@ -263,4 +262,4 @@ def _write_test_table_with_schema(self, table, pa_schema): table_write.write_arrow(pa_table) table_commit.commit(table_write.prepare_commit()) table_write.close() - table_commit.close() \ No newline at end of file + table_commit.close() diff --git a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py index cb70437aba75..5be84051a2f5 100644 --- a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py +++ b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py @@ -101,20 +101,13 @@ def test_try_to_write_atomic_directory_check(self): target_dir = os.path.join(self.temp_dir, "target_dir") os.makedirs(target_dir) - + result = file_io.try_to_write_atomic(f"file://{target_dir}", "test content") self.assertFalse(result, "try_to_write_atomic should return False when target is a directory") self.assertTrue(os.path.isdir(target_dir)) self.assertEqual(len(os.listdir(target_dir)), 0, "No file should be created inside the directory") - - result = file_io.try_to_write_atomic(f"file://{target_dir}", "test content") - self.assertFalse(result, "try_to_write_atomic should return False when target is a directory") - - self.assertTrue(os.path.isdir(target_dir)) - self.assertEqual(len(os.listdir(target_dir)), 0, "No file should be created inside the directory") - normal_file = os.path.join(self.temp_dir, "normal_file.txt") result = file_io.try_to_write_atomic(f"file://{normal_file}", "test content") self.assertTrue(result, "try_to_write_atomic should succeed for a normal file path") @@ -230,32 +223,35 @@ def test_catalog_options_not_modified(self): CatalogOptions.URI.key(): "http://test-uri", "custom.key": "custom.value" }) - + catalog_options_copy = Options(original_catalog_options.to_map()) - + with patch.object(RESTTokenFileIO, 'try_to_refresh_token'): file_io = RESTTokenFileIO( self.identifier, self.warehouse_path, original_catalog_options ) - + token_dict = { OssOptions.OSS_ACCESS_KEY_ID.key(): "token-access-key", OssOptions.OSS_ACCESS_KEY_SECRET.key(): "token-secret-key", OssOptions.OSS_ENDPOINT.key(): "token-endpoint" } + merged_token = file_io._merge_token_with_catalog_options(token_dict) + self.assertEqual( original_catalog_options.to_map(), catalog_options_copy.to_map(), "Original catalog_options should not be modified" ) - + merged_properties = RESTUtil.merge( original_catalog_options.to_map(), 
merged_token ) + self.assertIn("custom.key", merged_properties) self.assertEqual(merged_properties["custom.key"], "custom.value") @@ -269,9 +265,11 @@ def test_filesystem_property(self): self.warehouse_path, self.catalog_options ) + self.assertTrue(hasattr(file_io, 'filesystem'), "RESTTokenFileIO should have filesystem property") filesystem = file_io.filesystem self.assertIsNotNone(filesystem, "filesystem should not be None") + self.assertTrue(hasattr(filesystem, 'open_input_file'), "filesystem should support open_input_file method") @@ -282,10 +280,12 @@ def test_uri_reader_factory_property(self): self.warehouse_path, self.catalog_options ) + self.assertTrue(hasattr(file_io, 'uri_reader_factory'), "RESTTokenFileIO should have uri_reader_factory property") uri_reader_factory = file_io.uri_reader_factory self.assertIsNotNone(uri_reader_factory, "uri_reader_factory should not be None") + self.assertTrue(hasattr(uri_reader_factory, 'create'), "uri_reader_factory should support create method") @@ -296,8 +296,10 @@ def test_filesystem_and_uri_reader_factory_after_serialization(self): self.warehouse_path, self.catalog_options ) + pickled = pickle.dumps(original_file_io) restored_file_io = pickle.loads(pickled) + self.assertIsNotNone(restored_file_io.filesystem, "filesystem should work after deserialization") self.assertIsNotNone(restored_file_io.uri_reader_factory, diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 1a546c492bca..da27c499e827 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -188,7 +188,7 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str latest_snapshot: Optional[Snapshot]) -> CommitResult: if self._is_duplicate_commit(retry_result, latest_snapshot, commit_identifier, commit_kind): return SuccessResult() - + unique_id = uuid.uuid4() base_manifest_list = f"manifest-list-{unique_id}-0" delta_manifest_list = f"manifest-list-{unique_id}-1" From 12d2e12b4ff2e4bb7fffd0fc44e6f64443878ff0 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 17 Jan 2026 10:51:34 +0800 Subject: [PATCH 05/10] fix code diff --- paimon-python/pypaimon/tests/reader_base_test.py | 4 ++-- paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index 69b6de574a4f..3d9ed7f8743b 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -1124,7 +1124,7 @@ def test_primary_key_value_stats(self): self.assertTrue(set(expected_value_fields).issubset(set(file_meta.value_stats_cols)), f"value_stats_cols should contain value fields: {expected_value_fields}, " f"but got: {file_meta.value_stats_cols}") - + expected_arity = len(file_meta.value_stats_cols) self.assertEqual(value_stats.min_values.arity, expected_arity, f"value_stats should contain {expected_arity} fields (matching value_stats_cols), " @@ -1139,7 +1139,7 @@ def test_primary_key_value_stats(self): self.assertEqual(value_stats.min_values.arity, len(file_meta.value_stats_cols), f"value_stats.min_values.arity ({value_stats.min_values.arity}) must match " f"value_stats_cols length ({len(file_meta.value_stats_cols)})") - + for field_name in file_meta.value_stats_cols: is_system_field = (field_name.startswith('_KEY_') or field_name in ['_SEQUENCE_NUMBER', 
'_VALUE_KIND', '_ROW_ID']) diff --git a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py index 5be84051a2f5..e9ff804f53ab 100644 --- a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py +++ b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py @@ -251,7 +251,6 @@ def test_catalog_options_not_modified(self): original_catalog_options.to_map(), merged_token ) - self.assertIn("custom.key", merged_properties) self.assertEqual(merged_properties["custom.key"], "custom.value") From 834abf695f7eba1d49b262d572d6f57a882a2e62 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 17 Jan 2026 10:55:38 +0800 Subject: [PATCH 06/10] fix code diff --- paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py index e9ff804f53ab..47ea8e6cb626 100644 --- a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py +++ b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py @@ -251,7 +251,7 @@ def test_catalog_options_not_modified(self): original_catalog_options.to_map(), merged_token ) - + self.assertIn("custom.key", merged_properties) self.assertEqual(merged_properties["custom.key"], "custom.value") self.assertIn(OssOptions.OSS_ACCESS_KEY_ID.key(), merged_properties) From e6484cd56bb0a094289ec8a98e262aa04547562c Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 17 Jan 2026 19:18:50 +0800 Subject: [PATCH 07/10] add token refresh --- paimon-python/pypaimon/api/rest_api.py | 2 +- .../catalog/rest/rest_token_file_io.py | 106 +++++++++++++----- .../pypaimon/read/reader/lance_utils.py | 9 +- 3 files changed, 87 insertions(+), 30 deletions(-) diff --git a/paimon-python/pypaimon/api/rest_api.py b/paimon-python/pypaimon/api/rest_api.py index 6b871ca58e87..25c2f47199c8 100755 --- a/paimon-python/pypaimon/api/rest_api.py +++ b/paimon-python/pypaimon/api/rest_api.py @@ -46,7 +46,7 @@ class RESTApi: PAGE_TOKEN = "pageToken" DATABASE_NAME_PATTERN = "databaseNamePattern" TABLE_NAME_PATTERN = "tableNamePattern" - TOKEN_EXPIRATION_SAFE_TIME_MILLIS = 3_600_000 + TOKEN_EXPIRATION_SAFE_TIME_MILLIS = 60 def __init__(self, options: Union[Options, Dict[str, str]], config_required: bool = True): if isinstance(options, dict): diff --git a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py index 55aad02a88e8..ae8dc3e00f1c 100644 --- a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py +++ b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py @@ -41,11 +41,28 @@ class RESTTokenFileIO(FileIO): _FILE_IO_CACHE_MAXSIZE = 1000 _FILE_IO_CACHE_TTL = 36000 # 10 hours in seconds + _FILE_IO_CACHE: TTLCache = None + _FILE_IO_CACHE_LOCK = threading.Lock() + + _TOKEN_CACHE: dict = {} + _TOKEN_LOCKS: dict = {} + _TOKEN_LOCKS_LOCK = threading.Lock() + + @classmethod + def _get_file_io_cache(cls) -> TTLCache: + if cls._FILE_IO_CACHE is None: + with cls._FILE_IO_CACHE_LOCK: + if cls._FILE_IO_CACHE is None: + cls._FILE_IO_CACHE = TTLCache( + maxsize=cls._FILE_IO_CACHE_MAXSIZE, + ttl=cls._FILE_IO_CACHE_TTL + ) + return cls._FILE_IO_CACHE + def __init__(self, identifier: Identifier, path: str, catalog_options: Optional[Union[dict, Options]] = None): self.identifier = identifier self.path = path - # Convert dict to Options if needed for consistent API usage if catalog_options is None: 
self.catalog_options = None elif isinstance(catalog_options, dict): @@ -59,29 +76,20 @@ def __init__(self, identifier: Identifier, path: str, self.lock = threading.Lock() self.log = logging.getLogger(__name__) self._uri_reader_factory_cache: Optional[UriReaderFactory] = None - self._file_io_cache: TTLCache = TTLCache( - maxsize=self._FILE_IO_CACHE_MAXSIZE, - ttl=self._FILE_IO_CACHE_TTL - ) def __getstate__(self): state = self.__dict__.copy() # Remove non-serializable objects state.pop('lock', None) state.pop('api_instance', None) - state.pop('_file_io_cache', None) state.pop('_uri_reader_factory_cache', None) # token can be serialized, but we'll refresh it on deserialization return state def __setstate__(self, state): self.__dict__.update(state) - # Recreate lock and cache after deserialization + # Recreate lock after deserialization self.lock = threading.Lock() - self._file_io_cache = TTLCache( - maxsize=self._FILE_IO_CACHE_MAXSIZE, - ttl=self._FILE_IO_CACHE_TTL - ) self._uri_reader_factory_cache = None # api_instance will be recreated when needed self.api_instance = None @@ -93,13 +101,21 @@ def file_io(self) -> FileIO: return FileIO.get(self.path, self.catalog_options or Options({})) cache_key = self.token + cache = self._get_file_io_cache() - file_io = self._file_io_cache.get(cache_key) + file_io = cache.get(cache_key) if file_io is not None: return file_io - with self.lock: - file_io = self._file_io_cache.get(cache_key) + with self._FILE_IO_CACHE_LOCK: + self.try_to_refresh_token() + + if self.token is None: + return FileIO.get(self.path, self.catalog_options or Options({})) + + cache_key = self.token + cache = self._get_file_io_cache() + file_io = cache.get(cache_key) if file_io is not None: return file_io @@ -111,7 +127,7 @@ def file_io(self) -> FileIO: merged_options = Options(merged_properties) file_io = PyArrowFileIO(self.path, merged_options) - self._file_io_cache[cache_key] = file_io + cache[cache_key] = file_io return file_io def _merge_token_with_catalog_options(self, token: dict) -> dict: @@ -187,16 +203,55 @@ def filesystem(self): return self.file_io().filesystem def try_to_refresh_token(self): - if self.should_refresh(): - with self.lock: - if self.should_refresh(): - self.refresh_token() + identifier_str = str(self.identifier) + + if self.token is not None and not self._is_token_expired(self.token): + return + + cached_token = self._get_cached_token(identifier_str) + if cached_token and not self._is_token_expired(cached_token): + self.token = cached_token + return + + global_lock = self._get_global_token_lock(identifier_str) + + with global_lock: + cached_token = self._get_cached_token(identifier_str) + if cached_token and not self._is_token_expired(cached_token): + self.token = cached_token + return + + token_to_check = cached_token if cached_token else self.token + if token_to_check is None or self._is_token_expired(token_to_check): + self.refresh_token() + self._set_cached_token(identifier_str, self.token) + + def _get_cached_token(self, identifier_str: str) -> Optional[RESTToken]: + with self._TOKEN_LOCKS_LOCK: + return self._TOKEN_CACHE.get(identifier_str) + + def _set_cached_token(self, identifier_str: str, token: RESTToken): + with self._TOKEN_LOCKS_LOCK: + self._TOKEN_CACHE[identifier_str] = token + + def _is_token_expired(self, token: Optional[RESTToken]) -> bool: + if token is None: + return True + current_time = int(time.time() * 1000) + return (token.expire_at_millis - current_time) < RESTApi.TOKEN_EXPIRATION_SAFE_TIME_MILLIS + + def 
_get_global_token_lock(self, identifier_str: str) -> threading.Lock: + with self._TOKEN_LOCKS_LOCK: + if identifier_str not in self._TOKEN_LOCKS: + self._TOKEN_LOCKS[identifier_str] = threading.Lock() + return self._TOKEN_LOCKS[identifier_str] def should_refresh(self): if self.token is None: return True current_time = int(time.time() * 1000) - return (self.token.expire_at_millis - current_time) < RESTApi.TOKEN_EXPIRATION_SAFE_TIME_MILLIS + time_until_expiry = self.token.expire_at_millis - current_time + return time_until_expiry < RESTApi.TOKEN_EXPIRATION_SAFE_TIME_MILLIS def refresh_token(self): self.log.info(f"begin refresh data token for identifier [{self.identifier}]") @@ -207,17 +262,12 @@ def refresh_token(self): self.log.info( f"end refresh data token for identifier [{self.identifier}] expiresAtMillis [{response.expires_at_millis}]" ) - self.token = RESTToken(response.token, response.expires_at_millis) + new_token = RESTToken(response.token, response.expires_at_millis) + self.token = new_token def valid_token(self): self.try_to_refresh_token() return self.token def close(self): - with self.lock: - for file_io in self._file_io_cache.values(): - try: - file_io.close() - except Exception as e: - self.log.warning(f"Error closing cached FileIO: {e}") - self._file_io_cache.clear() + pass diff --git a/paimon-python/pypaimon/read/reader/lance_utils.py b/paimon-python/pypaimon/read/reader/lance_utils.py index e973cf5ce6e4..2e3a331e4b4a 100644 --- a/paimon-python/pypaimon/read/reader/lance_utils.py +++ b/paimon-python/pypaimon/read/reader/lance_utils.py @@ -26,17 +26,24 @@ def to_lance_specified(file_io: FileIO, file_path: str) -> Tuple[str, Optional[Dict[str, str]]]: """Convert path and extract storage options for Lance format.""" + # For RESTTokenFileIO, get underlying FileIO which already has latest token merged + # This follows Java implementation: ((RESTTokenFileIO) fileIO).fileIO() + # The file_io() method will refresh token and return a FileIO with merged token if hasattr(file_io, 'file_io'): + # Call file_io() to get underlying FileIO with latest token + # This ensures token is refreshed and merged with catalog options file_io = file_io.file_io() + # Now get properties from the underlying FileIO (which has latest token) if hasattr(file_io, 'get_merged_properties'): properties = file_io.get_merged_properties() else: properties = file_io.properties if hasattr(file_io, 'properties') and file_io.properties else None scheme, _, _ = file_io.parse_location(file_path) - storage_options = None file_path_for_lance = file_io.to_filesystem_path(file_path) + + storage_options = None if scheme in {'file', None} or not scheme: if not os.path.isabs(file_path_for_lance): From 91f78de22fc0f66ea2ff82db5168080a584ba6ad Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 17 Jan 2026 19:22:48 +0800 Subject: [PATCH 08/10] add _merge_token_with_catalog_options --- paimon-python/pypaimon/catalog/rest/rest_token_file_io.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py index ae8dc3e00f1c..34721216cadb 100644 --- a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py +++ b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py @@ -119,10 +119,9 @@ def file_io(self) -> FileIO: if file_io is not None: return file_io - merged_token = self._merge_token_with_catalog_options(self.token.token) merged_properties = RESTUtil.merge( self.catalog_options.to_map() if 
self.catalog_options else {}, - merged_token + self.token.token ) merged_options = Options(merged_properties) @@ -262,7 +261,9 @@ def refresh_token(self): self.log.info( f"end refresh data token for identifier [{self.identifier}] expiresAtMillis [{response.expires_at_millis}]" ) - new_token = RESTToken(response.token, response.expires_at_millis) + + merged_token_dict = self._merge_token_with_catalog_options(response.token) + new_token = RESTToken(merged_token_dict, response.expires_at_millis) self.token = new_token def valid_token(self): From 6346b471e2b8881d6d0c46ed5005d7c402c52d85 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 17 Jan 2026 19:39:55 +0800 Subject: [PATCH 09/10] merge oss endpoint too --- paimon-python/pypaimon/catalog/rest/rest_token_file_io.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py index 34721216cadb..7769ba639b28 100644 --- a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py +++ b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py @@ -123,6 +123,10 @@ def file_io(self) -> FileIO: self.catalog_options.to_map() if self.catalog_options else {}, self.token.token ) + if self.catalog_options: + dlf_oss_endpoint = self.catalog_options.get(CatalogOptions.DLF_OSS_ENDPOINT) + if dlf_oss_endpoint and dlf_oss_endpoint.strip(): + merged_properties[OssOptions.OSS_ENDPOINT.key()] = dlf_oss_endpoint merged_options = Options(merged_properties) file_io = PyArrowFileIO(self.path, merged_options) From 12c6ab2bfecbe2fbddcf16fc179183e67776f8ef Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 17 Jan 2026 20:24:08 +0800 Subject: [PATCH 10/10] revert TOKEN_EXPIRATION_SAFE_TIME_MILLIS change for test --- paimon-python/pypaimon/api/rest_api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/api/rest_api.py b/paimon-python/pypaimon/api/rest_api.py index 25c2f47199c8..6b871ca58e87 100755 --- a/paimon-python/pypaimon/api/rest_api.py +++ b/paimon-python/pypaimon/api/rest_api.py @@ -46,7 +46,7 @@ class RESTApi: PAGE_TOKEN = "pageToken" DATABASE_NAME_PATTERN = "databaseNamePattern" TABLE_NAME_PATTERN = "tableNamePattern" - TOKEN_EXPIRATION_SAFE_TIME_MILLIS = 60 + TOKEN_EXPIRATION_SAFE_TIME_MILLIS = 3_600_000 def __init__(self, options: Union[Options, Dict[str, str]], config_required: bool = True): if isinstance(options, dict):
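
Taken together, the series leaves RESTTokenFileIO with the following behaviour: the data token is fetched lazily, refreshed under a lock once it is within TOKEN_EXPIRATION_SAFE_TIME_MILLIS of expiry, merged over the catalog options (with the DLF OSS endpoint, when configured, taking precedence), and used as the key of a process-wide TTL cache of PyArrow FileIO instances so that a new token always yields a new filesystem. Below is a minimal, self-contained sketch of that flow, not the pypaimon code itself: names such as DemoToken, DemoTokenFileIO and fetch_token are placeholders, the cache is a plain dict instead of cachetools.TTLCache, and the real implementation is the RESTTokenFileIO shown in the patches above.

    import threading
    import time
    from typing import Callable, Dict, Optional, Tuple

    TOKEN_EXPIRATION_SAFE_TIME_MILLIS = 3_600_000  # same safety margin as RESTApi


    class DemoToken:
        """Hashable token so it can serve as a cache key, like RESTToken."""

        def __init__(self, token: Dict[str, str], expire_at_millis: int):
            self.token = dict(token)
            self.expire_at_millis = expire_at_millis
            self._key = (tuple(sorted(self.token.items())), expire_at_millis)

        def __hash__(self):
            return hash(self._key)

        def __eq__(self, other):
            return isinstance(other, DemoToken) and self._key == other._key


    class DemoTokenFileIO:
        def __init__(self, catalog_options: Dict[str, str],
                     fetch_token: Callable[[], Tuple[Dict[str, str], int]]):
            self.catalog_options = catalog_options
            self.fetch_token = fetch_token      # stands in for the REST token endpoint call
            self.token: Optional[DemoToken] = None
            self.lock = threading.Lock()        # per-identifier lock in the real code
            self.file_io_cache: Dict[DemoToken, Dict[str, str]] = {}  # TTLCache in the real code

        def should_refresh(self) -> bool:
            if self.token is None:
                return True
            remaining = self.token.expire_at_millis - int(time.time() * 1000)
            return remaining < TOKEN_EXPIRATION_SAFE_TIME_MILLIS

        def try_to_refresh_token(self) -> None:
            # Double-checked locking: only one caller talks to the REST endpoint.
            if self.should_refresh():
                with self.lock:
                    if self.should_refresh():
                        token, expire_at = self.fetch_token()
                        self.token = DemoToken(token, expire_at)

        def file_io_properties(self) -> Dict[str, str]:
            # The cache is keyed by the token itself, so a refreshed token can never
            # reuse a filesystem that was built with expired credentials.
            self.try_to_refresh_token()
            properties = self.file_io_cache.get(self.token)
            if properties is None:
                properties = dict(self.catalog_options)
                properties.update(self.token.token)  # real code uses RESTUtil.merge here
                self.file_io_cache[self.token] = properties
            return properties


    if __name__ == "__main__":
        def fake_fetch():
            now = int(time.time() * 1000)
            return ({"fs.oss.accessKeyId": "ak", "fs.oss.accessKeySecret": "sk"},
                    now + 2 * TOKEN_EXPIRATION_SAFE_TIME_MILLIS)

        io = DemoTokenFileIO({"fs.oss.endpoint": "oss-cn-hangzhou.aliyuncs.com"}, fake_fetch)
        print(io.file_io_properties())

Keying the FileIO cache by the token rather than by path is what makes the refresh safe: once a new token is issued, every subsequent read builds or reuses a filesystem with fresh credentials, while the entry built from the old token simply ages out of the TTL cache.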