From 3bca00a4adcfec01b66fc619c6ed780c483888ea Mon Sep 17 00:00:00 2001 From: xylaaaaa <2392805527@qq.com> Date: Tue, 30 Dec 2025 18:30:27 +0800 Subject: [PATCH 1/7] feat: add Iceberg JDBC catalog support --- docs/iceberg_jdbc_catalog_impl.md | 304 ++++++++++++++++++ .../iceberg/IcebergExternalCatalog.java | 1 + .../IcebergExternalCatalogFactory.java | 2 + .../iceberg/IcebergJdbcExternalCatalog.java | 31 ++ .../iceberg/source/IcebergScanNode.java | 1 + .../IcebergJdbcMetaStoreProperties.java | 189 +++++++++++ .../metastore/IcebergPropertiesFactory.java | 1 + .../apache/doris/persist/gson/GsonUtils.java | 2 + .../IcebergJdbcMetaStorePropertiesTest.java | 83 +++++ 9 files changed, 614 insertions(+) create mode 100644 docs/iceberg_jdbc_catalog_impl.md create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergJdbcExternalCatalog.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStorePropertiesTest.java diff --git a/docs/iceberg_jdbc_catalog_impl.md b/docs/iceberg_jdbc_catalog_impl.md new file mode 100644 index 00000000000000..af20e09271ae46 --- /dev/null +++ b/docs/iceberg_jdbc_catalog_impl.md @@ -0,0 +1,304 @@ +# Doris 支持 Iceberg JDBC Catalog(实现指南) + +## 目标 +在 Doris 中新增对 Iceberg JDBC Catalog 的支持(Iceberg 元数据存储在 JDBC 数据库中)。 +集成 Iceberg `org.apache.iceberg.jdbc.JdbcCatalog` 到现有 Iceberg 外部 Catalog 框架。 + +## 范围 +- 新增 Iceberg catalog 类型:`iceberg.catalog.type = jdbc`。 +- FE 使用 Iceberg `JdbcCatalog` 做元数据操作。 +- Iceberg 扫描链路保持不变(`IcebergApiSource` + BE 文件扫描)。 + +## 非目标 +- Doris JDBC 外表(已有)。 +- BE 侧 Iceberg 扫描逻辑调整。 +- JDBC 驱动包分发或部署流程变更。 + +## 总体流程 +1. `CREATE CATALOG ... type=iceberg` 由 `IcebergExternalCatalogFactory` 创建。 +2. `CatalogProperty` 根据 `IcebergPropertiesFactory` 构建 `MetastoreProperties`。 +3. 新增 `IcebergJdbcMetaStoreProperties`,内部初始化 Iceberg `JdbcCatalog`。 +4. `IcebergScanNode` 继续使用 `IcebergApiSource` 读取 Iceberg 数据文件。 + +## 参数总览 + +### 必需参数 + +| 参数名 | 曾用名/别名 | 说明 | 默认值 | 必填 | +|--------|-------------|------|--------|------| +| type | - | 固定为 `iceberg` | - | 是 | +| iceberg.catalog.type | - | 固定为 `jdbc` | - | 是 | +| uri | iceberg.jdbc.uri | JDBC 连接串,如 `jdbc:postgresql://host:5432/iceberg` | - | 是 | +| warehouse | - | Iceberg warehouse 路径 | - | 是 | +| driver_url | - | JDBC 驱动 JAR 路径,如 `postgresql-42.7.3.jar` | - | 是 | +| driver_class | - | JDBC 驱动类名,如 `org.postgresql.Driver` | - | 是 | + +### JDBC 认证参数 + +| 参数名 | 说明 | 默认值 | 必填 | +|--------|------|--------|------| +| jdbc.user | JDBC 用户名 | - | 否 | +| jdbc.password | JDBC 密码(敏感参数,会被脱敏显示) | - | 否 | + +### JDBC Catalog 可选参数 + +| 参数名 | 说明 | 默认值 | 必填 | +|--------|------|--------|------| +| jdbc.init-catalog-tables | 是否自动创建 Catalog 元数据表 | false | 否 | +| jdbc.schema-version | 元数据表 Schema 版本(V0/V1) | V1 | 否 | +| jdbc.strict-mode | 是否启用严格模式 | false | 否 | + +### JDBC 连接参数 +以下参数直接透传给 JDBC 驱动: + +| 参数名 | 说明 | 默认值 | +|--------|------|--------| +| jdbc.useSSL | 是否使用 SSL 连接 | false | +| jdbc.verifyServerCertificate | 是否验证服务器证书 | false | +| jdbc.sslMode | SSL 模式(MySQL 8.x 推荐使用) | - | + +### 存储属性(StorageProperties) + +JDBC Catalog 同样需要配置存储属性以访问 Iceberg 数据文件。支持的存储系统包括: + +- HDFS +- AWS S3 +- 阿里云 OSS +- 腾讯云 COS +- 华为云 OBS +- MinIO + +存储属性配置方式与其他 Iceberg Catalog 类型一致,请参阅 Doris 文档【支持的存储系统】部分。 + +示例(S3 存储): +```sql +CREATE CATALOG iceberg_jdbc PROPERTIES ( + -- JDBC Catalog 参数 + "type" = "iceberg", + "iceberg.catalog.type" = "jdbc", + "uri" = "jdbc:postgresql://host:5432/iceberg", + "warehouse" = "s3://bucket/warehouse", + "driver_url" = "postgresql-42.7.3.jar", + "driver_class" = "org.postgresql.Driver", + "jdbc.user" = "iceberg", + "jdbc.password" = "secret", + -- 存储属性 + "s3.endpoint" = "http://minio:9000", + "s3.access_key" = "minioadmin", + "s3.secret_key" = "minioadmin", + "s3.region" = "us-east-1" +); +``` + +### 说明 +- 保留 `jdbc.password` 可复用现有敏感信息屏蔽逻辑。 +- 如新增别名(例如 `iceberg.jdbc.password`),同步加入敏感 key 列表。 +- `jdbc.*` 需原样透传给 Iceberg JDBC Catalog(由 JDBC 驱动解析),不要过滤未知 `jdbc.` 参数。 +- 部分 JDBC 驱动已弃用 `verifyServerCertificate`(例如 MySQL 8.x 建议使用 `sslMode`), + 以驱动版本支持情况为准。 + +## 代码改动 + +### 1) 新增 catalog 类型常量 +文件:`fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java` +- 增加 `ICEBERG_JDBC = "jdbc"`。 + +### 2) 新增 ExternalCatalog 实现 +文件:`fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergJdbcExternalCatalog.java` +- 参考 `IcebergRestExternalCatalog`。 +- 构造函数中设置 `catalogProperty = new CatalogProperty(resource, props)`。 + +### 3) 新增 Metastore Properties +文件:`fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java` + +职责: +- 继承 `AbstractIcebergProperties`。 +- 使用 `@ConnectorProperty` 定义字段: + - `uri` + - `warehouse` + - `jdbc.user` + - `jdbc.password` + - `jdbc.init-catalog-tables` + - `jdbc.schema-version` + - `jdbc.strict-mode` +- 实现 `getIcebergCatalogType()` 返回 `ICEBERG_JDBC`。 +- 实现 `initCatalog(...)`: + - 通过 `CatalogUtil.buildIcebergCatalog` 创建实例。 + - 设置 `CatalogUtil.ICEBERG_CATALOG_TYPE_JDBC`。 + - 透传 `warehouse`、`uri` 及所有 `jdbc.*`。 + - 如需 FileIO 参数,复用 `IcebergRestProperties.toFileIOProperties(...)`。 + +### 4) 注册到 Properties Factory +文件:`fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergPropertiesFactory.java` +- 增加 `register("jdbc", IcebergJdbcMetaStoreProperties::new)`。 + +### 5) 注册到 Catalog Factory +文件:`fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java` +- 增加 `ICEBERG_JDBC` 分支,创建 `IcebergJdbcExternalCatalog`。 + +### 6) 注册到 Gson +文件:`fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java` +- 在 `dsTypeAdapterFactory` 中注册 `IcebergJdbcExternalCatalog`。 + +### 7) 扫描节点支持 +文件:`fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java` +- switch 中新增 `ICEBERG_JDBC` 分支。 +- 仍使用 `IcebergApiSource`。 + +### 8) 单测 +新增:`fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStorePropertiesTest.java` + +建议断言: +- `CatalogUtil.ICEBERG_CATALOG_TYPE` 为 `jdbc`。 +- `warehouse` 与 `uri` 透传正确。 +- `jdbc.*` 参数在最终 options 中可见。 + +可选集成测试: +- `regression-test/suites/external_table_p0/iceberg` 新增 JDBC 元数据库测试。 +- 使用真实 JDBC 数据库(如 PostgreSQL)。 + +## 示例 SQL + +### 基础示例(PostgreSQL + HDFS) +```sql +CREATE CATALOG iceberg_jdbc PROPERTIES ( + "type" = "iceberg", + "iceberg.catalog.type" = "jdbc", + "uri" = "jdbc:postgresql://host:5432/iceberg", + "warehouse" = "hdfs://namenode:8020/warehouse", + "driver_url" = "postgresql-42.7.3.jar", + "driver_class" = "org.postgresql.Driver", + "jdbc.user" = "iceberg", + "jdbc.password" = "secret", + "jdbc.init-catalog-tables" = "true" +); +``` + +### S3 存储示例(PostgreSQL + MinIO) +```sql +CREATE CATALOG iceberg_jdbc_s3 PROPERTIES ( + "type" = "iceberg", + "iceberg.catalog.type" = "jdbc", + "uri" = "jdbc:postgresql://localhost:5432/iceberg_db", + "warehouse" = "s3://iceberg-bucket/warehouse", + "driver_url" = "postgresql-42.7.3.jar", + "driver_class" = "org.postgresql.Driver", + "jdbc.user" = "iceberg_user", + "jdbc.password" = "Iceberg123", + "jdbc.init-catalog-tables" = "true", + -- S3/MinIO 存储配置 + "s3.endpoint" = "http://minio:9000", + "s3.access_key" = "minioadmin", + "s3.secret_key" = "minioadmin", + "s3.region" = "us-east-1" +); +``` + +### MySQL 示例 +```sql +CREATE CATALOG iceberg_jdbc_mysql PROPERTIES ( + "type" = "iceberg", + "iceberg.catalog.type" = "jdbc", + "uri" = "jdbc:mysql://host:3306/iceberg?useSSL=false", + "warehouse" = "s3://bucket/warehouse", + "driver_url" = "mysql-connector-j-8.0.33.jar", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "jdbc.user" = "iceberg", + "jdbc.password" = "secret", + "jdbc.init-catalog-tables" = "true", + -- S3 存储配置 + "s3.endpoint" = "https://s3.amazonaws.com", + "s3.access_key" = "", + "s3.secret_key" = "", + "s3.region" = "us-east-1" +); +``` + +### SQLite 示例(本地测试) +```sql +CREATE CATALOG iceberg_jdbc_sqlite PROPERTIES ( + "type" = "iceberg", + "iceberg.catalog.type" = "jdbc", + "uri" = "jdbc:sqlite:/path/to/iceberg_catalog.db", + "warehouse" = "file:///tmp/iceberg-warehouse", + "driver_url" = "sqlite-jdbc-3.44.1.0.jar", + "driver_class" = "org.sqlite.JDBC" +); +``` + +## 驱动要求 +Iceberg `JdbcCatalog` 在 FE 侧通过 JDBC 访问元数据数据库。 +驱动加载方式与 Doris JDBC Catalog 保持一致,避免将驱动放入 `fe/lib` 导致依赖冲突。 + +### 驱动存放位置 +将 JDBC 驱动 JAR 包放入以下任一目录: +- `$DORIS_HOME/jdbc_drivers/`(旧版默认) +- `$DORIS_HOME/plugins/jdbc_drivers/`(新版默认) + +### driver_url 支持的格式 +- 相对路径:`postgresql-42.7.3.jar`(从 `jdbc_drivers` 目录加载) +- 绝对路径:`file:///path/to/postgresql-42.7.3.jar` +- 远程 URL:`http://host/postgresql-42.7.3.jar` + +### 常用驱动类名 +| 数据库 | driver_class | +|--------|-------------| +| PostgreSQL | `org.postgresql.Driver` | +| MySQL | `com.mysql.cj.jdbc.Driver` | +| SQLite | `org.sqlite.JDBC` | + +## 验证清单 +- `SHOW CATALOGS` 能看到新 catalog。 +- `SHOW DATABASES FROM iceberg_jdbc`。 +- `SHOW TABLES FROM iceberg_jdbc.`。 +- `SELECT * FROM iceberg_jdbc.. LIMIT 1`。 +- `INSERT INTO iceberg_jdbc..
VALUES (...)` (写入测试)。 + +## 功能支持 + +### 查询操作 +- 基础查询:`SELECT * FROM table` +- 时间旅行:`FOR TIME AS OF` / `FOR VERSION AS OF` +- Branch 和 Tag 查询 +- 系统表查询:`$snapshots`、`$history`、`$files` 等 + +### 写入操作 +- `INSERT INTO` +- `INSERT OVERWRITE` +- `CTAS`(Create Table As Select) + +### 库表管理 +- `CREATE DATABASE` +- `DROP DATABASE` +- `CREATE TABLE` +- `DROP TABLE` +- Schema 变更(`ALTER TABLE`) +- Partition Evolution + +### 表操作 +- `rewrite_data_files`(小文件合并) +- `rollback_to_snapshot` +- `rollback_to_timestamp` +- Branch & Tag 管理 + +## 与其他 Iceberg Catalog 类型的对比 + +| 功能 | HMS | REST | JDBC | Hadoop | +|-----|-----|------|------|--------| +| 元数据存储 | Hive Metastore | REST 服务 | JDBC 数据库 | 文件系统 | +| 并发控制 | HMS 锁机制 | 服务端控制 | 数据库事务 | 文件锁(受限) | +| 部署复杂度 | 需要 HMS 集群 | 需要 REST 服务 | 需要数据库 | 最简单 | +| 适用场景 | 已有 Hive 环境 | 云原生/多租户 | 独立部署 | 开发测试 | + +## 兼容性说明 +- Doris 当前 Iceberg 版本见 `fe/pom.xml`(1.9.1)。 +- Iceberg 属性名如有变化,需要同步更新 `IcebergJdbcMetaStoreProperties`。 +- JDBC Catalog 支持的数据库:PostgreSQL、MySQL、SQLite(测试用)。 + +## 注意事项 + +1. **驱动加载**:JDBC 驱动必须放在正确的目录,且 `driver_class` 必须与驱动版本匹配。 +2. **数据库初始化**:首次使用时建议设置 `jdbc.init-catalog-tables=true`,Iceberg 会自动创建元数据表。 +3. **敏感信息**:`jdbc.password` 会在 `SHOW CREATE CATALOG` 中脱敏显示。 +4. **存储配置**:JDBC Catalog 只管理元数据,数据文件存储在 `warehouse` 指定的位置,需要配置相应的存储属性。 +5. **并发控制**:JDBC Catalog 使用数据库事务进行并发控制,确保数据库支持事务(如 PostgreSQL、MySQL InnoDB)。 diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java index 57d80c804c0c18..f6c7146c670067 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java @@ -47,6 +47,7 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog { public static final String ICEBERG_HADOOP = "hadoop"; public static final String ICEBERG_GLUE = "glue"; public static final String ICEBERG_DLF = "dlf"; + public static final String ICEBERG_JDBC = "jdbc"; public static final String ICEBERG_S3_TABLES = "s3tables"; public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name"; public static final String ICEBERG_TABLE_META_CACHE_TTL_SECOND = "iceberg.table.meta.cache.ttl-second"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java index 748c0805393b1e..824d20e70007ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java @@ -39,6 +39,8 @@ public static ExternalCatalog createCatalog(long catalogId, String name, String return new IcebergGlueExternalCatalog(catalogId, name, resource, props, comment); case IcebergExternalCatalog.ICEBERG_DLF: return new IcebergDLFExternalCatalog(catalogId, name, resource, props, comment); + case IcebergExternalCatalog.ICEBERG_JDBC: + return new IcebergJdbcExternalCatalog(catalogId, name, resource, props, comment); case IcebergExternalCatalog.ICEBERG_HADOOP: return new IcebergHadoopExternalCatalog(catalogId, name, resource, props, comment); case IcebergExternalCatalog.ICEBERG_S3_TABLES: diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergJdbcExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergJdbcExternalCatalog.java new file mode 100644 index 00000000000000..aeb2fd9deec18e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergJdbcExternalCatalog.java @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg; + +import org.apache.doris.datasource.CatalogProperty; + +import java.util.Map; + +public class IcebergJdbcExternalCatalog extends IcebergExternalCatalog { + + public IcebergJdbcExternalCatalog(long catalogId, String name, String resource, Map props, + String comment) { + super(catalogId, name, comment); + catalogProperty = new CatalogProperty(resource, props); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index 1acafbde5df74f..8095afea6955c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -142,6 +142,7 @@ public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckCol case IcebergExternalCatalog.ICEBERG_DLF: case IcebergExternalCatalog.ICEBERG_GLUE: case IcebergExternalCatalog.ICEBERG_HADOOP: + case IcebergExternalCatalog.ICEBERG_JDBC: case IcebergExternalCatalog.ICEBERG_S3_TABLES: source = new IcebergApiSource((IcebergExternalTable) table, desc, columnNameToRange); break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java new file mode 100644 index 00000000000000..1e252357319c02 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java @@ -0,0 +1,189 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.property.metastore; + +import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; +import org.apache.doris.datasource.property.ConnectorProperty; +import org.apache.doris.datasource.property.storage.AbstractS3CompatibleProperties; +import org.apache.doris.datasource.property.storage.StorageProperties; + +import com.google.common.collect.Maps; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.aws.AwsClientProperties; +import org.apache.iceberg.aws.s3.S3FileIOProperties; +import org.apache.iceberg.catalog.Catalog; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class IcebergJdbcMetaStoreProperties extends AbstractIcebergProperties { + + private static final String JDBC_PREFIX = "jdbc."; + + private Map icebergJdbcCatalogProperties; + + @ConnectorProperty( + names = {"uri", "iceberg.jdbc.uri"}, + required = true, + description = "JDBC connection URI for the Iceberg JDBC catalog." + ) + private String uri = ""; + + @ConnectorProperty( + names = {"jdbc.user"}, + required = false, + description = "Username for the Iceberg JDBC catalog." + ) + private String jdbcUser; + + @ConnectorProperty( + names = {"jdbc.password"}, + required = false, + sensitive = true, + description = "Password for the Iceberg JDBC catalog." + ) + private String jdbcPassword; + + @ConnectorProperty( + names = {"jdbc.init-catalog-tables"}, + required = false, + description = "Whether to create catalog tables if they do not exist." + ) + private String jdbcInitCatalogTables; + + @ConnectorProperty( + names = {"jdbc.schema-version"}, + required = false, + description = "Iceberg JDBC catalog schema version (V0/V1)." + ) + private String jdbcSchemaVersion; + + @ConnectorProperty( + names = {"jdbc.strict-mode"}, + required = false, + description = "Whether to enforce strict JDBC catalog schema checks." + ) + private String jdbcStrictMode; + + public IcebergJdbcMetaStoreProperties(Map props) { + super(props); + } + + @Override + public String getIcebergCatalogType() { + return IcebergExternalCatalog.ICEBERG_JDBC; + } + + @Override + public void initNormalizeAndCheckProps() { + super.initNormalizeAndCheckProps(); + initIcebergJdbcCatalogProperties(); + } + + @Override + protected void checkRequiredProperties() { + super.checkRequiredProperties(); + if (StringUtils.isBlank(warehouse)) { + throw new IllegalArgumentException("Property warehouse is required."); + } + } + + @Override + public Catalog initCatalog(String catalogName, Map catalogProps, + List storagePropertiesList) { + Map fileIOProperties = Maps.newHashMap(); + Configuration conf = new Configuration(); + toFileIOProperties(storagePropertiesList, fileIOProperties, conf); + + Map options = Maps.newHashMap(getIcebergJdbcCatalogProperties()); + options.putAll(fileIOProperties); + return CatalogUtil.buildIcebergCatalog(catalogName, options, conf); + } + + public Map getIcebergJdbcCatalogProperties() { + return Collections.unmodifiableMap(icebergJdbcCatalogProperties); + } + + private void initIcebergJdbcCatalogProperties() { + icebergJdbcCatalogProperties = new HashMap<>(); + icebergJdbcCatalogProperties.put(CatalogUtil.ICEBERG_CATALOG_TYPE, CatalogUtil.ICEBERG_CATALOG_TYPE_JDBC); + icebergJdbcCatalogProperties.put(CatalogProperties.URI, uri); + if (StringUtils.isNotBlank(warehouse)) { + icebergJdbcCatalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse); + } + addIfNotBlank(icebergJdbcCatalogProperties, "jdbc.user", jdbcUser); + addIfNotBlank(icebergJdbcCatalogProperties, "jdbc.password", jdbcPassword); + addIfNotBlank(icebergJdbcCatalogProperties, "jdbc.init-catalog-tables", jdbcInitCatalogTables); + addIfNotBlank(icebergJdbcCatalogProperties, "jdbc.schema-version", jdbcSchemaVersion); + addIfNotBlank(icebergJdbcCatalogProperties, "jdbc.strict-mode", jdbcStrictMode); + + if (origProps != null) { + for (Map.Entry entry : origProps.entrySet()) { + String key = entry.getKey(); + if (key != null && key.startsWith(JDBC_PREFIX) + && !icebergJdbcCatalogProperties.containsKey(key)) { + icebergJdbcCatalogProperties.put(key, entry.getValue()); + } + } + } + } + + private static void addIfNotBlank(Map props, String key, String value) { + if (StringUtils.isNotBlank(value)) { + props.put(key, value); + } + } + + private static void toFileIOProperties(List storagePropertiesList, + Map fileIOProperties, Configuration conf) { + for (StorageProperties storageProperties : storagePropertiesList) { + if (storageProperties instanceof AbstractS3CompatibleProperties) { + toS3FileIOProperties((AbstractS3CompatibleProperties) storageProperties, fileIOProperties); + } else if (storageProperties.getHadoopStorageConfig() != null) { + conf.addResource(storageProperties.getHadoopStorageConfig()); + } + } + } + + private static void toS3FileIOProperties(AbstractS3CompatibleProperties s3Properties, + Map options) { + if (StringUtils.isNotBlank(s3Properties.getEndpoint())) { + options.put(S3FileIOProperties.ENDPOINT, s3Properties.getEndpoint()); + } + if (StringUtils.isNotBlank(s3Properties.getUsePathStyle())) { + options.put(S3FileIOProperties.PATH_STYLE_ACCESS, s3Properties.getUsePathStyle()); + } + if (StringUtils.isNotBlank(s3Properties.getRegion())) { + options.put(AwsClientProperties.CLIENT_REGION, s3Properties.getRegion()); + } + if (StringUtils.isNotBlank(s3Properties.getAccessKey())) { + options.put(S3FileIOProperties.ACCESS_KEY_ID, s3Properties.getAccessKey()); + } + if (StringUtils.isNotBlank(s3Properties.getSecretKey())) { + options.put(S3FileIOProperties.SECRET_ACCESS_KEY, s3Properties.getSecretKey()); + } + if (StringUtils.isNotBlank(s3Properties.getSessionToken())) { + options.put(S3FileIOProperties.SESSION_TOKEN, s3Properties.getSessionToken()); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergPropertiesFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergPropertiesFactory.java index 64fd28216cfb7f..333c6c44806ce7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergPropertiesFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergPropertiesFactory.java @@ -43,6 +43,7 @@ public IcebergPropertiesFactory() { register("hadoop", IcebergFileSystemMetaStoreProperties::new); register("s3tables", IcebergS3TablesMetaStoreProperties::new); register("dlf", IcebergAliyunDLFMetaStoreProperties::new); + register("jdbc", IcebergJdbcMetaStoreProperties::new); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index c0f31f879596f2..39863f1b2a520c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -145,6 +145,7 @@ import org.apache.doris.datasource.iceberg.IcebergGlueExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergHMSExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergHadoopExternalCatalog; +import org.apache.doris.datasource.iceberg.IcebergJdbcExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergRestExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergS3TablesExternalCatalog; import org.apache.doris.datasource.infoschema.ExternalInfoSchemaDatabase; @@ -411,6 +412,7 @@ public class GsonUtils { .registerSubtype(IcebergRestExternalCatalog.class, IcebergRestExternalCatalog.class.getSimpleName()) .registerSubtype(IcebergDLFExternalCatalog.class, IcebergDLFExternalCatalog.class.getSimpleName()) .registerSubtype(IcebergHadoopExternalCatalog.class, IcebergHadoopExternalCatalog.class.getSimpleName()) + .registerSubtype(IcebergJdbcExternalCatalog.class, IcebergJdbcExternalCatalog.class.getSimpleName()) .registerSubtype(IcebergS3TablesExternalCatalog.class, IcebergS3TablesExternalCatalog.class.getSimpleName()) .registerSubtype(PaimonExternalCatalog.class, PaimonExternalCatalog.class.getSimpleName()) diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStorePropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStorePropertiesTest.java new file mode 100644 index 00000000000000..b35782b20339d6 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStorePropertiesTest.java @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.property.metastore; + +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +public class IcebergJdbcMetaStorePropertiesTest { + + @Test + public void testBasicJdbcProperties() { + Map props = new HashMap<>(); + props.put("uri", "jdbc:mysql://localhost:3306/iceberg"); + props.put("warehouse", "s3://warehouse/path"); + props.put("jdbc.user", "iceberg"); + props.put("jdbc.password", "secret"); + + IcebergJdbcMetaStoreProperties jdbcProps = new IcebergJdbcMetaStoreProperties(props); + jdbcProps.initNormalizeAndCheckProps(); + + Map catalogProps = jdbcProps.getIcebergJdbcCatalogProperties(); + Assertions.assertEquals(CatalogUtil.ICEBERG_CATALOG_TYPE_JDBC, + catalogProps.get(CatalogUtil.ICEBERG_CATALOG_TYPE)); + Assertions.assertEquals("jdbc:mysql://localhost:3306/iceberg", catalogProps.get(CatalogProperties.URI)); + Assertions.assertEquals("s3://warehouse/path", catalogProps.get(CatalogProperties.WAREHOUSE_LOCATION)); + Assertions.assertEquals("iceberg", catalogProps.get("jdbc.user")); + Assertions.assertEquals("secret", catalogProps.get("jdbc.password")); + } + + @Test + public void testJdbcPrefixPassthrough() { + Map props = new HashMap<>(); + props.put("uri", "jdbc:mysql://localhost:3306/iceberg"); + props.put("warehouse", "s3://warehouse/path"); + props.put("jdbc.useSSL", "true"); + props.put("jdbc.verifyServerCertificate", "true"); + + IcebergJdbcMetaStoreProperties jdbcProps = new IcebergJdbcMetaStoreProperties(props); + jdbcProps.initNormalizeAndCheckProps(); + + Map catalogProps = jdbcProps.getIcebergJdbcCatalogProperties(); + Assertions.assertEquals("true", catalogProps.get("jdbc.useSSL")); + Assertions.assertEquals("true", catalogProps.get("jdbc.verifyServerCertificate")); + } + + @Test + public void testMissingWarehouse() { + Map props = new HashMap<>(); + props.put("uri", "jdbc:mysql://localhost:3306/iceberg"); + + IcebergJdbcMetaStoreProperties jdbcProps = new IcebergJdbcMetaStoreProperties(props); + Assertions.assertThrows(IllegalArgumentException.class, jdbcProps::initNormalizeAndCheckProps); + } + + @Test + public void testMissingUri() { + Map props = new HashMap<>(); + props.put("warehouse", "s3://warehouse/path"); + + IcebergJdbcMetaStoreProperties jdbcProps = new IcebergJdbcMetaStoreProperties(props); + Assertions.assertThrows(IllegalArgumentException.class, jdbcProps::initNormalizeAndCheckProps); + } +} From a35eb6c45b70e96d257576614f12eb9b50c17ae9 Mon Sep 17 00:00:00 2001 From: xylaaaaa <2392805527@qq.com> Date: Wed, 31 Dec 2025 10:19:54 +0800 Subject: [PATCH 2/7] fix aws s3 --- docs/iceberg_jdbc_catalog_impl.md | 304 ------------------ .../IcebergJdbcMetaStoreProperties.java | 3 + 2 files changed, 3 insertions(+), 304 deletions(-) delete mode 100644 docs/iceberg_jdbc_catalog_impl.md diff --git a/docs/iceberg_jdbc_catalog_impl.md b/docs/iceberg_jdbc_catalog_impl.md deleted file mode 100644 index af20e09271ae46..00000000000000 --- a/docs/iceberg_jdbc_catalog_impl.md +++ /dev/null @@ -1,304 +0,0 @@ -# Doris 支持 Iceberg JDBC Catalog(实现指南) - -## 目标 -在 Doris 中新增对 Iceberg JDBC Catalog 的支持(Iceberg 元数据存储在 JDBC 数据库中)。 -集成 Iceberg `org.apache.iceberg.jdbc.JdbcCatalog` 到现有 Iceberg 外部 Catalog 框架。 - -## 范围 -- 新增 Iceberg catalog 类型:`iceberg.catalog.type = jdbc`。 -- FE 使用 Iceberg `JdbcCatalog` 做元数据操作。 -- Iceberg 扫描链路保持不变(`IcebergApiSource` + BE 文件扫描)。 - -## 非目标 -- Doris JDBC 外表(已有)。 -- BE 侧 Iceberg 扫描逻辑调整。 -- JDBC 驱动包分发或部署流程变更。 - -## 总体流程 -1. `CREATE CATALOG ... type=iceberg` 由 `IcebergExternalCatalogFactory` 创建。 -2. `CatalogProperty` 根据 `IcebergPropertiesFactory` 构建 `MetastoreProperties`。 -3. 新增 `IcebergJdbcMetaStoreProperties`,内部初始化 Iceberg `JdbcCatalog`。 -4. `IcebergScanNode` 继续使用 `IcebergApiSource` 读取 Iceberg 数据文件。 - -## 参数总览 - -### 必需参数 - -| 参数名 | 曾用名/别名 | 说明 | 默认值 | 必填 | -|--------|-------------|------|--------|------| -| type | - | 固定为 `iceberg` | - | 是 | -| iceberg.catalog.type | - | 固定为 `jdbc` | - | 是 | -| uri | iceberg.jdbc.uri | JDBC 连接串,如 `jdbc:postgresql://host:5432/iceberg` | - | 是 | -| warehouse | - | Iceberg warehouse 路径 | - | 是 | -| driver_url | - | JDBC 驱动 JAR 路径,如 `postgresql-42.7.3.jar` | - | 是 | -| driver_class | - | JDBC 驱动类名,如 `org.postgresql.Driver` | - | 是 | - -### JDBC 认证参数 - -| 参数名 | 说明 | 默认值 | 必填 | -|--------|------|--------|------| -| jdbc.user | JDBC 用户名 | - | 否 | -| jdbc.password | JDBC 密码(敏感参数,会被脱敏显示) | - | 否 | - -### JDBC Catalog 可选参数 - -| 参数名 | 说明 | 默认值 | 必填 | -|--------|------|--------|------| -| jdbc.init-catalog-tables | 是否自动创建 Catalog 元数据表 | false | 否 | -| jdbc.schema-version | 元数据表 Schema 版本(V0/V1) | V1 | 否 | -| jdbc.strict-mode | 是否启用严格模式 | false | 否 | - -### JDBC 连接参数 -以下参数直接透传给 JDBC 驱动: - -| 参数名 | 说明 | 默认值 | -|--------|------|--------| -| jdbc.useSSL | 是否使用 SSL 连接 | false | -| jdbc.verifyServerCertificate | 是否验证服务器证书 | false | -| jdbc.sslMode | SSL 模式(MySQL 8.x 推荐使用) | - | - -### 存储属性(StorageProperties) - -JDBC Catalog 同样需要配置存储属性以访问 Iceberg 数据文件。支持的存储系统包括: - -- HDFS -- AWS S3 -- 阿里云 OSS -- 腾讯云 COS -- 华为云 OBS -- MinIO - -存储属性配置方式与其他 Iceberg Catalog 类型一致,请参阅 Doris 文档【支持的存储系统】部分。 - -示例(S3 存储): -```sql -CREATE CATALOG iceberg_jdbc PROPERTIES ( - -- JDBC Catalog 参数 - "type" = "iceberg", - "iceberg.catalog.type" = "jdbc", - "uri" = "jdbc:postgresql://host:5432/iceberg", - "warehouse" = "s3://bucket/warehouse", - "driver_url" = "postgresql-42.7.3.jar", - "driver_class" = "org.postgresql.Driver", - "jdbc.user" = "iceberg", - "jdbc.password" = "secret", - -- 存储属性 - "s3.endpoint" = "http://minio:9000", - "s3.access_key" = "minioadmin", - "s3.secret_key" = "minioadmin", - "s3.region" = "us-east-1" -); -``` - -### 说明 -- 保留 `jdbc.password` 可复用现有敏感信息屏蔽逻辑。 -- 如新增别名(例如 `iceberg.jdbc.password`),同步加入敏感 key 列表。 -- `jdbc.*` 需原样透传给 Iceberg JDBC Catalog(由 JDBC 驱动解析),不要过滤未知 `jdbc.` 参数。 -- 部分 JDBC 驱动已弃用 `verifyServerCertificate`(例如 MySQL 8.x 建议使用 `sslMode`), - 以驱动版本支持情况为准。 - -## 代码改动 - -### 1) 新增 catalog 类型常量 -文件:`fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java` -- 增加 `ICEBERG_JDBC = "jdbc"`。 - -### 2) 新增 ExternalCatalog 实现 -文件:`fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergJdbcExternalCatalog.java` -- 参考 `IcebergRestExternalCatalog`。 -- 构造函数中设置 `catalogProperty = new CatalogProperty(resource, props)`。 - -### 3) 新增 Metastore Properties -文件:`fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java` - -职责: -- 继承 `AbstractIcebergProperties`。 -- 使用 `@ConnectorProperty` 定义字段: - - `uri` - - `warehouse` - - `jdbc.user` - - `jdbc.password` - - `jdbc.init-catalog-tables` - - `jdbc.schema-version` - - `jdbc.strict-mode` -- 实现 `getIcebergCatalogType()` 返回 `ICEBERG_JDBC`。 -- 实现 `initCatalog(...)`: - - 通过 `CatalogUtil.buildIcebergCatalog` 创建实例。 - - 设置 `CatalogUtil.ICEBERG_CATALOG_TYPE_JDBC`。 - - 透传 `warehouse`、`uri` 及所有 `jdbc.*`。 - - 如需 FileIO 参数,复用 `IcebergRestProperties.toFileIOProperties(...)`。 - -### 4) 注册到 Properties Factory -文件:`fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergPropertiesFactory.java` -- 增加 `register("jdbc", IcebergJdbcMetaStoreProperties::new)`。 - -### 5) 注册到 Catalog Factory -文件:`fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java` -- 增加 `ICEBERG_JDBC` 分支,创建 `IcebergJdbcExternalCatalog`。 - -### 6) 注册到 Gson -文件:`fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java` -- 在 `dsTypeAdapterFactory` 中注册 `IcebergJdbcExternalCatalog`。 - -### 7) 扫描节点支持 -文件:`fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java` -- switch 中新增 `ICEBERG_JDBC` 分支。 -- 仍使用 `IcebergApiSource`。 - -### 8) 单测 -新增:`fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStorePropertiesTest.java` - -建议断言: -- `CatalogUtil.ICEBERG_CATALOG_TYPE` 为 `jdbc`。 -- `warehouse` 与 `uri` 透传正确。 -- `jdbc.*` 参数在最终 options 中可见。 - -可选集成测试: -- `regression-test/suites/external_table_p0/iceberg` 新增 JDBC 元数据库测试。 -- 使用真实 JDBC 数据库(如 PostgreSQL)。 - -## 示例 SQL - -### 基础示例(PostgreSQL + HDFS) -```sql -CREATE CATALOG iceberg_jdbc PROPERTIES ( - "type" = "iceberg", - "iceberg.catalog.type" = "jdbc", - "uri" = "jdbc:postgresql://host:5432/iceberg", - "warehouse" = "hdfs://namenode:8020/warehouse", - "driver_url" = "postgresql-42.7.3.jar", - "driver_class" = "org.postgresql.Driver", - "jdbc.user" = "iceberg", - "jdbc.password" = "secret", - "jdbc.init-catalog-tables" = "true" -); -``` - -### S3 存储示例(PostgreSQL + MinIO) -```sql -CREATE CATALOG iceberg_jdbc_s3 PROPERTIES ( - "type" = "iceberg", - "iceberg.catalog.type" = "jdbc", - "uri" = "jdbc:postgresql://localhost:5432/iceberg_db", - "warehouse" = "s3://iceberg-bucket/warehouse", - "driver_url" = "postgresql-42.7.3.jar", - "driver_class" = "org.postgresql.Driver", - "jdbc.user" = "iceberg_user", - "jdbc.password" = "Iceberg123", - "jdbc.init-catalog-tables" = "true", - -- S3/MinIO 存储配置 - "s3.endpoint" = "http://minio:9000", - "s3.access_key" = "minioadmin", - "s3.secret_key" = "minioadmin", - "s3.region" = "us-east-1" -); -``` - -### MySQL 示例 -```sql -CREATE CATALOG iceberg_jdbc_mysql PROPERTIES ( - "type" = "iceberg", - "iceberg.catalog.type" = "jdbc", - "uri" = "jdbc:mysql://host:3306/iceberg?useSSL=false", - "warehouse" = "s3://bucket/warehouse", - "driver_url" = "mysql-connector-j-8.0.33.jar", - "driver_class" = "com.mysql.cj.jdbc.Driver", - "jdbc.user" = "iceberg", - "jdbc.password" = "secret", - "jdbc.init-catalog-tables" = "true", - -- S3 存储配置 - "s3.endpoint" = "https://s3.amazonaws.com", - "s3.access_key" = "", - "s3.secret_key" = "", - "s3.region" = "us-east-1" -); -``` - -### SQLite 示例(本地测试) -```sql -CREATE CATALOG iceberg_jdbc_sqlite PROPERTIES ( - "type" = "iceberg", - "iceberg.catalog.type" = "jdbc", - "uri" = "jdbc:sqlite:/path/to/iceberg_catalog.db", - "warehouse" = "file:///tmp/iceberg-warehouse", - "driver_url" = "sqlite-jdbc-3.44.1.0.jar", - "driver_class" = "org.sqlite.JDBC" -); -``` - -## 驱动要求 -Iceberg `JdbcCatalog` 在 FE 侧通过 JDBC 访问元数据数据库。 -驱动加载方式与 Doris JDBC Catalog 保持一致,避免将驱动放入 `fe/lib` 导致依赖冲突。 - -### 驱动存放位置 -将 JDBC 驱动 JAR 包放入以下任一目录: -- `$DORIS_HOME/jdbc_drivers/`(旧版默认) -- `$DORIS_HOME/plugins/jdbc_drivers/`(新版默认) - -### driver_url 支持的格式 -- 相对路径:`postgresql-42.7.3.jar`(从 `jdbc_drivers` 目录加载) -- 绝对路径:`file:///path/to/postgresql-42.7.3.jar` -- 远程 URL:`http://host/postgresql-42.7.3.jar` - -### 常用驱动类名 -| 数据库 | driver_class | -|--------|-------------| -| PostgreSQL | `org.postgresql.Driver` | -| MySQL | `com.mysql.cj.jdbc.Driver` | -| SQLite | `org.sqlite.JDBC` | - -## 验证清单 -- `SHOW CATALOGS` 能看到新 catalog。 -- `SHOW DATABASES FROM iceberg_jdbc`。 -- `SHOW TABLES FROM iceberg_jdbc.`。 -- `SELECT * FROM iceberg_jdbc..
LIMIT 1`。 -- `INSERT INTO iceberg_jdbc..
VALUES (...)` (写入测试)。 - -## 功能支持 - -### 查询操作 -- 基础查询:`SELECT * FROM table` -- 时间旅行:`FOR TIME AS OF` / `FOR VERSION AS OF` -- Branch 和 Tag 查询 -- 系统表查询:`$snapshots`、`$history`、`$files` 等 - -### 写入操作 -- `INSERT INTO` -- `INSERT OVERWRITE` -- `CTAS`(Create Table As Select) - -### 库表管理 -- `CREATE DATABASE` -- `DROP DATABASE` -- `CREATE TABLE` -- `DROP TABLE` -- Schema 变更(`ALTER TABLE`) -- Partition Evolution - -### 表操作 -- `rewrite_data_files`(小文件合并) -- `rollback_to_snapshot` -- `rollback_to_timestamp` -- Branch & Tag 管理 - -## 与其他 Iceberg Catalog 类型的对比 - -| 功能 | HMS | REST | JDBC | Hadoop | -|-----|-----|------|------|--------| -| 元数据存储 | Hive Metastore | REST 服务 | JDBC 数据库 | 文件系统 | -| 并发控制 | HMS 锁机制 | 服务端控制 | 数据库事务 | 文件锁(受限) | -| 部署复杂度 | 需要 HMS 集群 | 需要 REST 服务 | 需要数据库 | 最简单 | -| 适用场景 | 已有 Hive 环境 | 云原生/多租户 | 独立部署 | 开发测试 | - -## 兼容性说明 -- Doris 当前 Iceberg 版本见 `fe/pom.xml`(1.9.1)。 -- Iceberg 属性名如有变化,需要同步更新 `IcebergJdbcMetaStoreProperties`。 -- JDBC Catalog 支持的数据库:PostgreSQL、MySQL、SQLite(测试用)。 - -## 注意事项 - -1. **驱动加载**:JDBC 驱动必须放在正确的目录,且 `driver_class` 必须与驱动版本匹配。 -2. **数据库初始化**:首次使用时建议设置 `jdbc.init-catalog-tables=true`,Iceberg 会自动创建元数据表。 -3. **敏感信息**:`jdbc.password` 会在 `SHOW CREATE CATALOG` 中脱敏显示。 -4. **存储配置**:JDBC Catalog 只管理元数据,数据文件存储在 `warehouse` 指定的位置,需要配置相应的存储属性。 -5. **并发控制**:JDBC Catalog 使用数据库事务进行并发控制,确保数据库支持事务(如 PostgreSQL、MySQL InnoDB)。 diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java index 1e252357319c02..d539720efdb703 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java @@ -167,6 +167,9 @@ private static void toFileIOProperties(List storageProperties private static void toS3FileIOProperties(AbstractS3CompatibleProperties s3Properties, Map options) { + // Set S3FileIO as the FileIO implementation for S3-compatible storage + options.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO"); + if (StringUtils.isNotBlank(s3Properties.getEndpoint())) { options.put(S3FileIOProperties.ENDPOINT, s3Properties.getEndpoint()); } From 99a2e0b54616a453b8a23eff4e5ba5fa3b8864b7 Mon Sep 17 00:00:00 2001 From: xylaaaaa <2392805527@qq.com> Date: Wed, 31 Dec 2025 11:53:30 +0800 Subject: [PATCH 3/7] fix S3 --- .../property/metastore/IcebergJdbcMetaStoreProperties.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java index d539720efdb703..9c7ba4de1486a3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java @@ -159,7 +159,8 @@ private static void toFileIOProperties(List storageProperties for (StorageProperties storageProperties : storagePropertiesList) { if (storageProperties instanceof AbstractS3CompatibleProperties) { toS3FileIOProperties((AbstractS3CompatibleProperties) storageProperties, fileIOProperties); - } else if (storageProperties.getHadoopStorageConfig() != null) { + } + if (storageProperties.getHadoopStorageConfig() != null) { conf.addResource(storageProperties.getHadoopStorageConfig()); } } @@ -167,9 +168,6 @@ private static void toFileIOProperties(List storageProperties private static void toS3FileIOProperties(AbstractS3CompatibleProperties s3Properties, Map options) { - // Set S3FileIO as the FileIO implementation for S3-compatible storage - options.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO"); - if (StringUtils.isNotBlank(s3Properties.getEndpoint())) { options.put(S3FileIOProperties.ENDPOINT, s3Properties.getEndpoint()); } From 17aefc7dcb6ed0728c92a1685c601d499aff4cc6 Mon Sep 17 00:00:00 2001 From: xylaaaaa <2392805527@qq.com> Date: Wed, 31 Dec 2025 15:13:36 +0800 Subject: [PATCH 4/7] add driver_url and driver_class --- .../IcebergJdbcMetaStoreProperties.java | 120 ++++++++++++++++++ 1 file changed, 120 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java index 9c7ba4de1486a3..5f1912163f9f35 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java @@ -17,6 +17,7 @@ package org.apache.doris.datasource.property.metastore; +import org.apache.doris.catalog.JdbcResource; import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; import org.apache.doris.datasource.property.ConnectorProperty; import org.apache.doris.datasource.property.storage.AbstractS3CompatibleProperties; @@ -30,15 +31,23 @@ import org.apache.iceberg.aws.AwsClientProperties; import org.apache.iceberg.aws.s3.S3FileIOProperties; import org.apache.iceberg.catalog.Catalog; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class IcebergJdbcMetaStoreProperties extends AbstractIcebergProperties { + private static final Logger LOG = LogManager.getLogger(IcebergJdbcMetaStoreProperties.class); private static final String JDBC_PREFIX = "jdbc."; + private static final Map DRIVER_CLASS_LOADER_CACHE = new ConcurrentHashMap<>(); private Map icebergJdbcCatalogProperties; @@ -85,6 +94,22 @@ public class IcebergJdbcMetaStoreProperties extends AbstractIcebergProperties { ) private String jdbcStrictMode; + @ConnectorProperty( + names = {"driver_url"}, + required = false, + description = "JDBC driver JAR file path or URL. " + + "Can be a local file name (will look in $DORIS_HOME/plugins/jdbc_drivers/) " + + "or a full URL (http://, https://, file://)." + ) + private String driverUrl; + + @ConnectorProperty( + names = {"driver_class"}, + required = false, + description = "JDBC driver class name. If not specified, will be auto-detected from the JDBC URI." + ) + private String driverClass; + public IcebergJdbcMetaStoreProperties(Map props) { super(props); } @@ -117,9 +142,104 @@ public Catalog initCatalog(String catalogName, Map catalogProps, Map options = Maps.newHashMap(getIcebergJdbcCatalogProperties()); options.putAll(fileIOProperties); + + // Support dynamic JDBC driver loading + // We need to register the driver with DriverManager because Iceberg uses DriverManager.getConnection() + // which doesn't respect Thread.contextClassLoader + if (StringUtils.isNotBlank(driverUrl)) { + registerJdbcDriver(driverUrl, driverClass); + LOG.info("Using dynamic JDBC driver from: {}", driverUrl); + } return CatalogUtil.buildIcebergCatalog(catalogName, options, conf); } + /** + * Register JDBC driver with DriverManager. + * This is necessary because DriverManager.getConnection() doesn't use Thread.contextClassLoader, + * it uses the caller's ClassLoader. By registering the driver, DriverManager can find it. + * + * @param driverUrl Path or URL to the JDBC driver JAR + * @param driverClassName Driver class name to register + */ + private void registerJdbcDriver(String driverUrl, String driverClassName) { + try { + String fullDriverUrl = JdbcResource.getFullDriverUrl(driverUrl); + URL url = new URL(fullDriverUrl); + + ClassLoader classLoader = DRIVER_CLASS_LOADER_CACHE.computeIfAbsent(url, u -> { + ClassLoader parent = getClass().getClassLoader(); + return URLClassLoader.newInstance(new URL[]{u}, parent); + }); + + if (StringUtils.isBlank(driverClassName)) { + throw new IllegalArgumentException("driver_class is required when driver_url is specified"); + } + + // Load the driver class and register it with DriverManager + Class driverClass = Class.forName(driverClassName, true, classLoader); + java.sql.Driver driver = (java.sql.Driver) driverClass.getDeclaredConstructor().newInstance(); + + // Wrap with a shim driver because DriverManager refuses to use a driver not loaded by system classloader + java.sql.DriverManager.registerDriver(new DriverShim(driver)); + LOG.info("Successfully registered JDBC driver: {} from {}", driverClassName, fullDriverUrl); + + } catch (MalformedURLException e) { + throw new IllegalArgumentException("Invalid driver URL: " + driverUrl, e); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException("Failed to load JDBC driver class: " + driverClassName, e); + } catch (Exception e) { + throw new RuntimeException("Failed to register JDBC driver: " + driverClassName, e); + } + } + + /** + * A shim driver that wraps the actual driver loaded from a custom ClassLoader. + * This is needed because DriverManager refuses to use a driver that wasn't loaded by the system classloader. + */ + private static class DriverShim implements java.sql.Driver { + private final java.sql.Driver delegate; + + DriverShim(java.sql.Driver delegate) { + this.delegate = delegate; + } + + @Override + public java.sql.Connection connect(String url, java.util.Properties info) throws java.sql.SQLException { + return delegate.connect(url, info); + } + + @Override + public boolean acceptsURL(String url) throws java.sql.SQLException { + return delegate.acceptsURL(url); + } + + @Override + public java.sql.DriverPropertyInfo[] getPropertyInfo(String url, java.util.Properties info) + throws java.sql.SQLException { + return delegate.getPropertyInfo(url, info); + } + + @Override + public int getMajorVersion() { + return delegate.getMajorVersion(); + } + + @Override + public int getMinorVersion() { + return delegate.getMinorVersion(); + } + + @Override + public boolean jdbcCompliant() { + return delegate.jdbcCompliant(); + } + + @Override + public java.util.logging.Logger getParentLogger() throws java.sql.SQLFeatureNotSupportedException { + return delegate.getParentLogger(); + } + } + public Map getIcebergJdbcCatalogProperties() { return Collections.unmodifiableMap(icebergJdbcCatalogProperties); } From 396b7266cbf34ff0368846468f59a60ef4cae239 Mon Sep 17 00:00:00 2001 From: xylaaaaa <2392805527@qq.com> Date: Sun, 4 Jan 2026 17:08:34 +0800 Subject: [PATCH 5/7] add regression-test --- .../iceberg/test_iceberg_jdbc_catalog.out | 42 +++ .../iceberg/test_iceberg_jdbc_catalog.groovy | 316 ++++++++++++++++++ 2 files changed, 358 insertions(+) create mode 100644 regression-test/data/external_table_p0/iceberg/test_iceberg_jdbc_catalog.out create mode 100644 regression-test/suites/external_table_p0/iceberg/test_iceberg_jdbc_catalog.groovy diff --git a/regression-test/data/external_table_p0/iceberg/test_iceberg_jdbc_catalog.out b/regression-test/data/external_table_p0/iceberg/test_iceberg_jdbc_catalog.out new file mode 100644 index 00000000000000..9e7a05f3757d72 --- /dev/null +++ b/regression-test/data/external_table_p0/iceberg/test_iceberg_jdbc_catalog.out @@ -0,0 +1,42 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !datatypes_select -- +false 2 200000000000 2.5 3.5 234.56 world 2025-01-02 2025-01-02T11:00 +true 1 100000000000 1.5 2.5 123.45 hello 2025-01-01 2025-01-01T10:00 +true 3 300000000000 3.5 4.5 345.67 test 2025-01-03 2025-01-03T12:00 + +-- !datatypes_count -- +3 + +-- !datatypes_filter -- +1 hello +3 test + +-- !partition_select -- +1 Item1 A 2025-01-01 +2 Item2 A 2025-01-01 +3 Item3 B 2025-01-02 +4 Item4 B 2025-01-02 +5 Item5 A 2025-01-03 + +-- !partition_filter -- +1 Item1 A 2025-01-01 +2 Item2 A 2025-01-01 +5 Item5 A 2025-01-03 + +-- !sys_snapshots -- +1 + +-- !sys_history -- +1 + +-- !after_overwrite -- +1 Item1 A 2025-01-01 +2 Item2 A 2025-01-01 +3 Item3 B 2025-01-02 +4 Item4 B 2025-01-02 +5 Item5 A 2025-01-03 + +-- !mysql_select -- +1 Alice 2025-01-01T10:00 +2 Bob 2025-01-02T11:00 + diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_jdbc_catalog.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_jdbc_catalog.groovy new file mode 100644 index 00000000000000..647c18f44b78f7 --- /dev/null +++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_jdbc_catalog.groovy @@ -0,0 +1,316 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_iceberg_jdbc_catalog", "p0,external,iceberg,external_docker,external_docker_iceberg") { + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("Iceberg test is not enabled, skip this test") + return; + } + + String enabledJdbc = context.config.otherConfigs.get("enableJdbcTest") + if (enabledJdbc == null || !enabledJdbc.equalsIgnoreCase("true")) { + logger.info("Iceberg JDBC catalog test requires enableJdbcTest, skip this test") + return; + } + + // Get test environment configuration + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String jdbc_port = context.config.otherConfigs.get("pg_14_port") + + // JDBC Catalog specific test - uses PostgreSQL as the metadata store + // If PostgreSQL port is not configured, this test will be skipped + if (jdbc_port == null || jdbc_port.isEmpty()) { + logger.info("Iceberg JDBC catalog PostgreSQL port not configured (pg_14_port), skip this test") + return; + } + + if (minio_port == null || minio_port.isEmpty() || externalEnvIp == null) { + logger.info("Iceberg test environment not fully configured, skip this test") + return; + } + + String catalog_name = "test_iceberg_jdbc_catalog" + String db_name = "jdbc_test_db" + String driver_name = "postgresql-42.5.0.jar" + String driver_download_url = "${getS3Url()}/regression/jdbc_driver/${driver_name}" + String jdbc_drivers_dir = getFeConfig("jdbc_drivers_dir") + String local_driver_dir = "${context.config.dataPath}/jdbc_driver" + String local_driver_path = "${local_driver_dir}/${driver_name}" + String pg_db = "postgres" + String mysql_db = "iceberg_db" + + // MySQL driver config + String mysql_driver_name = "mysql-connector-java-5.1.49-v2.jar" + String mysql_driver_download_url = "${getS3Url()}/regression/jdbc_driver/mysql-connector-java-5.1.49.jar" + String local_mysql_driver_path = "${local_driver_dir}/${mysql_driver_name}" + + def executeCommand = { String cmd, Boolean mustSuc -> + try { + logger.info("execute ${cmd}") + def proc = new ProcessBuilder("/bin/bash", "-c", cmd).redirectErrorStream(true).start() + int exitcode = proc.waitFor() + if (exitcode != 0) { + logger.info("exit code: ${exitcode}, output\n: ${proc.text}") + if (mustSuc == true) { + assertTrue(false, "Execute failed: ${cmd}") + } + } + } catch (IOException e) { + assertTrue(false, "Execute timeout: ${cmd}") + } + } + + // Ensure the PostgreSQL JDBC driver is available on all FE/BE nodes. + def host_ips = new ArrayList() + String[][] backends = sql """ show backends """ + for (def b in backends) { + host_ips.add(b[1]) + } + String[][] frontends = sql """ show frontends """ + for (def f in frontends) { + host_ips.add(f[1]) + } + host_ips = host_ips.unique() + + executeCommand("mkdir -p ${local_driver_dir}", false) + if (!new File(local_driver_path).exists()) { + executeCommand("/usr/bin/curl --max-time 600 ${driver_download_url} --output ${local_driver_path}", true) + } + if (!new File(local_mysql_driver_path).exists()) { + executeCommand("/usr/bin/curl --max-time 600 ${mysql_driver_download_url} --output ${local_mysql_driver_path}", true) + } + for (def ip in host_ips) { + executeCommand("ssh -o StrictHostKeyChecking=no root@${ip} \"mkdir -p ${jdbc_drivers_dir}\"", false) + scpFiles("root", ip, local_driver_path, jdbc_drivers_dir, false) + scpFiles("root", ip, local_mysql_driver_path, jdbc_drivers_dir, false) + } + + try { + // Clean up existing catalog + sql """DROP CATALOG IF EXISTS ${catalog_name}""" + + // Create Iceberg JDBC Catalog with PostgreSQL backend and MinIO storage + sql """ + CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type' = 'iceberg', + 'iceberg.catalog.type' = 'jdbc', + 'uri' = 'jdbc:postgresql://${externalEnvIp}:${jdbc_port}/${pg_db}', + 'warehouse' = 's3://warehouse/jdbc_wh/', + 'driver_url' = '${driver_name}', + 'driver_class' = 'org.postgresql.Driver', + 'jdbc.user' = 'postgres', + 'jdbc.password' = '123456', + 'jdbc.init-catalog-tables' = 'true', + 'jdbc.schema-version' = 'V1', + 's3.endpoint' = 'http://${externalEnvIp}:${minio_port}', + 's3.access_key' = 'admin', + 's3.secret_key' = 'password', + 's3.region' = 'us-east-1' + ) + """ + + // Switch to the catalog + sql """SWITCH ${catalog_name}""" + + // Test: Show catalogs + def catalogs = sql """SHOW CATALOGS""" + assertTrue(catalogs.toString().contains(catalog_name)) + + // Test: Create database + sql """DROP DATABASE IF EXISTS ${db_name} FORCE""" + sql """CREATE DATABASE ${db_name}""" + + def databases = sql """SHOW DATABASES""" + assertTrue(databases.toString().contains(db_name)) + + sql """USE ${db_name}""" + + // Test: Create non-partitioned table with various data types + sql """DROP TABLE IF EXISTS test_datatypes""" + sql """ + CREATE TABLE test_datatypes ( + c_boolean BOOLEAN, + c_int INT, + c_bigint BIGINT, + c_float FLOAT, + c_double DOUBLE, + c_decimal DECIMAL(10, 2), + c_string STRING, + c_date DATE, + c_datetime DATETIME + ) PROPERTIES ( + 'write-format' = 'parquet', + 'compression-codec' = 'zstd' + ) + """ + + def tables = sql """SHOW TABLES""" + assertTrue(tables.toString().contains("test_datatypes")) + + // Test: Insert data with various types + sql """ + INSERT INTO test_datatypes VALUES + (true, 1, 100000000000, 1.5, 2.5, 123.45, 'hello', '2025-01-01', '2025-01-01 10:00:00'), + (false, 2, 200000000000, 2.5, 3.5, 234.56, 'world', '2025-01-02', '2025-01-02 11:00:00'), + (true, 3, 300000000000, 3.5, 4.5, 345.67, 'test', '2025-01-03', '2025-01-03 12:00:00') + """ + + // Test: Query data with different data types + order_qt_datatypes_select """SELECT * FROM test_datatypes ORDER BY c_int""" + order_qt_datatypes_count """SELECT count(*) FROM test_datatypes""" + order_qt_datatypes_filter """SELECT c_int, c_string FROM test_datatypes WHERE c_boolean = true ORDER BY c_int""" + + // Test: Create partitioned table + sql """DROP TABLE IF EXISTS test_partitioned""" + sql """ + CREATE TABLE test_partitioned ( + id INT, + name STRING, + category STRING, + event_date DATE + ) + PARTITION BY LIST (category) () + PROPERTIES ( + 'write-format' = 'parquet' + ) + """ + + // Test: Insert into partitioned table + sql """ + INSERT INTO test_partitioned VALUES + (1, 'Item1', 'A', '2025-01-01'), + (2, 'Item2', 'A', '2025-01-01'), + (3, 'Item3', 'B', '2025-01-02'), + (4, 'Item4', 'B', '2025-01-02'), + (5, 'Item5', 'A', '2025-01-03') + """ + + order_qt_partition_select """SELECT * FROM test_partitioned ORDER BY id""" + order_qt_partition_filter """SELECT * FROM test_partitioned WHERE category = 'A' ORDER BY id""" + + // Test: System tables + order_qt_sys_snapshots """SELECT count(*) FROM test_datatypes\$snapshots""" + order_qt_sys_history """SELECT count(*) FROM test_datatypes\$history""" + + // Test: DESCRIBE TABLE + def desc = sql """DESCRIBE test_datatypes""" + assertTrue(desc.toString().contains("c_int")) + assertTrue(desc.toString().contains("c_string")) + + // Test: INSERT OVERWRITE + sql """ + INSERT OVERWRITE TABLE test_partitioned + SELECT * FROM test_partitioned WHERE category = 'A' + """ + order_qt_after_overwrite """SELECT * FROM test_partitioned ORDER BY id""" + + // Test: Drop table + sql """DROP TABLE IF EXISTS test_datatypes""" + sql """DROP TABLE IF EXISTS test_partitioned""" + + // Test: Drop database + sql """DROP DATABASE IF EXISTS ${db_name} FORCE""" + + logger.info("Iceberg JDBC Catalog test completed successfully") + + // MySQL Catalog Test + String mysql_port = context.config.otherConfigs.get("mysql_57_port") + if (mysql_port != null) { + // Clean up MySQL database to remove old metadata + // This prevents issues where the database contains metadata pointing to invalid S3 locations + String cleanupCmd = "mysql -h ${externalEnvIp} -P ${mysql_port} -u root -p123456 -e 'DROP DATABASE IF EXISTS iceberg_db; CREATE DATABASE iceberg_db;'" + executeCommand(cleanupCmd, false) + + String mysql_catalog_name = "iceberg_jdbc_mysql" + try { + sql """DROP CATALOG IF EXISTS ${mysql_catalog_name}""" + sql """ + CREATE CATALOG ${mysql_catalog_name} PROPERTIES ( + 'type' = 'iceberg', + 'iceberg.catalog.type' = 'jdbc', + 'uri' = 'jdbc:mysql://${externalEnvIp}:${mysql_port}/${mysql_db}', + 'warehouse' = 's3://warehouse/jdbc_wh_mysql/', + 'driver_url' = 'file://${jdbc_drivers_dir}/${mysql_driver_name}', + 'driver_class' = 'com.mysql.jdbc.Driver', + 'jdbc.user' = 'root', + 'jdbc.password' = '123456', + 'jdbc.init-catalog-tables' = 'true', + 'jdbc.schema-version' = 'V1', + 's3.endpoint' = 'http://${externalEnvIp}:${minio_port}', + 's3.access_key' = 'admin', + 's3.secret_key' = 'password', + 's3.region' = 'us-east-1' + ) + """ + + sql """SWITCH ${mysql_catalog_name}""" + + String mysql_db_name = "mysql_test_db" + sql """DROP DATABASE IF EXISTS ${mysql_db_name} FORCE""" + sql """CREATE DATABASE ${mysql_db_name}""" + sql """USE ${mysql_db_name}""" + + sql """DROP TABLE IF EXISTS test_mysql_catalog""" + sql """ + CREATE TABLE test_mysql_catalog ( + id INT, + name STRING, + ts DATETIME + ) PROPERTIES ( + 'write-format' = 'parquet' + ) + """ + + sql """ + INSERT INTO test_mysql_catalog VALUES + (1, 'Alice', '2025-01-01 10:00:00'), + (2, 'Bob', '2025-01-02 11:00:00') + """ + + order_qt_mysql_select """SELECT * FROM test_mysql_catalog ORDER BY id""" + + sql """DROP TABLE IF EXISTS test_mysql_catalog""" + sql """DROP DATABASE IF EXISTS ${mysql_db_name} FORCE""" + + logger.info("Iceberg JDBC Catalog (MySQL) test completed successfully") + } catch (Exception e) { + logger.warn("MySQL Catalog test failed: ${e.message}") + // Don't fail the whole suite if MySQL is optional or misconfigured + // But user asked for it, so maybe we should let it fail or log error + throw e + } finally { + try { + sql """SWITCH internal""" + sql """DROP CATALOG IF EXISTS ${mysql_catalog_name}""" + } catch (Exception e) { + logger.warn("Failed to cleanup MySQL catalog: ${e.message}") + } + } + } + + } finally { + // Cleanup + try { + sql """SWITCH internal""" + sql """DROP CATALOG IF EXISTS ${catalog_name}""" + } catch (Exception e) { + logger.warn("Failed to cleanup catalog: ${e.message}") + } + } +} From 2f8de368de85f7b8012ea6bf5de241fc66c7f5f7 Mon Sep 17 00:00:00 2001 From: xylaaaaa <2392805527@qq.com> Date: Sun, 4 Jan 2026 23:02:54 +0800 Subject: [PATCH 6/7] fix --- .../IcebergJdbcMetaStoreProperties.java | 14 +++++------ .../iceberg/test_iceberg_jdbc_catalog.groovy | 24 +++++++++---------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java index 5f1912163f9f35..8406ec15373979 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java @@ -59,14 +59,14 @@ public class IcebergJdbcMetaStoreProperties extends AbstractIcebergProperties { private String uri = ""; @ConnectorProperty( - names = {"jdbc.user"}, + names = {"iceberg.jdbc.user"}, required = false, description = "Username for the Iceberg JDBC catalog." ) private String jdbcUser; @ConnectorProperty( - names = {"jdbc.password"}, + names = {"iceberg.jdbc.password"}, required = false, sensitive = true, description = "Password for the Iceberg JDBC catalog." @@ -74,28 +74,28 @@ public class IcebergJdbcMetaStoreProperties extends AbstractIcebergProperties { private String jdbcPassword; @ConnectorProperty( - names = {"jdbc.init-catalog-tables"}, + names = {"iceberg.jdbc.init-catalog-tables"}, required = false, description = "Whether to create catalog tables if they do not exist." ) private String jdbcInitCatalogTables; @ConnectorProperty( - names = {"jdbc.schema-version"}, + names = {"iceberg.jdbc.schema-version"}, required = false, description = "Iceberg JDBC catalog schema version (V0/V1)." ) private String jdbcSchemaVersion; @ConnectorProperty( - names = {"jdbc.strict-mode"}, + names = {"iceberg.jdbc.strict-mode"}, required = false, description = "Whether to enforce strict JDBC catalog schema checks." ) private String jdbcStrictMode; @ConnectorProperty( - names = {"driver_url"}, + names = {"iceberg.driver_url"}, required = false, description = "JDBC driver JAR file path or URL. " + "Can be a local file name (will look in $DORIS_HOME/plugins/jdbc_drivers/) " @@ -104,7 +104,7 @@ public class IcebergJdbcMetaStoreProperties extends AbstractIcebergProperties { private String driverUrl; @ConnectorProperty( - names = {"driver_class"}, + names = {"iceberg.driver_class"}, required = false, description = "JDBC driver class name. If not specified, will be auto-detected from the JDBC URI." ) diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_jdbc_catalog.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_jdbc_catalog.groovy index 647c18f44b78f7..4fcfcafc60e257 100644 --- a/regression-test/suites/external_table_p0/iceberg/test_iceberg_jdbc_catalog.groovy +++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_jdbc_catalog.groovy @@ -112,12 +112,12 @@ suite("test_iceberg_jdbc_catalog", "p0,external,iceberg,external_docker,external 'iceberg.catalog.type' = 'jdbc', 'uri' = 'jdbc:postgresql://${externalEnvIp}:${jdbc_port}/${pg_db}', 'warehouse' = 's3://warehouse/jdbc_wh/', - 'driver_url' = '${driver_name}', - 'driver_class' = 'org.postgresql.Driver', - 'jdbc.user' = 'postgres', - 'jdbc.password' = '123456', - 'jdbc.init-catalog-tables' = 'true', - 'jdbc.schema-version' = 'V1', + 'iceberg.driver_url' = '${driver_name}', + 'iceberg.driver_class' = 'org.postgresql.Driver', + 'iceberg.jdbc.user' = 'postgres', + 'iceberg.jdbc.password' = '123456', + 'iceberg.jdbc.init-catalog-tables' = 'true', + 'iceberg.jdbc.schema-version' = 'V1', 's3.endpoint' = 'http://${externalEnvIp}:${minio_port}', 's3.access_key' = 'admin', 's3.secret_key' = 'password', @@ -246,12 +246,12 @@ suite("test_iceberg_jdbc_catalog", "p0,external,iceberg,external_docker,external 'iceberg.catalog.type' = 'jdbc', 'uri' = 'jdbc:mysql://${externalEnvIp}:${mysql_port}/${mysql_db}', 'warehouse' = 's3://warehouse/jdbc_wh_mysql/', - 'driver_url' = 'file://${jdbc_drivers_dir}/${mysql_driver_name}', - 'driver_class' = 'com.mysql.jdbc.Driver', - 'jdbc.user' = 'root', - 'jdbc.password' = '123456', - 'jdbc.init-catalog-tables' = 'true', - 'jdbc.schema-version' = 'V1', + 'iceberg.driver_url' = 'file://${jdbc_drivers_dir}/${mysql_driver_name}', + 'iceberg.driver_class' = 'com.mysql.jdbc.Driver', + 'iceberg.jdbc.user' = 'root', + 'iceberg.jdbc.password' = '123456', + 'iceberg.jdbc.init-catalog-tables' = 'true', + 'iceberg.jdbc.schema-version' = 'V1', 's3.endpoint' = 'http://${externalEnvIp}:${minio_port}', 's3.access_key' = 'admin', 's3.secret_key' = 'password', From a90fd153e66ae57ab8de07bcc99448305d648647 Mon Sep 17 00:00:00 2001 From: xylaaaaa <2392805527@qq.com> Date: Mon, 5 Jan 2026 16:50:56 +0800 Subject: [PATCH 7/7] fix --- .../metastore/IcebergJdbcMetaStoreProperties.java | 4 ++-- .../iceberg/test_iceberg_jdbc_catalog.groovy | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java index 8406ec15373979..5c81532edd453e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java @@ -95,7 +95,7 @@ public class IcebergJdbcMetaStoreProperties extends AbstractIcebergProperties { private String jdbcStrictMode; @ConnectorProperty( - names = {"iceberg.driver_url"}, + names = {"iceberg.jdbc.driver_url"}, required = false, description = "JDBC driver JAR file path or URL. " + "Can be a local file name (will look in $DORIS_HOME/plugins/jdbc_drivers/) " @@ -104,7 +104,7 @@ public class IcebergJdbcMetaStoreProperties extends AbstractIcebergProperties { private String driverUrl; @ConnectorProperty( - names = {"iceberg.driver_class"}, + names = {"iceberg.jdbc.driver_class"}, required = false, description = "JDBC driver class name. If not specified, will be auto-detected from the JDBC URI." ) diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_jdbc_catalog.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_jdbc_catalog.groovy index 4fcfcafc60e257..412d305da1cdc5 100644 --- a/regression-test/suites/external_table_p0/iceberg/test_iceberg_jdbc_catalog.groovy +++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_jdbc_catalog.groovy @@ -112,8 +112,8 @@ suite("test_iceberg_jdbc_catalog", "p0,external,iceberg,external_docker,external 'iceberg.catalog.type' = 'jdbc', 'uri' = 'jdbc:postgresql://${externalEnvIp}:${jdbc_port}/${pg_db}', 'warehouse' = 's3://warehouse/jdbc_wh/', - 'iceberg.driver_url' = '${driver_name}', - 'iceberg.driver_class' = 'org.postgresql.Driver', + 'iceberg.jdbc.driver_url' = '${driver_name}', + 'iceberg.jdbc.driver_class' = 'org.postgresql.Driver', 'iceberg.jdbc.user' = 'postgres', 'iceberg.jdbc.password' = '123456', 'iceberg.jdbc.init-catalog-tables' = 'true', @@ -246,8 +246,8 @@ suite("test_iceberg_jdbc_catalog", "p0,external,iceberg,external_docker,external 'iceberg.catalog.type' = 'jdbc', 'uri' = 'jdbc:mysql://${externalEnvIp}:${mysql_port}/${mysql_db}', 'warehouse' = 's3://warehouse/jdbc_wh_mysql/', - 'iceberg.driver_url' = 'file://${jdbc_drivers_dir}/${mysql_driver_name}', - 'iceberg.driver_class' = 'com.mysql.jdbc.Driver', + 'iceberg.jdbc.driver_url' = 'file://${jdbc_drivers_dir}/${mysql_driver_name}', + 'iceberg.jdbc.driver_class' = 'com.mysql.jdbc.Driver', 'iceberg.jdbc.user' = 'root', 'iceberg.jdbc.password' = '123456', 'iceberg.jdbc.init-catalog-tables' = 'true',