diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java index 63509126022b..444543d200a9 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java @@ -41,6 +41,7 @@ import org.apache.beam.sdk.io.solace.data.Solace; import org.apache.beam.sdk.io.solace.data.Solace.Record; import org.apache.beam.sdk.io.solace.data.Solace.SolaceRecordMapper; +import org.apache.beam.sdk.io.solace.read.AckMessageDoFn; import org.apache.beam.sdk.io.solace.read.UnboundedSolaceSource; import org.apache.beam.sdk.io.solace.write.AddShardKeyDoFn; import org.apache.beam.sdk.io.solace.write.SolaceOutput; @@ -48,10 +49,13 @@ import org.apache.beam.sdk.io.solace.write.UnboundedStreamingSolaceWriter; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.transforms.Keys; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Redistribute; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.errorhandling.BadRecord; import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; @@ -412,6 +416,7 @@ public class SolaceIO { } }; private static final boolean DEFAULT_DEDUPLICATE_RECORDS = false; + private static final int DEFAULT_ACK_DEADLINE_SECONDS = 30; private static final Duration DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD = Duration.standardSeconds(30); public static final int DEFAULT_WRITER_NUM_SHARDS = 20; @@ -461,6 +466,7 @@ public static Read read() { .setParseFn(SolaceRecordMapper::map) .setTimestampFn(SENDER_TIMESTAMP_FUNCTION) .setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS) + .setAckDeadlineSeconds(DEFAULT_ACK_DEADLINE_SECONDS) .setWatermarkIdleDurationThreshold(DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD)); } @@ -490,6 +496,7 @@ public static Read read( .setParseFn(parseFn) .setTimestampFn(timestampFn) .setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS) + .setAckDeadlineSeconds(DEFAULT_ACK_DEADLINE_SECONDS) .setWatermarkIdleDurationThreshold(DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD)); } @@ -587,6 +594,16 @@ public Read withDeduplicateRecords(boolean deduplicateRecords) { return this; } + /** + * Optional, default: 30, max less than 60. Set to ack deadline after which {@link + * org.apache.beam.sdk.io.solace.read.UnboundedSolaceReader} will start to reject outstanding + * messages that were not successfully checkpointed. + */ + public Read withAckDeadlineSeconds(int ackDeadlineSeconds) { + configurationBuilder.setAckDeadlineSeconds(ackDeadlineSeconds); + return this; + } + /** * Set a factory that creates a {@link org.apache.beam.sdk.io.solace.broker.SempClientFactory}. * @@ -689,6 +706,8 @@ abstract static class Configuration { abstract Duration getWatermarkIdleDurationThreshold(); + abstract int getAckDeadlineSeconds(); + public static Builder builder() { Builder builder = new org.apache.beam.sdk.io.solace.AutoValue_SolaceIO_Read_Configuration.Builder(); @@ -719,6 +738,8 @@ abstract Builder setParseFn( abstract Builder setWatermarkIdleDurationThreshold(Duration idleDurationThreshold); + abstract Builder setAckDeadlineSeconds(int seconds); + abstract Configuration build(); } } @@ -745,18 +766,28 @@ public PCollection expand(PBegin input) { Coder coder = inferCoder(input.getPipeline(), configuration.getTypeDescriptor()); - return input.apply( - org.apache.beam.sdk.io.Read.from( - new UnboundedSolaceSource<>( - initializedQueue, - sempClientFactory, - sessionServiceFactory, - configuration.getMaxNumConnections(), - configuration.getDeduplicateRecords(), - coder, - configuration.getTimestampFn(), - configuration.getWatermarkIdleDurationThreshold(), - configuration.getParseFn()))); + PCollection> messages = + input.apply( + org.apache.beam.sdk.io.Read.from( + new UnboundedSolaceSource<>( + initializedQueue, + sempClientFactory, + sessionServiceFactory, + configuration.getMaxNumConnections(), + configuration.getDeduplicateRecords(), + coder, + configuration.getTimestampFn(), + configuration.getWatermarkIdleDurationThreshold(), + configuration.getParseFn(), + configuration.getAckDeadlineSeconds()))); + + messages + .apply("Keys", Keys.create()) + .apply("Reshuffle", Redistribute.arbitrarily()) + .apply( + "Ack", ParDo.of(new AckMessageDoFn(initializedQueue.getName(), sempClientFactory))); + + return messages.apply("Values", Values.create()); } @VisibleForTesting diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClient.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClient.java index 0a9ee4618b1e..830bdb7d94b1 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClient.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClient.java @@ -73,6 +73,11 @@ public long getBacklogBytes(String queueName) throws IOException { return sempBasicAuthClientExecutor.getBacklogBytes(queueName); } + @Override + public void ack(String queueName, Long msgId) throws IOException { + sempBasicAuthClientExecutor.ack(queueName, msgId); + } + private void createQueue(String queueName) throws IOException { LOG.info("SolaceIO.Read: Creating new queue {}.", queueName); sempBasicAuthClientExecutor.createQueueResponse(queueName); diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/JcsmpSessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/JcsmpSessionService.java index 818368a92b9f..e4bdff7cf29c 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/JcsmpSessionService.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/JcsmpSessionService.java @@ -29,6 +29,7 @@ import com.solacesystems.jcsmp.JCSMPProperties; import com.solacesystems.jcsmp.JCSMPSession; import com.solacesystems.jcsmp.Queue; +import com.solacesystems.jcsmp.XMLMessage; import com.solacesystems.jcsmp.XMLMessageProducer; import java.io.IOException; import java.util.Objects; @@ -143,6 +144,8 @@ private MessageReceiver createFlowReceiver() throws JCSMPException, IOException ConsumerFlowProperties flowProperties = new ConsumerFlowProperties(); flowProperties.setEndpoint(queue); + flowProperties.addRequiredSettlementOutcomes( + XMLMessage.Outcome.FAILED, XMLMessage.Outcome.REJECTED); flowProperties.setAckMode(JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT); EndpointProperties endpointProperties = new EndpointProperties(); diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutor.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutor.java index 965fc8741374..6525fc9c9c2b 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutor.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutor.java @@ -36,8 +36,6 @@ import java.io.UnsupportedEncodingException; import java.net.CookieManager; import java.net.HttpCookie; -import java.net.URLEncoder; -import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import java.util.Objects; @@ -45,7 +43,10 @@ import java.util.stream.Collectors; import org.apache.beam.sdk.io.solace.data.Semp.Queue; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.UrlEscapers; import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A class to execute requests to SEMP v2 with Basic Auth authentication. @@ -59,6 +60,7 @@ public class SempBasicAuthClientExecutor implements Serializable { // Every request will be repeated 2 times in case of abnormal connection failures. private static final int REQUEST_NUM_RETRIES = 2; + private static final Logger LOG = LoggerFactory.getLogger(SempBasicAuthClientExecutor.class); private static final Map COOKIE_MANAGER_MAP = new ConcurrentHashMap(); private static final String COOKIES_HEADER = "Set-Cookie"; @@ -102,6 +104,13 @@ private static String getQueueEndpoint(String messageVpn, String queueName) "/monitor/msgVpns/%s/queues/%s", urlEncode(messageVpn), urlEncode(queueName)); } + private static String getAckEndpoint(String messageVpn, String queueName, Long msgId) + throws UnsupportedEncodingException { + return String.format( + "/action/msgVpns/%s/queues/%s/msgs/%d/delete", + urlEncode(messageVpn), urlEncode(queueName), msgId); + } + private static String createQueueEndpoint(String messageVpn) throws UnsupportedEncodingException { return String.format("/config/msgVpns/%s/queues", urlEncode(messageVpn)); } @@ -158,6 +167,13 @@ private HttpResponse executePost(GenericUrl url, ImmutableMap pa return execute(request); } + private HttpResponse executePut(GenericUrl url, ImmutableMap parameters) + throws IOException { + HttpContent content = new JsonHttpContent(GsonFactory.getDefaultInstance(), parameters); + HttpRequest request = requestFactory.buildPutRequest(url, content); + return execute(request); + } + private HttpResponse execute(HttpRequest request) throws IOException { request.setNumberOfRetries(REQUEST_NUM_RETRIES); HttpHeaders httpHeaders = new HttpHeaders(); @@ -210,8 +226,8 @@ private void storeCookiesInCookieManager(HttpHeaders headers) { } } - private static String urlEncode(String queueName) throws UnsupportedEncodingException { - return URLEncoder.encode(queueName, StandardCharsets.UTF_8.name()); + private static String urlEncode(String path) { + return UrlEscapers.urlPathSegmentEscaper().escape(path); } private T mapJsonToClass(String content, Class mapSuccessToClass) @@ -228,6 +244,17 @@ public long getBacklogBytes(String queueName) throws IOException { return q.data().msgSpoolUsage(); } + public void ack(String queueName, Long msgId) throws IOException { + String queryUrl = getAckEndpoint(messageVpn, queueName, msgId); + ImmutableMap params = ImmutableMap.builder().build(); + try { + HttpResponse response = executePut(new GenericUrl(baseUrl + queryUrl), params); + BrokerResponse.fromHttpResponse(response); + } catch (HttpResponseException e) { + LOG.error("Failed to ack message", e); + } + } + private static class CookieManagerKey implements Serializable { private final String baseUrl; private final String username; diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClient.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClient.java index 465f37c14036..0bb21672d5a2 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClient.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClient.java @@ -46,4 +46,6 @@ public interface SempClient extends Serializable { * the amount of data in messages that are waiting to be delivered to consumers. */ long getBacklogBytes(String queueName) throws IOException; + + void ack(String queueName, Long msgId) throws IOException; } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/AckMessageDoFn.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/AckMessageDoFn.java new file mode 100644 index 000000000000..73b940d4a277 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/AckMessageDoFn.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.read; + +import java.io.IOException; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.io.solace.broker.SempClient; +import org.apache.beam.sdk.io.solace.broker.SempClientFactory; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.util.Preconditions; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Internal +public class AckMessageDoFn extends DoFn { + private static final Logger LOG = LoggerFactory.getLogger(AckMessageDoFn.class); + private String queueName; + private final SempClientFactory sempClientFactory; + @Nullable SempClient sempClient; + + public AckMessageDoFn(String queueName, SempClientFactory sempClientFactory) { + this.queueName = queueName; + this.sempClientFactory = sempClientFactory; + } + + @StartBundle + public void startBundle() { + sempClient = sempClientFactory.create(); + } + + @Teardown + public void tearDown() { + if (sempClient != null) { + sempClient = null; + } + } + + @ProcessElement + public void processElement(@Element Long msgId) throws IOException { + Preconditions.checkStateNotNull(sempClient).ack(queueName, msgId); + } + + @FinishBundle + public void finishBundle() { + if (sempClient != null) { + sempClient = null; + } + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java index eb2d4b3006a6..48da412f92f2 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java @@ -18,8 +18,8 @@ package org.apache.beam.sdk.io.solace.read; import com.solacesystems.jcsmp.BytesXMLMessage; -import java.util.List; import java.util.Objects; +import java.util.Queue; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; @@ -38,7 +38,7 @@ @VisibleForTesting public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark { private static final Logger LOG = LoggerFactory.getLogger(SolaceCheckpointMark.class); - private transient List safeToAck; + private transient Queue safeToAck; @SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction private SolaceCheckpointMark() {} @@ -48,24 +48,12 @@ private SolaceCheckpointMark() {} * * @param safeToAck - a queue of {@link BytesXMLMessage} to be acknowledged. */ - SolaceCheckpointMark(List safeToAck) { + SolaceCheckpointMark(Queue safeToAck) { this.safeToAck = safeToAck; } @Override - public void finalizeCheckpoint() { - for (BytesXMLMessage msg : safeToAck) { - try { - msg.ackMessage(); - } catch (IllegalStateException e) { - LOG.error( - "SolaceIO.Read: cannot acknowledge the message with applicationMessageId={}, ackMessageId={}. It will not be retried.", - msg.getApplicationMessageId(), - msg.getAckMessageId(), - e); - } - } - } + public void finalizeCheckpoint() {} @Override public boolean equals(@Nullable Object o) { diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java index 7c756169ef3e..a2afabf5e4f1 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java @@ -17,9 +17,13 @@ */ package org.apache.beam.sdk.io.solace.read; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.JCSMPException; +import com.solacesystems.jcsmp.XMLMessage; +import com.solacesystems.jcsmp.XMLMessage.Outcome; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -27,9 +31,11 @@ import java.util.NoSuchElementException; import java.util.Queue; import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; @@ -37,11 +43,11 @@ import org.apache.beam.sdk.io.solace.broker.SessionService; import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; import org.slf4j.Logger; @@ -49,7 +55,7 @@ /** Unbounded Reader to read messages from a Solace Router. */ @VisibleForTesting -class UnboundedSolaceReader extends UnboundedReader { +class UnboundedSolaceReader extends UnboundedReader> { private static final Logger LOG = LoggerFactory.getLogger(UnboundedSolaceReader.class); private final UnboundedSolaceSource currentSource; @@ -58,7 +64,8 @@ class UnboundedSolaceReader extends UnboundedReader { private final UUID readerUuid; private final SessionServiceFactory sessionServiceFactory; private @Nullable BytesXMLMessage solaceOriginalRecord; - private @Nullable T solaceMappedRecord; + private @Nullable KV solaceMappedRecord; + private final int ackDeadlineSeconds; /** * Queue for messages that were ingested in the {@link #advance()} method, but not sent yet to a @@ -67,7 +74,8 @@ class UnboundedSolaceReader extends UnboundedReader { private final Queue receivedMessages = new ArrayDeque<>(); private static final Cache sessionServiceCache; - private static final ScheduledExecutorService cleanUpThread = Executors.newScheduledThreadPool(1); + private static final ScheduledExecutorService cleanUpThread = Executors.newScheduledThreadPool(4); + private final ScheduledExecutorService nackExecutorPool; static { Duration cacheExpirationTimeout = Duration.ofMinutes(1); @@ -96,6 +104,7 @@ private static void startCleanUpThread() { } public UnboundedSolaceReader(UnboundedSolaceSource currentSource) { + checkArgument(currentSource.getAckDeadlineSeconds() < 60); this.currentSource = currentSource; this.watermarkPolicy = WatermarkPolicy.create( @@ -103,6 +112,12 @@ public UnboundedSolaceReader(UnboundedSolaceSource currentSource) { this.sessionServiceFactory = currentSource.getSessionServiceFactory(); this.sempClient = currentSource.getSempClientFactory().create(); this.readerUuid = UUID.randomUUID(); + this.ackDeadlineSeconds = currentSource.getAckDeadlineSeconds(); + + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(4); + executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(true); + executor.setRemoveOnCancelPolicy(true); + this.nackExecutorPool = executor; } private SessionService getSessionService() { @@ -142,17 +157,50 @@ public boolean advance() { return false; } solaceOriginalRecord = receivedXmlMessage; - solaceMappedRecord = getCurrentSource().getParseFn().apply(receivedXmlMessage); - receivedMessages.add(receivedXmlMessage); + T apply = getCurrentSource().getParseFn().apply(receivedXmlMessage); + if (apply == null) { + try { + receivedXmlMessage.settle(Outcome.REJECTED); + } catch (JCSMPException e) { + LOG.warn("SolaceIO.Read: Exception when rejecting null message.", e); + } + } else { + long msgId = receivedXmlMessage.getMessageIdLong(); + solaceMappedRecord = KV.of(msgId, apply); + receivedMessages.add(receivedXmlMessage); + } return true; } @Override public void close() { + nackExecutorPool.shutdown(); + try { + if (!nackExecutorPool.awaitTermination(ackDeadlineSeconds * 2, TimeUnit.SECONDS)) { + nackExecutorPool.shutdownNow(); + } + } catch (InterruptedException e) { + nackExecutorPool.shutdownNow(); + } sessionServiceCache.invalidate(readerUuid); } + public void nackMessages(Queue checkpoint) { + BytesXMLMessage msg; + while ((msg = checkpoint.poll()) != null) { + try { + msg.settle(XMLMessage.Outcome.FAILED); + } catch (IllegalStateException | JCSMPException e) { + LOG.error( + "SolaceIO.Read: failed to nack the message with applicationMessageId={}, ackMessageId={}.", + msg.getApplicationMessageId(), + msg.getAckMessageId(), + e); + } + } + } + @Override public Instant getWatermark() { // should be only used by a test receiver @@ -164,14 +212,14 @@ public Instant getWatermark() { @Override public UnboundedSource.CheckpointMark getCheckpointMark() { - - ImmutableList bytesXMLMessages = ImmutableList.copyOf(receivedMessages); + Queue safeToAckMessages = new ConcurrentLinkedQueue<>(); + safeToAckMessages.addAll(receivedMessages); receivedMessages.clear(); - return new SolaceCheckpointMark(bytesXMLMessages); + return new SolaceCheckpointMark(safeToAckMessages); } @Override - public T getCurrent() throws NoSuchElementException { + public KV getCurrent() throws NoSuchElementException { if (solaceMappedRecord == null) { throw new NoSuchElementException(); } @@ -205,7 +253,7 @@ public Instant getCurrentTimestamp() throws NoSuchElementException { if (getCurrent() == null) { throw new NoSuchElementException(); } - return currentSource.getTimestampFn().apply(getCurrent()); + return currentSource.getTimestampFn().apply(getCurrent().getValue()); } @Override diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java index 1cb17a49fbdb..8af754729130 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java @@ -24,12 +24,15 @@ import java.util.List; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.solace.broker.SempClientFactory; import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.KV; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; @@ -37,7 +40,7 @@ import org.slf4j.LoggerFactory; @Internal -public class UnboundedSolaceSource extends UnboundedSource { +public class UnboundedSolaceSource extends UnboundedSource, SolaceCheckpointMark> { private static final long serialVersionUID = 42L; private static final Logger LOG = LoggerFactory.getLogger(UnboundedSolaceSource.class); private final Queue queue; @@ -49,6 +52,7 @@ public class UnboundedSolaceSource extends UnboundedSource timestampFn; private final Duration watermarkIdleDurationThreshold; private final SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn; + private final int ackDeadlineSeconds; public Queue getQueue() { return queue; @@ -70,6 +74,10 @@ public Duration getWatermarkIdleDurationThreshold() { return watermarkIdleDurationThreshold; } + public int getAckDeadlineSeconds() { + return ackDeadlineSeconds; + } + public SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> getParseFn() { return parseFn; } @@ -83,7 +91,8 @@ public UnboundedSolaceSource( Coder coder, SerializableFunction timestampFn, Duration watermarkIdleDurationThreshold, - SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn) { + SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn, + int ackDeadlineSeconds) { this.queue = queue; this.sempClientFactory = sempClientFactory; this.sessionServiceFactory = sessionServiceFactory; @@ -93,10 +102,11 @@ public UnboundedSolaceSource( this.timestampFn = timestampFn; this.watermarkIdleDurationThreshold = watermarkIdleDurationThreshold; this.parseFn = parseFn; + this.ackDeadlineSeconds = ackDeadlineSeconds; } @Override - public UnboundedReader createReader( + public UnboundedReader> createReader( PipelineOptions options, @Nullable SolaceCheckpointMark checkpointMark) { // it makes no sense to resume a Solace Session with the previous checkpoint // so don't need the pass a checkpoint to new a Solace Reader @@ -134,7 +144,8 @@ private List> getSolaceSources( coder, timestampFn, watermarkIdleDurationThreshold, - parseFn); + parseFn, + ackDeadlineSeconds); sourceList.add(source); } return sourceList; @@ -146,8 +157,8 @@ public Coder getCheckpointMarkCoder() { } @Override - public Coder getOutputCoder() { - return coder; + public Coder> getOutputCoder() { + return KvCoder.of(VarLongCoder.of(), coder); } @Override diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSempClient.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSempClient.java index d4703237371a..181cb3e38e3a 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSempClient.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSempClient.java @@ -84,4 +84,7 @@ public Queue createQueueForTopic(String queueName, String topicName) throws IOEx public long getBacklogBytes(String queueName) throws IOException { return getBacklogBytesFn.apply(queueName); } + + @Override + public void ack(String queueName, Long msgId) throws IOException {} } diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java index c17ec3e128d2..a6c2e31f7048 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java @@ -52,6 +52,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; @@ -72,14 +73,16 @@ private Read getDefaultRead() { return SolaceIO.read() .from(Solace.Queue.fromName("queue")) .withSempClientFactory(MockSempClientFactory.getDefaultMock()) - .withMaxNumConnections(1); + .withMaxNumConnections(1) + .withAckDeadlineSeconds(1); } private Read getDefaultReadForTopic() { return SolaceIO.read() .from(Solace.Topic.fromName("topic")) .withSempClientFactory(MockSempClientFactory.getDefaultMock()) - .withMaxNumConnections(1); + .withMaxNumConnections(1) + .withAckDeadlineSeconds(1); } private static BytesXMLMessage getOrNull(Integer index, List messages) { @@ -97,7 +100,8 @@ private static UnboundedSolaceSource getSource(Read spec, TestPi spec.inferCoder(pipeline, configuration.getTypeDescriptor()), configuration.getTimestampFn(), configuration.getWatermarkIdleDurationThreshold(), - configuration.getParseFn()); + configuration.getParseFn(), + configuration.getAckDeadlineSeconds()); } @Test @@ -389,7 +393,7 @@ public void testCheckpointMark() throws Exception { Read spec = getDefaultRead().withSessionServiceFactory(fakeSessionServiceFactory); UnboundedSolaceSource initialSource = getSource(spec, pipeline); - UnboundedReader reader = + UnboundedReader> reader = initialSource.createReader(PipelineOptionsFactory.create(), null); // start the reader and move to the first record @@ -437,11 +441,12 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception { Read spec = getDefaultRead() .withSessionServiceFactory(fakeSessionServiceFactory) - .withMaxNumConnections(4); + .withMaxNumConnections(4) + .withAckDeadlineSeconds(1); UnboundedSolaceSource initialSource = getSource(spec, pipeline); - UnboundedReader reader = + UnboundedReader> reader = initialSource.createReader(PipelineOptionsFactory.create(), null); // start the reader and move to the first record @@ -458,7 +463,7 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception { // mark all consumed messages as ready to be acknowledged CheckpointMark checkpointMark = reader.getCheckpointMark(); - // consume 1 more message. + // consume 1 more message. This will still not call ack. reader.advance(); assertEquals(0, countAckMessages.get()); @@ -471,73 +476,13 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception { // No change in the acknowledged messages, because they were acknowledged in the #advance() // method. assertEquals(4, countAckMessages.get()); - } - - @Test - public void testLateCheckpointOverlappingFlushingOfNextBundle() throws Exception { - AtomicInteger countConsumedMessages = new AtomicInteger(0); - AtomicInteger countAckMessages = new AtomicInteger(0); - - // Broker that creates input data - SerializableFunction recordFn = - index -> { - List messages = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - messages.add( - SolaceDataUtils.getBytesXmlMessage( - "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet())); - } - countConsumedMessages.incrementAndGet(); - return getOrNull(index, messages); - }; - - SessionServiceFactory fakeSessionServiceFactory = - MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(10).build(); - Read spec = - getDefaultRead() - .withSessionServiceFactory(fakeSessionServiceFactory) - .withMaxNumConnections(4); - - UnboundedSolaceSource initialSource = getSource(spec, pipeline); + checkpointMark = reader.getCheckpointMark(); - UnboundedReader reader = - initialSource.createReader(PipelineOptionsFactory.create(), null); - - // start the reader and move to the first record - assertTrue(reader.start()); - - // consume 3 messages (NB: #start() already consumed the first message) - for (int i = 0; i < 3; i++) { - assertTrue(String.format("Failed at %d-th message", i), reader.advance()); - } - - // #advance() was called, but the messages were not ready to be acknowledged. - assertEquals(0, countAckMessages.get()); - - // mark all consumed messages as ready to be acknowledged - CheckpointMark checkpointMark = reader.getCheckpointMark(); - - // data is flushed - - // consume 1 more message. - reader.advance(); - assertEquals(0, countAckMessages.get()); - - // consume 1 more message. No change in the acknowledged messages. - reader.advance(); - assertEquals(0, countAckMessages.get()); - - CheckpointMark checkpointMark2 = reader.getCheckpointMark(); - // data is prepared for flushing that will be rejected - - // acknowledge from the first checkpoint may arrive late + Thread.sleep(2000); checkpointMark.finalizeCheckpoint(); - + // messages were nacked, no change in expected values assertEquals(4, countAckMessages.get()); - - checkpointMark2.finalizeCheckpoint(); - assertEquals(6, countAckMessages.get()); } @Test @@ -571,7 +516,7 @@ public void testCheckpointMarkSafety() throws Exception { UnboundedSolaceSource initialSource = getSource(spec, pipeline); - UnboundedReader reader = + UnboundedReader> reader = initialSource.createReader(PipelineOptionsFactory.create(), null); // start the reader and move to the first record @@ -609,7 +554,7 @@ public void testCheckpointMarkSafety() throws Exception { @Test public void testDefaultCoder() { Coder coder = - new UnboundedSolaceSource<>(null, null, null, 0, false, null, null, null, null) + new UnboundedSolaceSource<>(null, null, null, 0, false, null, null, null, null, 1) .getCheckpointMarkCoder(); CoderProperties.coderSerializable(coder); } diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceContainerManager.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceContainerManager.java index 6d2b3a27ffd0..27c520f99145 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceContainerManager.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceContainerManager.java @@ -17,11 +17,13 @@ */ package org.apache.beam.sdk.io.solace.it; +import com.github.dockerjava.api.model.Ulimit; import java.io.IOException; import java.net.ServerSocket; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.UrlEscapers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.Container.ExecResult; @@ -43,10 +45,16 @@ public class SolaceContainerManager { public SolaceContainerManager() throws IOException { this.container = - new SolaceContainer(DockerImageName.parse("solace/solace-pubsub-standard:10.7")) { + new SolaceContainer(DockerImageName.parse("solace/solace-pubsub-standard:latest")) { { addFixedExposedPort(jcsmpPortMapped, 55555); addFixedExposedPort(sempPortMapped, 8080); + withCreateContainerCmdModifier( + cmd -> { + cmd.getHostConfig() + .withShmSize((long) Math.pow(1024, 3)) + .withUlimits(new Ulimit[] {new Ulimit("nofile", 2448L, 1048576L)}); + }); } }.withVpn(VPN_NAME) .withCredentials(USERNAME, PASSWORD) @@ -97,7 +105,7 @@ void createQueueWithSubscriptionTopic(String queueName) { "http://localhost:8080/SEMP/v2/config/msgVpns/" + VPN_NAME + "/queues/" - + queueName + + UrlEscapers.urlPathSegmentEscaper().escape(queueName) + "/subscriptions", "-X", "POST", @@ -138,7 +146,7 @@ public void getQueueDetails(String queueName) { "http://localhost:8080/SEMP/v2/monitor/msgVpns/" + VPN_NAME + "/queues/" - + queueName + + UrlEscapers.urlPathSegmentEscaper().escape(queueName) + "/msgs", "-X", "GET", diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOCustomSessionServiceFactoryIT.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOCustomSessionServiceFactoryIT.java index b047026a2bbd..b6028a4beaed 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOCustomSessionServiceFactoryIT.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOCustomSessionServiceFactoryIT.java @@ -102,6 +102,7 @@ public void test01writeAndReadWithCustomSessionServiceFactory() { SolaceIO.read() .from(Queue.fromName(QUEUE_NAME)) .withMaxNumConnections(1) + .withAckDeadlineSeconds(10) .withDeduplicateRecords(true) .withSempClientFactory( BasicAuthSempClientFactory.builder() @@ -118,7 +119,7 @@ public void test01writeAndReadWithCustomSessionServiceFactory() { PipelineResult pipelineResult = writerPipeline.run(); // We need enough time for Beam to pull all messages from the queue, but we need a timeout too, // as the Read connector will keep attempting to read forever. - pipelineResult.waitUntilFinish(Duration.standardSeconds(15)); + pipelineResult.waitUntilFinish(Duration.standardMinutes(1)); MetricsReader metricsReader = new MetricsReader(pipelineResult, NAMESPACE); long actualRecordsCount = metricsReader.getCounterMetric(READ_COUNT); diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java index 8311a67bf6c1..ff5f9d9d4a7c 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java @@ -63,7 +63,7 @@ public class SolaceIOIT { private static final String READ_COUNT = "read_count"; private static final String WRITE_COUNT = "write_count"; private static SolaceContainerManager solaceContainerManager; - private static final String queueName = "test_queue"; + private static final String queueName = "test_queue/with/slash"; private static final TestPipelineOptions pipelineOptions; private static final long PUBLISH_MESSAGE_COUNT = 20; diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOMultipleSempIT.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOMultipleSempIT.java index 77d00b4e41ec..a11b20d79817 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOMultipleSempIT.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOMultipleSempIT.java @@ -107,6 +107,7 @@ public void test01writeAndReadWithMultipleSempClientFactory() { SolaceIO.read() .from(Queue.fromName(QUEUE_NAME)) .withMaxNumConnections(1) + .withAckDeadlineSeconds(10) .withDeduplicateRecords(true) .withSempClientFactory( BasicAuthMultipleSempClientFactory.builder() @@ -131,7 +132,7 @@ public void test01writeAndReadWithMultipleSempClientFactory() { PipelineResult pipelineResult = writerPipeline.run(); // We need enough time for Beam to pull all messages from the queue, but we need a timeout too, // as the Read connector will keep attempting to read forever. - pipelineResult.waitUntilFinish(Duration.standardSeconds(15)); + pipelineResult.waitUntilFinish(Duration.standardMinutes(2)); MetricsReader metricsReader = new MetricsReader(pipelineResult, NAMESPACE); long actualRecordsCount = metricsReader.getCounterMetric(READ_COUNT);