Database Concurrency
Multiple users updating the same aggregate is a classic concurrency problem in multi-user applications, often referred to as the “lost update” problem. In an Event Sourcing system, preventing this overwrite is crucial because the sequence of events defines the state.
The standard and most effective way to prevent concurrent updates from overwriting each other in an Event Sourcing system is through Optimistic Concurrency Control (OCC), specifically using version numbers (or sequence numbers) at the aggregate level.
How Optimistic Concurrency Control (OCC) Works in Event Sourcing
-
Version Tracking (Sequence Number):
- Every Aggregate (e.g., a
Customer, anOrder, aProduct) has a version, which is typically its current sequence number in the event stream. This sequence number represents the number of events that have been applied to build its current state. - Your
events_store_ttable already hassequence_numberfor this purpose:
TheCREATE TABLE events_store_t ( id UUID PRIMARY KEY, aggregate_id VARCHAR(255) NOT NULL, -- ... other fields ... sequence_number BIGINT NOT NULL, -- This is the key! UNIQUE (aggregate_id, sequence_number) -- CRITICAL constraint! );UNIQUE (aggregate_id, sequence_number)constraint is the fundamental database-level guarantee against concurrent writes for the same aggregate at the same version.
- Every Aggregate (e.g., a
-
Load the Aggregate’s Current Version:
- When your application service wants to modify an aggregate, it first loads the aggregate’s current state by replaying all events for that
aggregate_idfrom theevents_store_t. - During this replay, it tracks the
currentSequenceNumber(the sequence number of the last event applied).
- When your application service wants to modify an aggregate, it first loads the aggregate’s current state by replaying all events for that
-
Pass Expected Version with Command:
- The user interface (UI) or the client application that initiated the change should also hold the
currentSequenceNumberit observed when it last fetched the aggregate’s state. - This
expectedVersion(orexpectedSequenceNumber) is then sent along with the command (e.g.,UpdateCustomerProfileCommand(customerId, newName, newAddress, expectedSequenceNumber)).
- The user interface (UI) or the client application that initiated the change should also hold the
-
Conditional Event Appending:
- When your
CustomerApplicationServicereceives the command:- It loads the
Customeraggregate from theevents_store_t, determining its actualcurrentSequenceNumber. - It compares the
command.expectedSequenceNumberwith thecustomer.actualCurrentSequenceNumber(derived from the Event Store). - If
command.expectedSequenceNumberdoes NOT matchcustomer.actualCurrentSequenceNumber: This means another concurrent transaction has already written new events for this aggregate since the client loaded its state. AConcurrencyException(or similar domain-specific exception) is thrown. - If they DO match: The aggregate’s business logic is applied, generating new events. These new events will have
customer.actualCurrentSequenceNumber + 1,customer.actualCurrentSequenceNumber + 2, etc.
- It loads the
- When your
-
Atomic Persistence (The DB Constraint):
- The new events are then attempted to be saved to
events_store_t(andoutbox_messages) within a single database transaction. - If a concurrency conflict was not detected at step 4 (meaning two commands arrived almost simultaneously and passed the initial check), the
UNIQUE (aggregate_id, sequence_number)constraint in theevents_store_ttable will prevent the “lost update.” Only the first transaction to successfully insert events with the “next” sequence numbers will succeed. The second will fail with aDataIntegrityViolationException(or similar).
- The new events are then attempted to be saved to
Example Flow:
- User A fetches
Customer-123. The current state (replayed fromevents_store_t) showssequenceNumber = 5. - User B also fetches
Customer-123. It also seessequenceNumber = 5. - User A sends
UpdateCustomerProfileCommand(customerId="123", newName="Alice", expectedSequenceNumber=5).- App Service loads
Customer-123, actualsequenceNumber = 5. MatchesexpectedSequenceNumber. - Generates
CustomerNameChangedevent withsequenceNumber = 6. - Attempts to save event(s) to
events_store_t(andoutbox_messages). Succeeds.
- App Service loads
- User B sends
UpdateCustomerProfileCommand(customerId="123", newAddress="456 Oak", expectedSequenceNumber=5).- App Service loads
Customer-123. It now replays events up tosequenceNumber = 6. So,actualSequenceNumber = 6. - It compares
command.expectedSequenceNumber=5withcustomer.actualSequenceNumber=6. They do NOT match! - The
CustomerApplicationServicethrows aConcurrencyException. - The transaction is rolled back, and no events are written from User B’s command.
- App Service loads
Java Implementation Changes
Let’s modify the previous CustomerApplicationService and add a way to load the aggregate from events.
1. Customer Aggregate (Revised)
// domain/Customer.java (Revised)
package com.example.eventoutbox.domain;
import com.example.eventoutbox.domain.events.CustomerAddressChanged;
import com.example.eventoutbox.domain.events.CustomerNameChanged;
import com.example.eventoutbox.domain.events.DomainEvent;
import lombok.Getter;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@Getter
public class Customer {
private final String customerId;
private String name;
private String address;
private long version; // This is the 'sequenceNumber' of the LAST applied event
private final List<DomainEvent> uncommittedEvents = new ArrayList<>();
// Constructor for creating a new aggregate
public Customer(String customerId) {
this.customerId = customerId;
this.version = 0; // New aggregates start at version 0
}
// Static factory method to load an aggregate from its events
public static Customer loadFromEvents(String customerId, List<DomainEvent> history) {
Customer customer = new Customer(customerId);
history.forEach(customer::applyEvent); // Apply each historical event
return customer;
}
// Method to apply an event to the aggregate's state
private void applyEvent(DomainEvent event) {
// This is where you would update the aggregate's internal state
// based on the specific event type.
if (event instanceof CustomerNameChanged nameChanged) {
this.name = nameChanged.getNewName();
} else if (event instanceof CustomerAddressChanged addressChanged) {
this.address = addressChanged.getNewAddress();
}
this.version = event.getSequenceNumber(); // Update version to the sequence number of the applied event
}
// Domain behavior methods that generate new events
public void changeName(String newName) {
if (!newName.equals(this.name)) {
// New events get the *next* sequence number
long nextSequence = this.version + 1;
CustomerNameChanged event = new CustomerNameChanged(UUID.randomUUID(), Instant.now(), customerId, nextSequence, newName);
uncommittedEvents.add(event);
applyEvent(event); // Apply immediately to current state for consistency
}
}
public void changeAddress(String newAddress) {
if (!newAddress.equals(this.address)) {
long nextSequence = this.version + 1;
CustomerAddressChanged event = new CustomerAddressChanged(UUID.randomUUID(), Instant.now(), customerId, nextSequence, newAddress);
uncommittedEvents.add(event);
applyEvent(event);
}
}
public void markEventsCommitted() {
this.uncommittedEvents.clear();
}
}
2. ConcurrencyException
// domain/ConcurrencyException.java
package com.example.eventoutbox.domain;
public class ConcurrencyException extends RuntimeException {
public ConcurrencyException(String message) {
super(message);
}
}
3. CustomerApplicationService (Revised)
// application/CustomerApplicationService.java (Revised)
package com.example.eventoutbox.application;
import com.example.eventoutbox.domain.ConcurrencyException;
import com.example.eventoutbox.domain.Customer;
import com.example.eventoutbox.domain.events.DomainEvent;
import com.example.eventoutbox.infrastructure.persistence.eventstore.EventStoreEvent;
import com.example.eventoutbox.infrastructure.persistence.eventstore.EventStoreEventRepository;
import com.example.eventoutbox.infrastructure.persistence.outbox.OutboxMessage;
import com.example.eventoutbox.infrastructure.persistence.outbox.OutboxMessageRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
@Service
@RequiredArgsConstructor
public class CustomerApplicationService {
private final OutboxMessageRepository outboxMessageRepository;
private final EventStoreEventRepository eventStoreEventRepository;
private final ObjectMapper objectMapper;
// Command now includes expectedVersion
public record UpdateCustomerProfileCommand(String customerId, String newName, String newAddress, long expectedVersion) {}
@Transactional
public void updateCustomerProfile(UpdateCustomerProfileCommand command) {
// --- 1. Load Aggregate State ---
List<EventStoreEvent> historicalEvents = eventStoreEventRepository.findByAggregateIdOrderBySequenceNumberAsc(command.customerId());
Customer customer;
if (historicalEvents.isEmpty()) {
customer = new Customer(command.customerId());
// If it's a new aggregate, expectedVersion must be 0
if (command.expectedVersion() != 0) {
throw new ConcurrencyException("Customer with ID " + command.customerId() + " does not exist or expected version is incorrect.");
}
} else {
// Deserialize historical events to DomainEvent objects
List<DomainEvent> domainEventsHistory = historicalEvents.stream()
.map(this::deserializeEventStoreEvent)
.collect(Collectors.toList());
customer = Customer.loadFromEvents(command.customerId(), domainEventsHistory);
// --- 2. OPTIMISTIC CONCURRENCY CHECK ---
if (customer.getVersion() != command.expectedVersion()) {
throw new ConcurrencyException(
"Customer with ID " + command.customerId() + " has been updated by another user. " +
"Expected version " + command.expectedVersion() + " but found " + customer.getVersion() + "."
);
}
}
// --- 3. Apply Business Logic & Generate Events ---
if (command.newName() != null) {
customer.changeName(command.newName());
}
if (command.newAddress() != null) {
customer.changeAddress(command.newAddress());
}
// --- 4. Persist Events to Event Store & Outbox (Atomically) ---
List<DomainEvent> eventsToStore = customer.getUncommittedEvents();
if (eventsToStore.isEmpty()) {
return; // No changes, no events to publish
}
try {
List<EventStoreEvent> eventStoreEntities = eventsToStore.stream()
.map(this::mapToEventStoreEvent)
.collect(Collectors.toList());
eventStoreEventRepository.saveAll(eventStoreEntities);
List<OutboxMessage> outboxMessages = eventsToStore.stream()
.map(this::mapToOutboxMessage)
.collect(Collectors.toList());
outboxMessageRepository.saveAll(outboxMessages);
customer.markEventsCommitted();
} catch (DataIntegrityViolationException e) {
// This catches the UNIQUE constraint violation on (aggregate_id, sequence_number)
// This means another transaction has just written to this aggregate
throw new ConcurrencyException(
"Another concurrent update detected for customer " + command.customerId() + ". " +
"Please refresh and try again.", e
);
} catch (IOException e) {
throw new RuntimeException("Failed to serialize event to JSON", e);
}
}
// Helper methods for mapping/deserializing (similar to before)
private OutboxMessage mapToOutboxMessage(DomainEvent event) {
try {
return OutboxMessage.builder()
.id(event.getEventId())
.aggregateId(event.getAggregateId())
.aggregateType(event.getAggregateType())
.eventType(event.getEventType())
.timestamp(event.getTimestamp())
.payload(objectMapper.writeValueAsString(event))
.metadata(null)
.build();
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to serialize event to JSON: " + event.getEventId(), e);
}
}
private EventStoreEvent mapToEventStoreEvent(DomainEvent event) {
try {
return EventStoreEvent.builder()
.id(event.getEventId())
.aggregateId(event.getAggregateId())
.aggregateType(event.getAggregateType())
.eventType(event.getEventType())
.timestamp(event.getTimestamp())
.sequenceNumber(event.getSequenceNumber())
.payload(objectMapper.writeValueAsString(event))
.metadata(null)
.build();
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to serialize event to JSON: " + event.getEventId(), e);
}
}
private DomainEvent deserializeEventStoreEvent(EventStoreEvent eventStoreEvent) {
try {
// Assuming your event JSON includes the 'eventType' field for polymorphic deserialization
return objectMapper.readValue(eventStoreEvent.getPayload(), DomainEvent.class);
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to deserialize event: " + eventStoreEvent.getId(), e);
}
}
}
4. EventStoreEventRepository (Add find method)
// infrastructure/persistence/eventstore/EventStoreEventRepository.java (Revised)
package com.example.eventoutbox.infrastructure.persistence.eventstore;
import org.springframework.data.jpa.repository.JpaRepository;
import java.util.List;
import java.util.UUID;
public interface EventStoreEventRepository extends JpaRepository<EventStoreEvent, UUID> {
List<EventStoreEvent> findByAggregateIdOrderBySequenceNumberAsc(String aggregateId);
}
5. CustomerController (Handle Exception)
// application/CustomerController.java (Revised)
package com.example.eventoutbox.application;
import com.example.eventoutbox.domain.ConcurrencyException;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/customers")
@RequiredArgsConstructor
public class CustomerController {
private final CustomerApplicationService customerApplicationService;
public record UpdateCustomerProfileRequest(String customerId, String newName, String newAddress, long expectedVersion) {}
@PostMapping("/profile")
public ResponseEntity<String> updateCustomerProfile(@RequestBody UpdateCustomerProfileRequest request) {
CustomerApplicationService.UpdateCustomerProfileCommand command =
new CustomerApplicationService.UpdateCustomerProfileCommand(
request.customerId(), request.newName(), request.newAddress(), request.expectedVersion()
);
customerApplicationService.updateCustomerProfile(command);
return ResponseEntity.ok("Customer profile update command received and processed.");
}
@ExceptionHandler(ConcurrencyException.class)
public ResponseEntity<String> handleConcurrencyException(ConcurrencyException ex) {
return ResponseEntity.status(HttpStatus.CONFLICT).body(ex.getMessage());
}
}
How to Handle Concurrency Conflicts on the Client/UI Side:
When ConcurrencyException is thrown:
- Inform the User: Display a message like “This item has been updated by another user. Please refresh the page to see the latest changes and try your update again.”
- Retry (less common for user-facing, but possible for background jobs): For non-interactive or automated processes, you might implement a retry mechanism. This retry would need to:
- Fetch the latest state of the aggregate from a read model.
- Re-create the command based on the original intent and the newly fetched expected version.
- Re-send the command.
- This is typically only done if the change is “safe” to re-apply (e.g., adding an item, not changing a specific value).
By combining the version check in your application service with the UNIQUE constraint in your database, you create a robust optimistic concurrency control mechanism that prevents lost updates effectively.
What if event consumer fails to apply an event to its read model
In this case, the read model becomes stale, and subsequent attempts to update based on that stale data will lead to conflicts.
Let’s break down the scenario and the robust solution.
The Problem Scenario (as you described)
- UI: Queries
entity_ttable (read model), getsEntity (aggregate_version = 5). - User: Makes changes.
- UI: Sends
UpdateCommand (..., expectedVersion = 5)to the write model. - Write Model (Command Handler):
- Loads aggregate from
event_store_t. Let’s say itsactualVersionis5. - OCC Check:
actualVersion (5) == expectedVersion (5). Success. - Generates
Event (..., sequence_number = 6). - Persists
Event (..., sequence_number = 6)toevent_store_tandoutbox_message_tin an ACID transaction. This commits version 6 to theevent_store_t. - Debezium publishes this event to Kafka.
- Loads aggregate from
- Kafka Consumer (PortalEventConsumer):
- Reads
Event (..., sequence_number = 6, expectedVersion = 5). - Tries to update
entity_t(your read model):UPDATE entity_t SET ..., aggregate_version = 6 WHERE entity_id = ? AND aggregate_version = 5. - FAILURE: An exception occurs in the database update (e.g., a network error, a constraint violation unrelated to
aggregate_version, or the consumer’s JVM crashes). - Result: The
entity_ttable is NOT updated and remains ataggregate_version = 5. Theevent_store_tis ataggregate_version = 6. The read model is now stale.
- Reads
- Next UI interaction:
- UI queries
entity_tagain. It still getsEntity (aggregate_version = 5)because the read model is stale. - UI sends
UpdateCommand (..., expectedVersion = 5).
- UI queries
- Write Model (Command Handler) - Second Attempt:
- Loads aggregate from
event_store_t. ItsactualVersionis6. - OCC Check:
actualVersion (6) != expectedVersion (5). Conflict detected! - Result: The command handler throws a
ConcurrencyException. It does NOT try to insert a new event intoevent_store_twithsequence_number=6(because that would be a duplicate and would indeed fail on the unique constraint). It correctly rejects the command.
- Loads aggregate from
The specific symptom you mentioned (“new event insert into the event_store_t and it will fail because the aggregate version is used before”) should ideally not happen if the write model correctly detects OCC. The ConcurrencyException should prevent the duplicate event generation.
The core problem, then, is stale read models due to consumer processing failures, which then lead to ConcurrencyException at the write model.
The Solution: Robust Kafka Consumer Processing (Retry & DLQ)
The solution lies entirely within your Kafka Consumer’s (PortalEventConsumerStartupHook) error handling strategy.
Your most recent incremental code includes the processSingleEventWithRetries method with retry and DLQ logic. This is precisely the mechanism designed to handle this situation.
Here’s how it’s supposed to work and what you need to ensure is functioning correctly:
-
Idempotency of Read Model Updates:
- All your
dbProvider.createXxx,updateXxx,deleteXxxmethods (e.g.,updateRole,deleteRole,createRole) must be idempotent in their database effects. - For
UPDATEandDELETE,WHERE aggregate_version = expectedVersionmakes them idempotent. If the update was already applied (or a newer version is present),0 rows affectedmeans no harm done (though it might still trigger aConcurrencyExceptionwithin the consumer’sdbProvidermethods if you implement the record-not-found-vs-conflict check). - For
INSERT, useINSERT ... ON CONFLICT (primary_key) DO UPDATE SET aggregate_version = excluded.aggregate_version, ...(UPSERT) if the “create” event might be re-delivered and you expect it to update an existing record (e.g., in a snapshot table). Otherwise, if it’s strictly a “create-only” and a duplicate PK is a bug, theSQLExceptionfor unique constraint violation is correct.
- All your
-
Consumer’s Retry/DLQ Logic (The core fix): The
processSingleEventWithRetriesmethod is crucial.-
Transient Errors:
- If
dbProvider.updateXxx(or any other part ofprocessSingleEventWithRetries) throws a transientSQLException(e.g., connection timeout, deadlock), thecurrentRetryis incremented, andThread.sleepoccurs. - If
maxRetriesis not exhausted,processSingleEventWithRetrieswill returnfalse. - The
onCompletionloop will thenbreak;(meaning it won’tcommitSync()any offsets for this batch). - On the next
readRecordscall, the entire batch (including the transiently failed record) will be re-polled and re-processed. This relies on idempotency.
- If
-
Permanent Errors:
- If
dbProvider.updateXxxthrows aDbProvider.ConcurrencyException(meaning the read model’s version was stale, so theWHERE aggregate_version = expectedVersionupdate in the consumer failed with 0 rows, but the record did exist at a higher version) or anIllegalArgumentException(bad data) or a permanentSQLException(e.g., unique constraint violation on anINSERTwhere it shouldn’t happen, or foreign key constraint violation):processSingleEventWithRetrieswill catch it and callhandlePermanentFailure.handlePermanentFailuresends the original Kafka record to the DLQ.processSingleEventWithRetriesthen returnstrue(because the event has been “handled” by being DLQ’d).onCompletionthen does include this record’s offset inoffsetsToCommitand proceeds tocommitSync()for the batch.- Result: The consumer makes progress past this “poison pill.” The stale event in
entity_tis not updated by this specific event, but the consumer doesn’t get stuck.
- If
-
How to Handle the Stale UI Problem
Once the consumer’s retry/DLQ is robust, the stale UI becomes a UX problem rather than a system consistency problem.
-
Producer’s
ConcurrencyExceptionis Key: When the UI sendsUpdateCommand(..., expectedVersion = 5)and theevent_store_tis already at version 6, the write model will throwConcurrencyException. This is the correct behavior. -
UI Response to
ConcurrencyException:- The UI must catch this
ConcurrencyExceptionfrom the API. - It should display a clear message to the user: “This record has been updated by another user. Please refresh the page to see the latest changes and re-apply your modifications if desired.”
- It should then trigger a refresh of the data from the read model, which will then fetch
Entity (aggregate_version = 6)(assuming the consumer eventually caught up or the record was DLQ’d and a manual fix was applied).
- The UI must catch this
Summary and What to Focus On:
- Ensure
processSingleEventWithRetriesis fully correct and deployed. This is your primary defense against the consumer getting stuck. - Carefully verify the
isTransientSqlErrorlogic. It must accurately identify only recoverable database errors. Misclassifying permanent errors as transient will exhaust retries and then DLQ; misclassifying transient as permanent sends to DLQ too soon. - Confirm
handlePermanentFailurecorrectly sends to DLQ and logs. - Confirm your
dbProvidermethods are truly idempotent for re-delivered messages.UPDATE ... WHERE aggregate_version = expectedVersionis idempotent.INSERTs are trickier. - Educate your UI/API layer: They must handle the
ConcurrencyExceptionfrom the write model by forcing a refresh and user notification.
Your recent changes to PortalEventConsumerStartupHook already implement this retry/DLQ pattern. The next step is to ensure that code is compilable, deployed correctly, and that the dbProvider methods are fully integrated with the aggregate_version for idempotency and OCC checking as discussed in the previous detailed answer.
What if DLQ messages are not handled in a timely fashion
This is a critical operational risk with the Dead Letter Queue (DLQ) pattern: DLQ messages are “handled exceptions,” not “disappearing problems.” If they aren’t processed, they represent real, unapplied business facts that can lead to data inconsistencies and broken business processes over time.
If a message sits in the DLQ for 30 days, your read models will be stale, your UIs will report incorrect data, and downstream systems relying on that information will also be out of sync. This can severely damage data integrity and user trust.
The DLQ is a “Hospital” or “Quarantine Zone,” Not a “Graveyard”
It’s a place for messages that need human intervention or a specific, non-automated re-driving process. It’s not a place for messages to just die.
Strategies to Prevent DLQ Message Stagnation
To ensure DLQ messages are handled in a timely fashion, you need a robust DLQ management strategy that goes beyond just pushing messages to the topic.
1. Robust Monitoring & Alerting (Immediate Action)
- Metric: Count of messages in DLQ topics (
kafka_topic_partition_current_offset,kafka_consumer_group_lag, or custom JMX metrics). - Alerting Thresholds:
- Urgent: Alert immediately (PagerDuty, Slack, SMS) if the number of messages in any DLQ topic goes above 0 or a very small threshold (e.g., 5-10 messages). A DLQ is an exceptional queue.
- Warning: Alert if messages persist for a certain duration (e.g., 1 hour, 4 hours).
- Dashboards: Create a dashboard that prominently displays the number of messages in each DLQ topic and their age.
2. Clear Ownership & Standard Operating Procedures (SOPs)
- Who owns the DLQ? Assign clear responsibility to a specific team (e.g., SRE, Development team for that microservice).
- What’s the process? Define a clear SOP for handling DLQ alerts:
- Acknowledge alert.
- Inspect the DLQ message content (payload, error message, original topic/offset).
- Identify the root cause (code bug, malformed data, transient external system outage, business process error).
- Decide on action:
- Fix Code/Data: If it’s a bug, deploy a fix. If it’s bad data, decide if it needs manual correction in the database or if upstream data entry needs fixing.
- Re-drive: After fixing the root cause, re-drive the message(s) back to the original topic.
- Discard (Rare & Documented): Only if the message is truly unrecoverable garbage or a test message that accidentally ended up there, and its impact is negligible. This decision must be audited and requires strong justification.
3. Automated DLQ Re-driving with Human Trigger (Operational Playbook)
- You’ll need a “re-driver” tool/application.
- Purpose: This tool reads messages from the DLQ, and publishes them back to their original topic for re-processing.
- Features:
- Preview: Show content of DLQ messages before re-driving.
- Selectivity: Allow re-driving specific messages, or ranges of messages.
- Filtering: Filter by error type, timestamp, etc.
- Audit: Log who re-drove what message.
- Integration:
- Could be a simple command-line tool.
- Could be integrated into your internal developer portal or ops dashboard.
- Could be a scheduled job that runs periodically but requires explicit human approval before actually publishing.
4. Automated Retries (Beyond Initial Consumer)
For certain classes of “permanent-but-maybe-not-really” errors (e.g., external API rate limits, very long-running external process), you could have a separate, simpler consumer that specifically subscribes to the DLQ.
- Purpose: This DLQ consumer would only handle a very specific, narrow class of DLQ messages.
- Logic: It would apply its own retry logic (e.g., exponential backoff for a longer period, up to 24 hours).
- Re-DLQ: If this DLQ consumer also fails after its retries, it would push the message back to the same DLQ topic (or a different, truly “unresolvable” DLQ) to re-trigger human intervention.
- Caution: This adds complexity and should only be done for errors you’ve explicitly identified as potentially auto-recoverable over a very long time.
5. Data Reconciliation (Long-Term Strategy)
- For critical business data, even with DLQs, sometimes manual reconciliation or a dedicated reconciliation service might be needed.
- This involves periodically comparing your read model state with the authoritative Event Store (or an authoritative external system) to detect and resolve discrepancies that might have slipped through.
Addressing “Most errors cannot be recovered anyway”
If “most errors cannot be recovered anyway” and always lead to DLQ, it points to a need for:
- Better
isTransientSqlErrorlogic: Are you correctly identifying all transient SQL errors that could be retried by the main consumer? - More robust validation upstream: Can you prevent more malformed/invalid data from even generating events that will fail downstream? (E.g., more validation in the command handler).
- Refined Domain Model/Business Rules: Sometimes, a “permanent” error in the consumer points to a business rule that isn’t fully handled by events, or a valid state transition that wasn’t expected.
DLQs are not a substitute for preventing errors, but they are essential for graceful failure and ensuring eventual consistency. Their effectiveness hinges entirely on the operational discipline and tooling around them.
How to handle the scenario that it takes weeks or even months to fix a bug and get it deployed
This is an issue regarding a major reality of enterprise software: deployment cycles can be slow, and manual intervention for DLQs simply isn’t scalable for a continuous stream of errors. This is precisely why a purely “human-in-the-loop” DLQ management strategy breaks down in a corporate context with persistent bugs.
To handle a bug that takes weeks/months to deploy, you need a strategy that focuses on containment, bypass, and eventually, reconciliation.
Strategy for Long-Deployment-Cycle Bugs (Weeks/Months)
The goal is to stop the errors from piling up indefinitely in the DLQ, and to allow the rest of the system to continue processing, even if some data is temporarily inconsistent.
1. Immediate Containment: Filtering or Skipping “Poison Pill” Messages
-
Implement a “Hot Fix” Filter (Code-based or SMT-based):
- In your Kafka Consumer (
PortalEventConsumerStartupHook): If you identify a bug where a specific type of event (or event with specific data) consistently causes failures:- Add a temporary code filter. For instance, if
ScheduleCreatedEventwithnulluserIdis causingNullPointerException, add:if (eventType.equals(PortalConstants.SCHEDULE_CREATED_EVENT) && eventMap.get("userId") == null) { logger.warn("Skipping known bug event type {} for record {} due to null userId. Not processing.", eventType, record.offset()); handlePermanentFailure(record, "Known bug: null userId for " + eventType, "KnownBugSkip"); return true; // Mark as handled (DLQ'd), commit offset, move on. } - If the bug is in a specific
dbProvidermethod: You can wrap that call in a try-catch forPermanentProcessingExceptionspecifically for that event type, and if it’s the known bug, send it to DLQ and commit.
- Add a temporary code filter. For instance, if
- Using Kafka Connect SMT (if source is Kafka Connect): You could implement a custom
FilterSMT that drops/routes specific problematic messages before they even hit your consumer app. This requires deploying a new SMT, but it can be faster than an app deployment.
- In your Kafka Consumer (
-
Why: This immediately stops the DLQ from growing uncontrollably with known bad messages. It sacrifices processing that specific message but ensures the consumer stays healthy.
2. Automated (Limited) Re-driving for Transient/Known Issues (Or Triage)
- “Error Triage” Consumer: Instead of just sending to a single DLQ, consider a dedicated consumer that subscribes to your main DLQ topic.
- This consumer acts as an automated triage.
- It checks the
errorType(fromhandlePermanentFailure’s metadata). - If
errorTypeis “TransientSqlError” or “RetriesExhausted” (but could eventually succeed): It re-publishes the original message back to theportal-eventtopic with an exponential backoff. It might implement its own max retries (e.g., 50 retries over 24 hours). If it still fails, then it pushes to a “Final DLQ” that truly requires manual intervention. - If
errorTypeis “ConcurrencyConflict”, “DataValidationError”, “UnhandledEventType”, or “KnownBugSkip”: It pushes to a separate “Permanent DLQ” topic. This queue is smaller and truly requires human eyes.
- Why: This handles messages that might eventually self-resolve or that you know can’t be fixed by immediate retries but aren’t necessarily “dead forever.” It reduces the volume of messages requiring immediate human attention.
3. Manual Intervention for “Permanent DLQ” / Complex Bugs (When Devs Get Involved)
- The “Permanent DLQ” is where true bugs/bad data sit.
- The same monitoring and alerting from before applies, but now it’s for a much smaller, higher-priority queue.
- Developers must actively:
- Analyze: What exactly caused this? Why did it bypass automated retries/filters?
- Fix: Develop and deploy the bug fix.
- Reconcile/Re-drive:
- If the bug fix resolves the issue, use a re-driver tool to re-submit messages from the Permanent DLQ to the
portal-eventtopic. - If the bug resulted in data inconsistencies that can’t be fixed by re-driving (e.g., a critical business state was violated), you might need to perform a manual database correction on the affected aggregate(s) (this is the most dangerous and should be avoided if possible).
- If the bug fix resolves the issue, use a re-driver tool to re-submit messages from the Permanent DLQ to the
4. Long-Term Data Reconciliation / Auditing
- Offline Reconciliation: For critical data, implement daily/weekly batch jobs that compare the state of your read model tables with the authoritative Event Store.
- If discrepancies are found, they are reported, and a reconciliation process is triggered (either manual or automated). This ensures that even if events were missed or misapplied, data consistency is eventually achieved.
- Event Replay (When all else fails): If a significant bug causes widespread data corruption or loss of consistency, the ultimate fallback is to:
- Deploy the bug fix.
- Stop the affected read model consumer.
- Clear the affected read model tables.
- Replay all historical events from the
event_store_t(or long-retention Kafka topics) through the fixed consumer logic. This rebuilds the read model from scratch, reflecting the correct business logic. This is why Event Sourcing is so powerful.
Example Workflow with a Long-Deployment-Cycle Bug
- Bug Identified:
ScheduleCreatedEventcreates a schedule, but due to a bug in the consumer’sdbProvider.createSchedulemethod, it tries to insert a duplicate primary key ifscheduleId(aggregate ID) exists, and this causes a permanent error in the consumer. - Immediate Containment (Filter/Bypass):
- A hotfix is applied to the
PortalEventConsumerStartupHook(or a dedicated filter SMT) to recognizeScheduleCreatedEventwherescheduleIdalready exists. - For such events, it
handlePermanentFailure()the message to aportal-event-dlq-permanenttopic (or aKnownBugDLQ). This prevents the main consumer from getting stuck.
- A hotfix is applied to the
- DLQ Accumulation & Monitoring: Messages related to this bug pile up in
portal-event-dlq-permanent. Alerts are firing. - Development Fix: The development team works on a fix for
dbProvider.createSchedule(e.g., changing it to anUPSERTif a “create” event implies “idempotent create/update” or better handling of duplicate primary keys if it truly is an error). This takes weeks. - Deployment: The fix is deployed.
- Re-driving: Operations team uses the re-driver tool to:
- Read messages from
portal-event-dlq-permanent. - Publish them back to the
portal-eventtopic. - The now-fixed consumer processes them correctly.
- Read messages from
This approach balances immediate operational stability with eventual consistency, acknowledging the realities of enterprise deployment cycles.