Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Database Concurrency

Multiple users updating the same aggregate is a classic concurrency problem in multi-user applications, often referred to as the “lost update” problem. In an Event Sourcing system, preventing this overwrite is crucial because the sequence of events defines the state.

The standard and most effective way to prevent concurrent updates from overwriting each other in an Event Sourcing system is through Optimistic Concurrency Control (OCC), specifically using version numbers (or sequence numbers) at the aggregate level.


How Optimistic Concurrency Control (OCC) Works in Event Sourcing

  1. Version Tracking (Sequence Number):

    • Every Aggregate (e.g., a Customer, an Order, a Product) has a version, which is typically its current sequence number in the event stream. This sequence number represents the number of events that have been applied to build its current state.
    • Your events_store_t table already has sequence_number for this purpose:
      CREATE TABLE events_store_t (
          id UUID PRIMARY KEY,
          aggregate_id VARCHAR(255) NOT NULL,
          -- ... other fields ...
          sequence_number BIGINT NOT NULL,       -- This is the key!
          UNIQUE (aggregate_id, sequence_number) -- CRITICAL constraint!
      );
      
      The UNIQUE (aggregate_id, sequence_number) constraint is the fundamental database-level guarantee against concurrent writes for the same aggregate at the same version.
  2. Load the Aggregate’s Current Version:

    • When your application service wants to modify an aggregate, it first loads the aggregate’s current state by replaying all events for that aggregate_id from the events_store_t.
    • During this replay, it tracks the currentSequenceNumber (the sequence number of the last event applied).
  3. Pass Expected Version with Command:

    • The user interface (UI) or the client application that initiated the change should also hold the currentSequenceNumber it observed when it last fetched the aggregate’s state.
    • This expectedVersion (or expectedSequenceNumber) is then sent along with the command (e.g., UpdateCustomerProfileCommand(customerId, newName, newAddress, expectedSequenceNumber)).
  4. Conditional Event Appending:

    • When your CustomerApplicationService receives the command:
      • It loads the Customer aggregate from the events_store_t, determining its actual currentSequenceNumber.
      • It compares the command.expectedSequenceNumber with the customer.actualCurrentSequenceNumber (derived from the Event Store).
      • If command.expectedSequenceNumber does NOT match customer.actualCurrentSequenceNumber: This means another concurrent transaction has already written new events for this aggregate since the client loaded its state. A ConcurrencyException (or similar domain-specific exception) is thrown.
      • If they DO match: The aggregate’s business logic is applied, generating new events. These new events will have customer.actualCurrentSequenceNumber + 1, customer.actualCurrentSequenceNumber + 2, etc.
  5. Atomic Persistence (The DB Constraint):

    • The new events are then attempted to be saved to events_store_t (and outbox_messages) within a single database transaction.
    • If a concurrency conflict was not detected at step 4 (meaning two commands arrived almost simultaneously and passed the initial check), the UNIQUE (aggregate_id, sequence_number) constraint in the events_store_t table will prevent the “lost update.” Only the first transaction to successfully insert events with the “next” sequence numbers will succeed. The second will fail with a DataIntegrityViolationException (or similar).

Example Flow:

  1. User A fetches Customer-123. The current state (replayed from events_store_t) shows sequenceNumber = 5.
  2. User B also fetches Customer-123. It also sees sequenceNumber = 5.
  3. User A sends UpdateCustomerProfileCommand(customerId="123", newName="Alice", expectedSequenceNumber=5).
    • App Service loads Customer-123, actual sequenceNumber = 5. Matches expectedSequenceNumber.
    • Generates CustomerNameChanged event with sequenceNumber = 6.
    • Attempts to save event(s) to events_store_t (and outbox_messages). Succeeds.
  4. User B sends UpdateCustomerProfileCommand(customerId="123", newAddress="456 Oak", expectedSequenceNumber=5).
    • App Service loads Customer-123. It now replays events up to sequenceNumber = 6. So, actualSequenceNumber = 6.
    • It compares command.expectedSequenceNumber=5 with customer.actualSequenceNumber=6. They do NOT match!
    • The CustomerApplicationService throws a ConcurrencyException.
    • The transaction is rolled back, and no events are written from User B’s command.

Java Implementation Changes

Let’s modify the previous CustomerApplicationService and add a way to load the aggregate from events.

1. Customer Aggregate (Revised)

// domain/Customer.java (Revised)
package com.example.eventoutbox.domain;

import com.example.eventoutbox.domain.events.CustomerAddressChanged;
import com.example.eventoutbox.domain.events.CustomerNameChanged;
import com.example.eventoutbox.domain.events.DomainEvent;
import lombok.Getter;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

@Getter
public class Customer {
    private final String customerId;
    private String name;
    private String address;
    private long version; // This is the 'sequenceNumber' of the LAST applied event

    private final List<DomainEvent> uncommittedEvents = new ArrayList<>();

    // Constructor for creating a new aggregate
    public Customer(String customerId) {
        this.customerId = customerId;
        this.version = 0; // New aggregates start at version 0
    }

    // Static factory method to load an aggregate from its events
    public static Customer loadFromEvents(String customerId, List<DomainEvent> history) {
        Customer customer = new Customer(customerId);
        history.forEach(customer::applyEvent); // Apply each historical event
        return customer;
    }

    // Method to apply an event to the aggregate's state
    private void applyEvent(DomainEvent event) {
        // This is where you would update the aggregate's internal state
        // based on the specific event type.
        if (event instanceof CustomerNameChanged nameChanged) {
            this.name = nameChanged.getNewName();
        } else if (event instanceof CustomerAddressChanged addressChanged) {
            this.address = addressChanged.getNewAddress();
        }
        this.version = event.getSequenceNumber(); // Update version to the sequence number of the applied event
    }

    // Domain behavior methods that generate new events
    public void changeName(String newName) {
        if (!newName.equals(this.name)) {
            // New events get the *next* sequence number
            long nextSequence = this.version + 1;
            CustomerNameChanged event = new CustomerNameChanged(UUID.randomUUID(), Instant.now(), customerId, nextSequence, newName);
            uncommittedEvents.add(event);
            applyEvent(event); // Apply immediately to current state for consistency
        }
    }

    public void changeAddress(String newAddress) {
        if (!newAddress.equals(this.address)) {
            long nextSequence = this.version + 1;
            CustomerAddressChanged event = new CustomerAddressChanged(UUID.randomUUID(), Instant.now(), customerId, nextSequence, newAddress);
            uncommittedEvents.add(event);
            applyEvent(event);
        }
    }

    public void markEventsCommitted() {
        this.uncommittedEvents.clear();
    }
}

2. ConcurrencyException

// domain/ConcurrencyException.java
package com.example.eventoutbox.domain;

public class ConcurrencyException extends RuntimeException {
    public ConcurrencyException(String message) {
        super(message);
    }
}

3. CustomerApplicationService (Revised)

// application/CustomerApplicationService.java (Revised)
package com.example.eventoutbox.application;

import com.example.eventoutbox.domain.ConcurrencyException;
import com.example.eventoutbox.domain.Customer;
import com.example.eventoutbox.domain.events.DomainEvent;
import com.example.eventoutbox.infrastructure.persistence.eventstore.EventStoreEvent;
import com.example.eventoutbox.infrastructure.persistence.eventstore.EventStoreEventRepository;
import com.example.eventoutbox.infrastructure.persistence.outbox.OutboxMessage;
import com.example.eventoutbox.infrastructure.persistence.outbox.OutboxMessageRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

@Service
@RequiredArgsConstructor
public class CustomerApplicationService {

    private final OutboxMessageRepository outboxMessageRepository;
    private final EventStoreEventRepository eventStoreEventRepository;
    private final ObjectMapper objectMapper;

    // Command now includes expectedVersion
    public record UpdateCustomerProfileCommand(String customerId, String newName, String newAddress, long expectedVersion) {}

    @Transactional
    public void updateCustomerProfile(UpdateCustomerProfileCommand command) {
        // --- 1. Load Aggregate State ---
        List<EventStoreEvent> historicalEvents = eventStoreEventRepository.findByAggregateIdOrderBySequenceNumberAsc(command.customerId());

        Customer customer;
        if (historicalEvents.isEmpty()) {
            customer = new Customer(command.customerId());
            // If it's a new aggregate, expectedVersion must be 0
            if (command.expectedVersion() != 0) {
                 throw new ConcurrencyException("Customer with ID " + command.customerId() + " does not exist or expected version is incorrect.");
            }
        } else {
            // Deserialize historical events to DomainEvent objects
            List<DomainEvent> domainEventsHistory = historicalEvents.stream()
                .map(this::deserializeEventStoreEvent)
                .collect(Collectors.toList());
            customer = Customer.loadFromEvents(command.customerId(), domainEventsHistory);

            // --- 2. OPTIMISTIC CONCURRENCY CHECK ---
            if (customer.getVersion() != command.expectedVersion()) {
                throw new ConcurrencyException(
                    "Customer with ID " + command.customerId() + " has been updated by another user. " +
                    "Expected version " + command.expectedVersion() + " but found " + customer.getVersion() + "."
                );
            }
        }

        // --- 3. Apply Business Logic & Generate Events ---
        if (command.newName() != null) {
            customer.changeName(command.newName());
        }
        if (command.newAddress() != null) {
            customer.changeAddress(command.newAddress());
        }

        // --- 4. Persist Events to Event Store & Outbox (Atomically) ---
        List<DomainEvent> eventsToStore = customer.getUncommittedEvents();
        if (eventsToStore.isEmpty()) {
            return; // No changes, no events to publish
        }

        try {
            List<EventStoreEvent> eventStoreEntities = eventsToStore.stream()
                .map(this::mapToEventStoreEvent)
                .collect(Collectors.toList());
            eventStoreEventRepository.saveAll(eventStoreEntities);

            List<OutboxMessage> outboxMessages = eventsToStore.stream()
                .map(this::mapToOutboxMessage)
                .collect(Collectors.toList());
            outboxMessageRepository.saveAll(outboxMessages);

            customer.markEventsCommitted();
        } catch (DataIntegrityViolationException e) {
            // This catches the UNIQUE constraint violation on (aggregate_id, sequence_number)
            // This means another transaction has just written to this aggregate
            throw new ConcurrencyException(
                "Another concurrent update detected for customer " + command.customerId() + ". " +
                "Please refresh and try again.", e
            );
        } catch (IOException e) {
            throw new RuntimeException("Failed to serialize event to JSON", e);
        }
    }

    // Helper methods for mapping/deserializing (similar to before)
    private OutboxMessage mapToOutboxMessage(DomainEvent event) {
        try {
            return OutboxMessage.builder()
                .id(event.getEventId())
                .aggregateId(event.getAggregateId())
                .aggregateType(event.getAggregateType())
                .eventType(event.getEventType())
                .timestamp(event.getTimestamp())
                .payload(objectMapper.writeValueAsString(event))
                .metadata(null)
                .build();
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to serialize event to JSON: " + event.getEventId(), e);
        }
    }

    private EventStoreEvent mapToEventStoreEvent(DomainEvent event) {
        try {
            return EventStoreEvent.builder()
                .id(event.getEventId())
                .aggregateId(event.getAggregateId())
                .aggregateType(event.getAggregateType())
                .eventType(event.getEventType())
                .timestamp(event.getTimestamp())
                .sequenceNumber(event.getSequenceNumber())
                .payload(objectMapper.writeValueAsString(event))
                .metadata(null)
                .build();
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to serialize event to JSON: " + event.getEventId(), e);
        }
    }

    private DomainEvent deserializeEventStoreEvent(EventStoreEvent eventStoreEvent) {
        try {
            // Assuming your event JSON includes the 'eventType' field for polymorphic deserialization
            return objectMapper.readValue(eventStoreEvent.getPayload(), DomainEvent.class);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to deserialize event: " + eventStoreEvent.getId(), e);
        }
    }
}

4. EventStoreEventRepository (Add find method)

// infrastructure/persistence/eventstore/EventStoreEventRepository.java (Revised)
package com.example.eventoutbox.infrastructure.persistence.eventstore;

import org.springframework.data.jpa.repository.JpaRepository;

import java.util.List;
import java.util.UUID;

public interface EventStoreEventRepository extends JpaRepository<EventStoreEvent, UUID> {
    List<EventStoreEvent> findByAggregateIdOrderBySequenceNumberAsc(String aggregateId);
}

5. CustomerController (Handle Exception)

// application/CustomerController.java (Revised)
package com.example.eventoutbox.application;

import com.example.eventoutbox.domain.ConcurrencyException;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/customers")
@RequiredArgsConstructor
public class CustomerController {

    private final CustomerApplicationService customerApplicationService;

    public record UpdateCustomerProfileRequest(String customerId, String newName, String newAddress, long expectedVersion) {}

    @PostMapping("/profile")
    public ResponseEntity<String> updateCustomerProfile(@RequestBody UpdateCustomerProfileRequest request) {
        CustomerApplicationService.UpdateCustomerProfileCommand command =
            new CustomerApplicationService.UpdateCustomerProfileCommand(
                request.customerId(), request.newName(), request.newAddress(), request.expectedVersion()
            );
        customerApplicationService.updateCustomerProfile(command);
        return ResponseEntity.ok("Customer profile update command received and processed.");
    }

    @ExceptionHandler(ConcurrencyException.class)
    public ResponseEntity<String> handleConcurrencyException(ConcurrencyException ex) {
        return ResponseEntity.status(HttpStatus.CONFLICT).body(ex.getMessage());
    }
}

How to Handle Concurrency Conflicts on the Client/UI Side:

When ConcurrencyException is thrown:

  1. Inform the User: Display a message like “This item has been updated by another user. Please refresh the page to see the latest changes and try your update again.”
  2. Retry (less common for user-facing, but possible for background jobs): For non-interactive or automated processes, you might implement a retry mechanism. This retry would need to:
    • Fetch the latest state of the aggregate from a read model.
    • Re-create the command based on the original intent and the newly fetched expected version.
    • Re-send the command.
    • This is typically only done if the change is “safe” to re-apply (e.g., adding an item, not changing a specific value).

By combining the version check in your application service with the UNIQUE constraint in your database, you create a robust optimistic concurrency control mechanism that prevents lost updates effectively.

What if event consumer fails to apply an event to its read model

In this case, the read model becomes stale, and subsequent attempts to update based on that stale data will lead to conflicts.

Let’s break down the scenario and the robust solution.


The Problem Scenario (as you described)

  1. UI: Queries entity_t table (read model), gets Entity (aggregate_version = 5).
  2. User: Makes changes.
  3. UI: Sends UpdateCommand (..., expectedVersion = 5) to the write model.
  4. Write Model (Command Handler):
    • Loads aggregate from event_store_t. Let’s say its actualVersion is 5.
    • OCC Check: actualVersion (5) == expectedVersion (5). Success.
    • Generates Event (..., sequence_number = 6).
    • Persists Event (..., sequence_number = 6) to event_store_t and outbox_message_t in an ACID transaction. This commits version 6 to the event_store_t.
    • Debezium publishes this event to Kafka.
  5. Kafka Consumer (PortalEventConsumer):
    • Reads Event (..., sequence_number = 6, expectedVersion = 5).
    • Tries to update entity_t (your read model): UPDATE entity_t SET ..., aggregate_version = 6 WHERE entity_id = ? AND aggregate_version = 5.
    • FAILURE: An exception occurs in the database update (e.g., a network error, a constraint violation unrelated to aggregate_version, or the consumer’s JVM crashes).
    • Result: The entity_t table is NOT updated and remains at aggregate_version = 5. The event_store_t is at aggregate_version = 6. The read model is now stale.
  6. Next UI interaction:
    • UI queries entity_t again. It still gets Entity (aggregate_version = 5) because the read model is stale.
    • UI sends UpdateCommand (..., expectedVersion = 5).
  7. Write Model (Command Handler) - Second Attempt:
    • Loads aggregate from event_store_t. Its actualVersion is 6.
    • OCC Check: actualVersion (6) != expectedVersion (5). Conflict detected!
    • Result: The command handler throws a ConcurrencyException. It does NOT try to insert a new event into event_store_t with sequence_number=6 (because that would be a duplicate and would indeed fail on the unique constraint). It correctly rejects the command.

The specific symptom you mentioned (“new event insert into the event_store_t and it will fail because the aggregate version is used before”) should ideally not happen if the write model correctly detects OCC. The ConcurrencyException should prevent the duplicate event generation.

The core problem, then, is stale read models due to consumer processing failures, which then lead to ConcurrencyException at the write model.


The Solution: Robust Kafka Consumer Processing (Retry & DLQ)

The solution lies entirely within your Kafka Consumer’s (PortalEventConsumerStartupHook) error handling strategy.

Your most recent incremental code includes the processSingleEventWithRetries method with retry and DLQ logic. This is precisely the mechanism designed to handle this situation.

Here’s how it’s supposed to work and what you need to ensure is functioning correctly:

  1. Idempotency of Read Model Updates:

    • All your dbProvider.createXxx, updateXxx, deleteXxx methods (e.g., updateRole, deleteRole, createRole) must be idempotent in their database effects.
    • For UPDATE and DELETE, WHERE aggregate_version = expectedVersion makes them idempotent. If the update was already applied (or a newer version is present), 0 rows affected means no harm done (though it might still trigger a ConcurrencyException within the consumer’s dbProvider methods if you implement the record-not-found-vs-conflict check).
    • For INSERT, use INSERT ... ON CONFLICT (primary_key) DO UPDATE SET aggregate_version = excluded.aggregate_version, ... (UPSERT) if the “create” event might be re-delivered and you expect it to update an existing record (e.g., in a snapshot table). Otherwise, if it’s strictly a “create-only” and a duplicate PK is a bug, the SQLException for unique constraint violation is correct.
  2. Consumer’s Retry/DLQ Logic (The core fix): The processSingleEventWithRetries method is crucial.

    • Transient Errors:

      • If dbProvider.updateXxx (or any other part of processSingleEventWithRetries) throws a transient SQLException (e.g., connection timeout, deadlock), the currentRetry is incremented, and Thread.sleep occurs.
      • If maxRetries is not exhausted, processSingleEventWithRetries will return false.
      • The onCompletion loop will then break; (meaning it won’t commitSync() any offsets for this batch).
      • On the next readRecords call, the entire batch (including the transiently failed record) will be re-polled and re-processed. This relies on idempotency.
    • Permanent Errors:

      • If dbProvider.updateXxx throws a DbProvider.ConcurrencyException (meaning the read model’s version was stale, so the WHERE aggregate_version = expectedVersion update in the consumer failed with 0 rows, but the record did exist at a higher version) or an IllegalArgumentException (bad data) or a permanent SQLException (e.g., unique constraint violation on an INSERT where it shouldn’t happen, or foreign key constraint violation):
        • processSingleEventWithRetries will catch it and call handlePermanentFailure.
        • handlePermanentFailure sends the original Kafka record to the DLQ.
        • processSingleEventWithRetries then returns true (because the event has been “handled” by being DLQ’d).
        • onCompletion then does include this record’s offset in offsetsToCommit and proceeds to commitSync() for the batch.
        • Result: The consumer makes progress past this “poison pill.” The stale event in entity_t is not updated by this specific event, but the consumer doesn’t get stuck.

How to Handle the Stale UI Problem

Once the consumer’s retry/DLQ is robust, the stale UI becomes a UX problem rather than a system consistency problem.

  1. Producer’s ConcurrencyException is Key: When the UI sends UpdateCommand(..., expectedVersion = 5) and the event_store_t is already at version 6, the write model will throw ConcurrencyException. This is the correct behavior.

  2. UI Response to ConcurrencyException:

    • The UI must catch this ConcurrencyException from the API.
    • It should display a clear message to the user: “This record has been updated by another user. Please refresh the page to see the latest changes and re-apply your modifications if desired.”
    • It should then trigger a refresh of the data from the read model, which will then fetch Entity (aggregate_version = 6) (assuming the consumer eventually caught up or the record was DLQ’d and a manual fix was applied).

Summary and What to Focus On:

  1. Ensure processSingleEventWithRetries is fully correct and deployed. This is your primary defense against the consumer getting stuck.
  2. Carefully verify the isTransientSqlError logic. It must accurately identify only recoverable database errors. Misclassifying permanent errors as transient will exhaust retries and then DLQ; misclassifying transient as permanent sends to DLQ too soon.
  3. Confirm handlePermanentFailure correctly sends to DLQ and logs.
  4. Confirm your dbProvider methods are truly idempotent for re-delivered messages. UPDATE ... WHERE aggregate_version = expectedVersion is idempotent. INSERTs are trickier.
  5. Educate your UI/API layer: They must handle the ConcurrencyException from the write model by forcing a refresh and user notification.

Your recent changes to PortalEventConsumerStartupHook already implement this retry/DLQ pattern. The next step is to ensure that code is compilable, deployed correctly, and that the dbProvider methods are fully integrated with the aggregate_version for idempotency and OCC checking as discussed in the previous detailed answer.

What if DLQ messages are not handled in a timely fashion

This is a critical operational risk with the Dead Letter Queue (DLQ) pattern: DLQ messages are “handled exceptions,” not “disappearing problems.” If they aren’t processed, they represent real, unapplied business facts that can lead to data inconsistencies and broken business processes over time.

If a message sits in the DLQ for 30 days, your read models will be stale, your UIs will report incorrect data, and downstream systems relying on that information will also be out of sync. This can severely damage data integrity and user trust.


The DLQ is a “Hospital” or “Quarantine Zone,” Not a “Graveyard”

It’s a place for messages that need human intervention or a specific, non-automated re-driving process. It’s not a place for messages to just die.

Strategies to Prevent DLQ Message Stagnation

To ensure DLQ messages are handled in a timely fashion, you need a robust DLQ management strategy that goes beyond just pushing messages to the topic.

1. Robust Monitoring & Alerting (Immediate Action)
  • Metric: Count of messages in DLQ topics (kafka_topic_partition_current_offset, kafka_consumer_group_lag, or custom JMX metrics).
  • Alerting Thresholds:
    • Urgent: Alert immediately (PagerDuty, Slack, SMS) if the number of messages in any DLQ topic goes above 0 or a very small threshold (e.g., 5-10 messages). A DLQ is an exceptional queue.
    • Warning: Alert if messages persist for a certain duration (e.g., 1 hour, 4 hours).
  • Dashboards: Create a dashboard that prominently displays the number of messages in each DLQ topic and their age.
2. Clear Ownership & Standard Operating Procedures (SOPs)
  • Who owns the DLQ? Assign clear responsibility to a specific team (e.g., SRE, Development team for that microservice).
  • What’s the process? Define a clear SOP for handling DLQ alerts:
    1. Acknowledge alert.
    2. Inspect the DLQ message content (payload, error message, original topic/offset).
    3. Identify the root cause (code bug, malformed data, transient external system outage, business process error).
    4. Decide on action:
      • Fix Code/Data: If it’s a bug, deploy a fix. If it’s bad data, decide if it needs manual correction in the database or if upstream data entry needs fixing.
      • Re-drive: After fixing the root cause, re-drive the message(s) back to the original topic.
      • Discard (Rare & Documented): Only if the message is truly unrecoverable garbage or a test message that accidentally ended up there, and its impact is negligible. This decision must be audited and requires strong justification.
3. Automated DLQ Re-driving with Human Trigger (Operational Playbook)
  • You’ll need a “re-driver” tool/application.
  • Purpose: This tool reads messages from the DLQ, and publishes them back to their original topic for re-processing.
  • Features:
    • Preview: Show content of DLQ messages before re-driving.
    • Selectivity: Allow re-driving specific messages, or ranges of messages.
    • Filtering: Filter by error type, timestamp, etc.
    • Audit: Log who re-drove what message.
  • Integration:
    • Could be a simple command-line tool.
    • Could be integrated into your internal developer portal or ops dashboard.
    • Could be a scheduled job that runs periodically but requires explicit human approval before actually publishing.
4. Automated Retries (Beyond Initial Consumer)

For certain classes of “permanent-but-maybe-not-really” errors (e.g., external API rate limits, very long-running external process), you could have a separate, simpler consumer that specifically subscribes to the DLQ.

  • Purpose: This DLQ consumer would only handle a very specific, narrow class of DLQ messages.
  • Logic: It would apply its own retry logic (e.g., exponential backoff for a longer period, up to 24 hours).
  • Re-DLQ: If this DLQ consumer also fails after its retries, it would push the message back to the same DLQ topic (or a different, truly “unresolvable” DLQ) to re-trigger human intervention.
  • Caution: This adds complexity and should only be done for errors you’ve explicitly identified as potentially auto-recoverable over a very long time.
5. Data Reconciliation (Long-Term Strategy)
  • For critical business data, even with DLQs, sometimes manual reconciliation or a dedicated reconciliation service might be needed.
  • This involves periodically comparing your read model state with the authoritative Event Store (or an authoritative external system) to detect and resolve discrepancies that might have slipped through.

Addressing “Most errors cannot be recovered anyway”

If “most errors cannot be recovered anyway” and always lead to DLQ, it points to a need for:

  1. Better isTransientSqlError logic: Are you correctly identifying all transient SQL errors that could be retried by the main consumer?
  2. More robust validation upstream: Can you prevent more malformed/invalid data from even generating events that will fail downstream? (E.g., more validation in the command handler).
  3. Refined Domain Model/Business Rules: Sometimes, a “permanent” error in the consumer points to a business rule that isn’t fully handled by events, or a valid state transition that wasn’t expected.

DLQs are not a substitute for preventing errors, but they are essential for graceful failure and ensuring eventual consistency. Their effectiveness hinges entirely on the operational discipline and tooling around them.

How to handle the scenario that it takes weeks or even months to fix a bug and get it deployed

This is an issue regarding a major reality of enterprise software: deployment cycles can be slow, and manual intervention for DLQs simply isn’t scalable for a continuous stream of errors. This is precisely why a purely “human-in-the-loop” DLQ management strategy breaks down in a corporate context with persistent bugs.

To handle a bug that takes weeks/months to deploy, you need a strategy that focuses on containment, bypass, and eventually, reconciliation.


Strategy for Long-Deployment-Cycle Bugs (Weeks/Months)

The goal is to stop the errors from piling up indefinitely in the DLQ, and to allow the rest of the system to continue processing, even if some data is temporarily inconsistent.

1. Immediate Containment: Filtering or Skipping “Poison Pill” Messages
  • Implement a “Hot Fix” Filter (Code-based or SMT-based):

    • In your Kafka Consumer (PortalEventConsumerStartupHook): If you identify a bug where a specific type of event (or event with specific data) consistently causes failures:
      • Add a temporary code filter. For instance, if ScheduleCreatedEvent with null userId is causing NullPointerException, add:
        if (eventType.equals(PortalConstants.SCHEDULE_CREATED_EVENT) && eventMap.get("userId") == null) {
            logger.warn("Skipping known bug event type {} for record {} due to null userId. Not processing.", eventType, record.offset());
            handlePermanentFailure(record, "Known bug: null userId for " + eventType, "KnownBugSkip");
            return true; // Mark as handled (DLQ'd), commit offset, move on.
        }
        
      • If the bug is in a specific dbProvider method: You can wrap that call in a try-catch for PermanentProcessingException specifically for that event type, and if it’s the known bug, send it to DLQ and commit.
    • Using Kafka Connect SMT (if source is Kafka Connect): You could implement a custom Filter SMT that drops/routes specific problematic messages before they even hit your consumer app. This requires deploying a new SMT, but it can be faster than an app deployment.
  • Why: This immediately stops the DLQ from growing uncontrollably with known bad messages. It sacrifices processing that specific message but ensures the consumer stays healthy.

2. Automated (Limited) Re-driving for Transient/Known Issues (Or Triage)
  • “Error Triage” Consumer: Instead of just sending to a single DLQ, consider a dedicated consumer that subscribes to your main DLQ topic.
    • This consumer acts as an automated triage.
    • It checks the errorType (from handlePermanentFailure’s metadata).
    • If errorType is “TransientSqlError” or “RetriesExhausted” (but could eventually succeed): It re-publishes the original message back to the portal-event topic with an exponential backoff. It might implement its own max retries (e.g., 50 retries over 24 hours). If it still fails, then it pushes to a “Final DLQ” that truly requires manual intervention.
    • If errorType is “ConcurrencyConflict”, “DataValidationError”, “UnhandledEventType”, or “KnownBugSkip”: It pushes to a separate “Permanent DLQ” topic. This queue is smaller and truly requires human eyes.
  • Why: This handles messages that might eventually self-resolve or that you know can’t be fixed by immediate retries but aren’t necessarily “dead forever.” It reduces the volume of messages requiring immediate human attention.
3. Manual Intervention for “Permanent DLQ” / Complex Bugs (When Devs Get Involved)
  • The “Permanent DLQ” is where true bugs/bad data sit.
  • The same monitoring and alerting from before applies, but now it’s for a much smaller, higher-priority queue.
  • Developers must actively:
    • Analyze: What exactly caused this? Why did it bypass automated retries/filters?
    • Fix: Develop and deploy the bug fix.
    • Reconcile/Re-drive:
      • If the bug fix resolves the issue, use a re-driver tool to re-submit messages from the Permanent DLQ to the portal-event topic.
      • If the bug resulted in data inconsistencies that can’t be fixed by re-driving (e.g., a critical business state was violated), you might need to perform a manual database correction on the affected aggregate(s) (this is the most dangerous and should be avoided if possible).
4. Long-Term Data Reconciliation / Auditing
  • Offline Reconciliation: For critical data, implement daily/weekly batch jobs that compare the state of your read model tables with the authoritative Event Store.
    • If discrepancies are found, they are reported, and a reconciliation process is triggered (either manual or automated). This ensures that even if events were missed or misapplied, data consistency is eventually achieved.
  • Event Replay (When all else fails): If a significant bug causes widespread data corruption or loss of consistency, the ultimate fallback is to:
    1. Deploy the bug fix.
    2. Stop the affected read model consumer.
    3. Clear the affected read model tables.
    4. Replay all historical events from the event_store_t (or long-retention Kafka topics) through the fixed consumer logic. This rebuilds the read model from scratch, reflecting the correct business logic. This is why Event Sourcing is so powerful.

Example Workflow with a Long-Deployment-Cycle Bug

  1. Bug Identified: ScheduleCreatedEvent creates a schedule, but due to a bug in the consumer’s dbProvider.createSchedule method, it tries to insert a duplicate primary key if scheduleId (aggregate ID) exists, and this causes a permanent error in the consumer.
  2. Immediate Containment (Filter/Bypass):
    • A hotfix is applied to the PortalEventConsumerStartupHook (or a dedicated filter SMT) to recognize ScheduleCreatedEvent where scheduleId already exists.
    • For such events, it handlePermanentFailure() the message to a portal-event-dlq-permanent topic (or a KnownBugDLQ). This prevents the main consumer from getting stuck.
  3. DLQ Accumulation & Monitoring: Messages related to this bug pile up in portal-event-dlq-permanent. Alerts are firing.
  4. Development Fix: The development team works on a fix for dbProvider.createSchedule (e.g., changing it to an UPSERT if a “create” event implies “idempotent create/update” or better handling of duplicate primary keys if it truly is an error). This takes weeks.
  5. Deployment: The fix is deployed.
  6. Re-driving: Operations team uses the re-driver tool to:
    • Read messages from portal-event-dlq-permanent.
    • Publish them back to the portal-event topic.
    • The now-fixed consumer processes them correctly.

This approach balances immediate operational stability with eventual consistency, acknowledging the realities of enterprise deployment cycles.