Skip to content

Commit b99eb21

Browse files
Support per-table schema registry with authentication
Add an extra option for tables with the Kafka engine to pass the URL of a schema registry. Also add support for schema registry basic authentication via username:password in the URL.

Changes:
- Add the kafka_format_avro_schema_registry_url setting to KafkaSettings.
- Store format_avro_schema_registry_url in the StorageKafka class.
- Propagate the schema registry URL to the format settings via createSettingsAdjustments() (works for both StorageKafka and StorageKafka2).
- Add basic authentication parsing in AvroConfluentRowInputFormat::SchemaRegistry to extract username:password from the URL and authenticate HTTP requests.

This allows per-table configuration of Avro Schema Registry URLs with authentication, enabling different Kafka tables to use different schema registries or credentials.

Usage example:

    CREATE TABLE kafka_table (...) ENGINE = Kafka(...) SETTINGS kafka_format_avro_schema_registry_url = 'http://user:pass@registry:8081';

Co-authored-by: Kevin Michel <kevin.michel@aiven.io>
1 parent 44983c1 commit b99eb21

File tree

5 files changed

+25
-0
lines changed

5 files changed

+25
-0
lines changed

src/Processors/Formats/Impl/AvroRowInputFormat.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1083,6 +1083,21 @@ class AvroConfluentRowInputFormat::SchemaRegistry
10831083
.withReceiveTimeout(1);
10841084

10851085
Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, url.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
1086+
const auto & user_info = base_url.getUserInfo();
1087+
if (!user_info.empty())
1088+
{
1089+
std::size_t n = user_info.find(':');
1090+
if (n != std::string::npos)
1091+
{
1092+
Poco::Net::HTTPBasicCredentials credentials;
1093+
credentials.setUsername(user_info.substr(0, n));
1094+
credentials.setPassword(user_info.substr(n + 1));
1095+
if (!credentials.getUsername().empty())
1096+
{
1097+
credentials.authenticate(request);
1098+
}
1099+
}
1100+
}
10861101
if (url.getPort())
10871102
request.setHost(url.getHost(), url.getPort());
10881103
else

src/Storages/Kafka/KafkaSettings.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ namespace ErrorCodes
2424
DECLARE(String, kafka_group_name, "", "Client group id string. All Kafka consumers sharing the same group.id belong to the same group.", 0) \
2525
/* those are mapped to format factory settings */ \
2626
DECLARE(String, kafka_format, "", "The message format for Kafka engine.", 0) \
27+
DECLARE(String, kafka_format_avro_schema_registry_url, "", "For AvroConfluent format: Kafka Schema Registry URL.", 0) \
2728
DECLARE(String, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine", 0) \
2829
DECLARE(UInt64, kafka_num_consumers, 1, "The number of consumers per table for Kafka engine.", 0) \
2930
/* default is = max_insert_block_size / kafka_num_consumers */ \

src/Storages/Kafka/StorageKafka.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ namespace KafkaSetting
9898
extern const KafkaSettingsString kafka_ssl_key_location;
9999
extern const KafkaSettingsBool kafka_thread_per_consumer;
100100
extern const KafkaSettingsString kafka_topic_list;
101+
extern const KafkaSettingsString kafka_format_avro_schema_registry_url;
101102
}
102103

103104
namespace ErrorCodes
@@ -188,6 +189,7 @@ StorageKafka::StorageKafka(
188189
? StorageKafkaUtils::getDefaultClientId(table_id_)
189190
: getContext()->getMacros()->expand((*kafka_settings)[KafkaSetting::kafka_client_id].value, macros_info))
190191
, format_name(getContext()->getMacros()->expand((*kafka_settings)[KafkaSetting::kafka_format].value))
192+
, format_avro_schema_registry_url((*kafka_settings)[KafkaSetting::kafka_format_avro_schema_registry_url].value)
191193
, max_rows_per_message((*kafka_settings)[KafkaSetting::kafka_max_rows_per_message].value)
192194
, schema_name(getContext()->getMacros()->expand((*kafka_settings)[KafkaSetting::kafka_schema].value, macros_info))
193195
, num_consumers((*kafka_settings)[KafkaSetting::kafka_num_consumers].value)

src/Storages/Kafka/StorageKafka.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ class StorageKafka final : public IStorage, WithContext
107107
const String group;
108108
const String client_id;
109109
const String format_name;
110+
const String format_avro_schema_registry_url;
110111
const size_t max_rows_per_message;
111112
const String schema_name;
112113
const size_t num_consumers; /// total number of consumers

src/Storages/Kafka/StorageKafkaUtils.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ namespace KafkaSetting
8282
extern const KafkaSettingsUInt64 kafka_skip_broken_messages;
8383
extern const KafkaSettingsBool kafka_thread_per_consumer;
8484
extern const KafkaSettingsString kafka_topic_list;
85+
extern const KafkaSettingsString kafka_format_avro_schema_registry_url;
8586
}
8687

8788
using namespace std::chrono_literals;
@@ -520,6 +521,11 @@ SettingsChanges createSettingsAdjustments(KafkaSettings & kafka_settings, const
520521
result.setSetting("input_format_csv_detect_header", false);
521522
result.setSetting("input_format_tsv_detect_header", false);
522523
result.setSetting("input_format_custom_detect_header", false);
524+
const String & format_avro_schema_registry_url = kafka_settings[KafkaSetting::kafka_format_avro_schema_registry_url].value;
525+
if (!format_avro_schema_registry_url.empty())
526+
{
527+
result.emplace_back("format_avro_schema_registry_url", format_avro_schema_registry_url);
528+
}
523529

524530
return result;
525531
}

0 commit comments

Comments
 (0)