Skip to content

Commit 171212f

Browse files
authored
MODDICORE-484. Update KafkaEventPublisher to send x-okapi-request header (#419)
* MODDICORE-484. Update KafkaEventPublisher to send x-okapi-request header
* MODDICORE-421. Include Job Execution Identifiers In Logs
* MODDICORE-421. Increase unit tests coverage
1 parent e7104be commit 171212f

File tree

12 files changed

+131
-17
lines changed

12 files changed

+131
-17
lines changed

src/main/java/org/folio/processing/events/EventManager.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import static org.apache.commons.lang3.StringUtils.isNotBlank;
3232
import static org.folio.DataImportEventTypes.DI_COMPLETED;
3333
import static org.folio.DataImportEventTypes.DI_ERROR;
34+
import static org.folio.processing.events.utils.EventUtils.extractRecordId;
3435
import static org.folio.rest.jaxrs.model.ProfileType.ACTION_PROFILE;
3536
import static org.folio.rest.jaxrs.model.ProfileType.JOB_PROFILE;
3637
import static org.folio.rest.jaxrs.model.ProfileType.MAPPING_PROFILE;
@@ -67,7 +68,9 @@ static List<EventPublisher> getEventPublishers() {
6768
* @return future with event payload after handling
6869
*/
6970
public static CompletableFuture<DataImportEventPayload> handleEvent(DataImportEventPayload eventPayload, ProfileSnapshotWrapper jobProfileSnapshot) {
70-
LOGGER.trace("handleEvent:: Event type: {}, event payload: {}", eventPayload.getEventType(), eventPayload);
71+
LOGGER.trace("handleEvent:: Event type: {} jobExecutionId: {} recordId: {}, event payload: {}",
72+
eventPayload.getEventType(), eventPayload.getJobExecutionId(),
73+
extractRecordId(eventPayload), eventPayload);
7174
CompletableFuture<DataImportEventPayload> future = new CompletableFuture<>();
7275
try {
7376
setCurrentNodeIfRoot(eventPayload, jobProfileSnapshot);
@@ -78,12 +81,14 @@ public static CompletableFuture<DataImportEventPayload> handleEvent(DataImportEv
7881
if (publishThrowable == null) {
7982
future.complete(eventPayload);
8083
} else {
81-
LOGGER.warn("handleEvent:: Can`t publish event", publishThrowable);
84+
LOGGER.warn("handleEvent:: Can`t publish event jobExecutionId: {} recordId: {}",
85+
eventPayload.getJobExecutionId(), extractRecordId(eventPayload), publishThrowable);
8286
future.completeExceptionally(publishThrowable);
8387
}
8488
}));
8589
} catch (Exception e) {
86-
LOGGER.warn("handleEvent:: Can`t handle event", e);
90+
LOGGER.warn("handleEvent:: Can`t handle event jobExecutionId: {} recordId: {}",
91+
eventPayload.getJobExecutionId(), extractRecordId(eventPayload), e);
8792
future.completeExceptionally(e);
8893
}
8994
return future;

src/main/java/org/folio/processing/events/services/processor/EventProcessorImpl.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import static org.folio.processing.events.EventManager.OL_ACCUMULATIVE_RESULTS;
1717
import static org.folio.processing.events.EventManager.POST_PROCESSING_INDICATOR;
1818
import static org.folio.processing.events.EventManager.POST_PROCESSING_RESULT_EVENT_KEY;
19+
import static org.folio.processing.events.utils.EventUtils.extractRecordId;
1920

2021
public class EventProcessorImpl implements EventProcessor {
2122

@@ -25,7 +26,8 @@ public class EventProcessorImpl implements EventProcessor {
2526

2627
@Override
2728
public CompletableFuture<DataImportEventPayload> process(DataImportEventPayload eventPayload) {
28-
LOG.debug("process:: Processing event payload {}", eventPayload);
29+
LOG.debug("process:: Processing event payload jobExecutionId: {} recordId: {}",
30+
eventPayload.getJobExecutionId(), extractRecordId(eventPayload));
2931
CompletableFuture<DataImportEventPayload> future = new CompletableFuture<>();
3032
try {
3133
Optional<EventHandler> optionalEventHandler = eventHandlers.stream()
@@ -41,18 +43,22 @@ public CompletableFuture<DataImportEventPayload> process(DataImportEventPayload
4143
.whenComplete((payload, throwable) -> {
4244
logEventProcessingTime(eventType, startTime, eventPayload);
4345
if (throwable != null) {
44-
LOG.warn("process:: Failed to process event payload", throwable);
46+
LOG.warn("process:: Failed to process event payload jobExecutionId: {} recordId: {}",
47+
eventPayload.getJobExecutionId(), extractRecordId(eventPayload), throwable);
4548
future.completeExceptionally(throwable);
4649
} else {
4750
future.complete(payload);
4851
}
4952
});
5053
} else {
51-
LOG.info("process:: No suitable handler found for {} event type and current profile {}", eventPayload.getEventType(), eventPayload.getCurrentNode().getContentType());
54+
LOG.info("process:: No suitable handler found for {} event type and current profile {} jobExecutionId: {} recordId: {}",
55+
eventPayload.getEventType(), eventPayload.getCurrentNode().getContentType(),
56+
eventPayload.getJobExecutionId(), extractRecordId(eventPayload));
5257
future.completeExceptionally(new EventHandlerNotFoundException(format("No suitable handler found for %s event type", eventPayload.getEventType())));
5358
}
5459
} catch (Exception e) {
55-
LOG.warn("process:: Failed to process event payload", e);
60+
LOG.warn("process:: Failed to process event payload jobExecutionId: {} recordId: {}",
61+
eventPayload.getJobExecutionId(), extractRecordId(eventPayload), e);
5662
future.completeExceptionally(e);
5763
}
5864
return future;
@@ -75,14 +81,17 @@ private void logEventProcessingTime(String eventType, long startTime, DataImport
7581
var endTime = System.nanoTime();
7682
final String lastEvent = getLastEvent(eventPayload);
7783
if (DI_SRS_MARC_AUTHORITY_RECORD_CREATED.value().equals(lastEvent)) {
78-
LOG.debug("logEventProcessingTime:: Event '{}' has been processed for {} ms", lastEvent, (endTime - startTime) / 1000000L);
84+
LOG.debug("logEventProcessingTime:: Event '{}' has been processed for {} ms jobExecutionId: {} recordId: {}",
85+
lastEvent, (endTime - startTime) / 1000000L, eventPayload.getJobExecutionId(), extractRecordId(eventPayload));
7986
} else {
8087
String profileType = eventPayload.getCurrentNode().getContentType().toString();
8188
String profileId = eventPayload.getCurrentNode().getProfileId();
82-
LOG.debug("logEventProcessingTime:: Event '{}' has been processed using {} with id '{}' for {} ms", eventType, profileType, profileId, (endTime - startTime) / 1000000L);
89+
LOG.debug("logEventProcessingTime:: Event '{}' has been processed using {} with id '{}' for {} ms jobExecutionId: {} recordId: {}",
90+
eventType, profileType, profileId, (endTime - startTime) / 1000000L, eventPayload.getJobExecutionId(), extractRecordId(eventPayload));
8391
}
8492
} catch (Exception e) {
85-
LOG.warn("logEventProcessingTime:: An Exception occurred {}", e.getMessage());
93+
LOG.warn("logEventProcessingTime:: An Exception occurred {} jobExecutionId: {} recordId: {}",
94+
e.getMessage(), eventPayload.getJobExecutionId(), extractRecordId(eventPayload));
8695
}
8796
}
8897

src/main/java/org/folio/processing/events/services/publisher/KafkaEventPublisher.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.concurrent.CompletableFuture;
2525
import java.util.concurrent.atomic.AtomicLong;
2626

27+
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_REQUEST_ID_HEADER;
2728
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER;
2829
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
2930
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_URL_HEADER;
@@ -115,6 +116,9 @@ private List<KafkaHeader> getHeaders(DataImportEventPayload eventPayload, String
115116
Optional.ofNullable(eventPayload.getContext())
116117
.map(it -> it.get(USER_ID_HEADER))
117118
.ifPresent(userId -> headers.add(KafkaHeader.header(USER_ID_HEADER, userId)));
119+
Optional.ofNullable(eventPayload.getContext())
120+
.map(it -> it.get(OKAPI_REQUEST_ID_HEADER))
121+
.ifPresent(requestId -> headers.add(KafkaHeader.header(OKAPI_REQUEST_ID_HEADER, requestId)));
118122

119123
headers.add(KafkaHeader.header(OKAPI_URL_HEADER, eventPayload.getOkapiUrl()));
120124
headers.add(KafkaHeader.header(OKAPI_TENANT_HEADER, eventPayload.getTenant()));
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package org.folio.processing.events.utils;
2+
3+
import org.folio.DataImportEventPayload;
4+
import org.folio.processing.events.services.publisher.KafkaEventPublisher;
5+
6+
/**
7+
* Utility class for event-related operations
8+
*/
9+
public final class EventUtils {
10+
11+
private EventUtils() {
12+
}
13+
14+
/**
15+
* Safely extracts recordId from event payload context.
16+
 * Returns empty string if the event payload or its context is null;
 * note it returns null (not empty string) when no recordId entry is present in the context.
17+
*
18+
* @param eventPayload the event payload
19+
* @return recordId
20+
*/
21+
public static String extractRecordId(DataImportEventPayload eventPayload) {
22+
if (eventPayload == null || eventPayload.getContext() == null) {
23+
return "";
24+
}
25+
return eventPayload.getContext().get(KafkaEventPublisher.RECORD_ID_HEADER);
26+
}
27+
}

src/main/java/org/folio/processing/mapping/MappingManager.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.Map;
2020

21+
import static org.folio.processing.events.utils.EventUtils.extractRecordId;
2122
import static org.folio.rest.jaxrs.model.ProfileType.MAPPING_PROFILE;
2223

2324
/**
@@ -47,7 +48,8 @@ private MappingManager() {
4748
public static DataImportEventPayload map(DataImportEventPayload eventPayload, MappingContext mappingContext) {
4849
try {
4950
if (eventPayload.getCurrentNode().getContentType() != MAPPING_PROFILE) {
50-
LOGGER.info("map:: Current node is not of {} content type", MAPPING_PROFILE);
51+
LOGGER.info("map:: Current node is not of {} content type jobExecutionId: {} recordId: {}",
52+
MAPPING_PROFILE, eventPayload.getJobExecutionId(), extractRecordId(eventPayload));
5153
return eventPayload;
5254
}
5355
ProfileSnapshotWrapper mappingProfileWrapper = eventPayload.getCurrentNode();
@@ -65,7 +67,8 @@ public static DataImportEventPayload map(DataImportEventPayload eventPayload, Ma
6567
Mapper mapper = FACTORY_REGISTRY.createMapper(eventPayload, reader, writer);
6668
return mapper.map(mappingProfile, eventPayload, mappingContext);
6769
} catch (Exception e) {
68-
LOGGER.warn("map:: Failed to perform mapping", e);
70+
LOGGER.warn("map:: Failed to perform mapping jobExecutionId: {} recordId: {}",
71+
eventPayload.getJobExecutionId(), extractRecordId(eventPayload), e);
6972
throw new MappingException(e);
7073
}
7174
}

src/main/java/org/folio/processing/mapping/mapper/mappers/AbstractMapper.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import java.io.IOException;
1616
import java.util.List;
1717

18+
import static org.folio.processing.events.utils.EventUtils.extractRecordId;
19+
1820
public class AbstractMapper implements Mapper {
1921
private static final Logger LOGGER = LogManager.getLogger(AbstractMapper.class);
2022

@@ -42,7 +44,8 @@ public DataImportEventPayload map(MappingProfile profile, DataImportEventPayload
4244
}
4345
return writer.getResult(eventPayload);
4446
} catch (IOException e) {
45-
LOGGER.warn("map:: Failed to perform Abstract mapping", e);
47+
LOGGER.warn("map:: Failed to perform Abstract mapping jobExecutionId: {} recordId: {}",
48+
eventPayload.getJobExecutionId(), extractRecordId(eventPayload), e);
4649
throw new MappingException(e);
4750
}
4851
}

src/main/java/org/folio/processing/mapping/mapper/mappers/HoldingsMapper.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Objects;
2323
import java.util.Optional;
2424

25+
import static org.folio.processing.events.utils.EventUtils.extractRecordId;
2526
import static org.folio.processing.mapping.mapper.reader.record.marc.MarcRecordReader.EXPRESSIONS_DIVIDER;
2627
import static org.folio.processing.mapping.mapper.reader.record.marc.MarcRecordReader.MARC_PATTERN;
2728

@@ -48,7 +49,8 @@ public DataImportEventPayload map(MappingProfile profile, DataImportEventPayload
4849
}
4950
return executeMultipleHoldingsLogic(eventPayload, profile, mappingContext);
5051
} catch (IOException e) {
51-
LOGGER.warn("map:: Failed to perform Holdings mapping", e);
52+
LOGGER.warn("map:: Failed to perform Holdings mapping jobExecutionId: {} recordId: {}",
53+
eventPayload.getJobExecutionId(), extractRecordId(eventPayload), e);
5254
throw new MappingException(e);
5355
}
5456
}

src/main/java/org/folio/processing/mapping/mapper/mappers/ItemMapper.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.HashMap;
1919
import java.util.List;
2020

21+
import static org.folio.processing.events.utils.EventUtils.extractRecordId;
2122
import static org.folio.processing.mapping.mapper.mappers.HoldingsMapper.MULTIPLE_HOLDINGS_FIELD;
2223
import static org.folio.rest.jaxrs.model.EntityType.ITEM;
2324

@@ -40,7 +41,8 @@ public DataImportEventPayload map(MappingProfile profile, DataImportEventPayload
4041
}
4142
return executeMultipleItemsLogic(eventPayload, profile, mappingContext);
4243
} catch (IOException e) {
43-
LOGGER.warn("map:: Failed to perform Items mapping", e);
44+
LOGGER.warn("map:: Failed to perform Items mapping jobExecutionId: {} recordId: {}",
45+
eventPayload.getJobExecutionId(), extractRecordId(eventPayload), e);
4446
throw new MappingException(e);
4547
}
4648
}

src/main/java/org/folio/processing/matching/MatchingManager.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.folio.processing.matching;
22

3+
import static org.folio.processing.events.utils.EventUtils.extractRecordId;
34
import static org.folio.rest.jaxrs.model.ProfileType.MATCH_PROFILE;
45

56
import java.util.Map;
@@ -36,7 +37,8 @@ public static CompletableFuture<Boolean> match(DataImportEventPayload eventPaylo
3637
CompletableFuture<Boolean> future = new CompletableFuture<>();
3738
try {
3839
if (eventPayload.getCurrentNode().getContentType() != MATCH_PROFILE) {
39-
LOGGER.info("match:: Current node is not of {} content type", MATCH_PROFILE);
40+
LOGGER.info("match:: Current node is not of {} content type jobExecutionId: {} recordId: {}",
41+
MATCH_PROFILE, eventPayload.getJobExecutionId(), extractRecordId(eventPayload));
4042
future.complete(false);
4143
return future;
4244
}
@@ -53,7 +55,8 @@ public static CompletableFuture<Boolean> match(DataImportEventPayload eventPaylo
5355

5456
return matcher.match(eventPayload);
5557
} catch (Exception e) {
56-
LOGGER.warn("match:: Failed to perform matching", e);
58+
LOGGER.warn("match:: Failed to perform matching jobExecutionId: {} recordId: {}",
59+
eventPayload.getJobExecutionId(), extractRecordId(eventPayload), e);
5760
future.completeExceptionally(new MatchingException(e));
5861
}
5962
return future;

src/main/java/org/folio/rest/util/OkapiConnectionParams.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ public final class OkapiConnectionParams {
99
public static final String OKAPI_TENANT_HEADER = "x-okapi-tenant";
1010
public static final String OKAPI_TOKEN_HEADER = "x-okapi-token";
1111
public static final String USER_ID_HEADER = "userId";
12+
public static final String OKAPI_REQUEST_ID_HEADER = "x-okapi-request-id";
1213
private String okapiUrl;
1314
private String tenantId;
1415
private String token;

0 commit comments

Comments (0)