1
0

M4 AP-006 Persistenzkonsistenz und Bootstrap-Scope korrigieren

This commit is contained in:
2026-04-03 08:37:44 +02:00
parent fc30d1effd
commit d61299f892
22 changed files with 491 additions and 174 deletions

View File

@@ -0,0 +1,35 @@
package de.gecheckt.pdf.umbenenner.application.port.out;
import de.gecheckt.pdf.umbenenner.domain.model.DocumentFingerprint;
import java.util.function.Consumer;
/**
* Port for executing multiple repository operations within a single unit of work.
* <p>
* Ensures that related persistence operations (such as saving a processing attempt
* and updating a document record) are executed atomically.
*
* @since M4-AP-006-fix
*/
public interface UnitOfWorkPort {
/**
* Executes the given operations within a single unit of work.
* <p>
* If any operation fails, all changes are rolled back and the exception is propagated.
*
* @param operations the operations to execute; must not be null
* @throws DocumentPersistenceException if any operation fails
*/
void executeInTransaction(Consumer<TransactionOperations> operations);
/**
* Operations available within a transaction.
*/
interface TransactionOperations {
void saveProcessingAttempt(ProcessingAttempt attempt);
void createDocumentRecord(DocumentRecord record);
void updateDocumentRecord(DocumentRecord record);
}
}

View File

@@ -12,6 +12,7 @@ import de.gecheckt.pdf.umbenenner.application.port.out.FailureCounters;
import de.gecheckt.pdf.umbenenner.application.port.out.PersistenceLookupTechnicalFailure;
import de.gecheckt.pdf.umbenenner.application.port.out.ProcessingAttempt;
import de.gecheckt.pdf.umbenenner.application.port.out.ProcessingAttemptRepository;
import de.gecheckt.pdf.umbenenner.application.port.out.UnitOfWorkPort;
import de.gecheckt.pdf.umbenenner.domain.model.BatchRunContext;
import de.gecheckt.pdf.umbenenner.domain.model.DocumentFingerprint;
import de.gecheckt.pdf.umbenenner.domain.model.DocumentProcessingOutcome;
@@ -66,12 +67,9 @@ import java.util.Objects;
* <h2>Persistence consistency</h2>
* <p>
* For every identified document, both the processing attempt and the master record are
* written in sequence. If either write fails, the failure is logged and the batch run
* continues with the next candidate. No partial state is intentionally left; if the
* attempt write succeeds but the master record write fails, the inconsistency is bounded
* to that one document and is logged clearly. True transactionality across two separate
* repository calls is not available without a larger architectural change; this is
* documented as a known limitation of the M4 scope.
* written atomically using a unit of work pattern. If either write fails, both writes
* are rolled back and the failure is logged. The batch run continues with the next
* candidate.
*
* <h2>Pre-fingerprint failures</h2>
* <p>
@@ -87,6 +85,7 @@ public class M4DocumentProcessor {
private final DocumentRecordRepository documentRecordRepository;
private final ProcessingAttemptRepository processingAttemptRepository;
private final UnitOfWorkPort unitOfWorkPort;
/**
* Creates the M4 document processor with the required persistence ports.
@@ -95,15 +94,20 @@ public class M4DocumentProcessor {
* must not be null
* @param processingAttemptRepository port for writing and reading the attempt history;
* must not be null
* @param unitOfWorkPort port for executing operations atomically;
* must not be null
* @throws NullPointerException if any parameter is null
*/
public M4DocumentProcessor(
DocumentRecordRepository documentRecordRepository,
ProcessingAttemptRepository processingAttemptRepository) {
ProcessingAttemptRepository processingAttemptRepository,
UnitOfWorkPort unitOfWorkPort) {
this.documentRecordRepository =
Objects.requireNonNull(documentRecordRepository, "documentRecordRepository must not be null");
this.processingAttemptRepository =
Objects.requireNonNull(processingAttemptRepository, "processingAttemptRepository must not be null");
this.unitOfWorkPort =
Objects.requireNonNull(unitOfWorkPort, "unitOfWorkPort must not be null");
}
/**
@@ -329,22 +333,24 @@ public class M4DocumentProcessor {
false // not retryable
);
// Write attempt first, then update master record
processingAttemptRepository.save(skipAttempt);
// Write attempt and master record atomically
unitOfWorkPort.executeInTransaction(txOps -> {
txOps.saveProcessingAttempt(skipAttempt);
// Update master record: only updatedAt changes; status and counters stay the same
DocumentRecord updatedRecord = new DocumentRecord(
existingRecord.fingerprint(),
new SourceDocumentLocator(candidate.locator().value()),
candidate.uniqueIdentifier(),
existingRecord.overallStatus(), // terminal status unchanged
existingRecord.failureCounters(), // counters unchanged for skip
existingRecord.lastFailureInstant(),
existingRecord.lastSuccessInstant(),
existingRecord.createdAt(),
now // updatedAt = now
);
documentRecordRepository.update(updatedRecord);
// Update master record: only updatedAt changes; status and counters stay the same
DocumentRecord updatedRecord = new DocumentRecord(
existingRecord.fingerprint(),
new SourceDocumentLocator(candidate.locator().value()),
candidate.uniqueIdentifier(),
existingRecord.overallStatus(), // terminal status unchanged
existingRecord.failureCounters(), // counters unchanged for skip
existingRecord.lastFailureInstant(),
existingRecord.lastSuccessInstant(),
existingRecord.createdAt(),
now // updatedAt = now
);
txOps.updateDocumentRecord(updatedRecord);
});
LOG.debug("Skip attempt #{} persisted for '{}' with status {}.",
attemptNumber, candidate.uniqueIdentifier(), skipStatus);
@@ -401,9 +407,11 @@ public class M4DocumentProcessor {
now // updatedAt
);
// Persist attempt first, then master record
processingAttemptRepository.save(attempt);
documentRecordRepository.create(newRecord);
// Persist attempt and master record atomically
unitOfWorkPort.executeInTransaction(txOps -> {
txOps.saveProcessingAttempt(attempt);
txOps.createDocumentRecord(newRecord);
});
LOG.info("New document '{}' processed: status={}, contentErrors={}, transientErrors={}.",
candidate.uniqueIdentifier(),
@@ -466,9 +474,11 @@ public class M4DocumentProcessor {
now // updatedAt
);
// Persist attempt first, then master record
processingAttemptRepository.save(attempt);
documentRecordRepository.update(updatedRecord);
// Persist attempt and master record atomically
unitOfWorkPort.executeInTransaction(txOps -> {
txOps.saveProcessingAttempt(attempt);
txOps.updateDocumentRecord(updatedRecord);
});
LOG.info("Known document '{}' processed: status={}, contentErrors={}, transientErrors={}.",
candidate.uniqueIdentifier(),

View File

@@ -12,6 +12,7 @@ import de.gecheckt.pdf.umbenenner.application.port.out.FailureCounters;
import de.gecheckt.pdf.umbenenner.application.port.out.PersistenceLookupTechnicalFailure;
import de.gecheckt.pdf.umbenenner.application.port.out.ProcessingAttempt;
import de.gecheckt.pdf.umbenenner.application.port.out.ProcessingAttemptRepository;
import de.gecheckt.pdf.umbenenner.application.port.out.UnitOfWorkPort;
import de.gecheckt.pdf.umbenenner.domain.model.BatchRunContext;
import de.gecheckt.pdf.umbenenner.domain.model.DocumentFingerprint;
import de.gecheckt.pdf.umbenenner.domain.model.DocumentProcessingOutcome;
@@ -32,6 +33,7 @@ import org.junit.jupiter.api.Test;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import static org.junit.jupiter.api.Assertions.*;
@@ -56,6 +58,7 @@ class M4DocumentProcessorTest {
private CapturingDocumentRecordRepository recordRepo;
private CapturingProcessingAttemptRepository attemptRepo;
private CapturingUnitOfWorkPort unitOfWorkPort;
private M4DocumentProcessor processor;
private SourceDocumentCandidate candidate;
@@ -67,7 +70,8 @@ class M4DocumentProcessorTest {
void setUp() {
recordRepo = new CapturingDocumentRecordRepository();
attemptRepo = new CapturingProcessingAttemptRepository();
processor = new M4DocumentProcessor(recordRepo, attemptRepo);
unitOfWorkPort = new CapturingUnitOfWorkPort(recordRepo, attemptRepo);
processor = new M4DocumentProcessor(recordRepo, attemptRepo, unitOfWorkPort);
candidate = new SourceDocumentCandidate(
"test.pdf", 1024L, new SourceDocumentLocator("/tmp/test.pdf"));
@@ -321,8 +325,8 @@ class M4DocumentProcessorTest {
@Test
void process_persistenceWriteFailure_doesNotThrow_batchContinues() {
recordRepo.setLookupResult(new DocumentUnknown());
// Make the attempt save throw
attemptRepo.failOnSave = true;
// Make the unit of work throw
unitOfWorkPort.failOnExecute = true;
DocumentProcessingOutcome m3Outcome = new PreCheckPassed(
candidate, new PdfExtractionSuccess("text", new PdfPageCount(1)));
@@ -422,4 +426,45 @@ class M4DocumentProcessorTest {
return List.copyOf(savedAttempts);
}
}
private static class CapturingUnitOfWorkPort implements UnitOfWorkPort {
private final CapturingDocumentRecordRepository recordRepo;
private final CapturingProcessingAttemptRepository attemptRepo;
boolean failOnExecute = false;
Consumer<TransactionOperations> lastOperations = null;
CapturingUnitOfWorkPort(CapturingDocumentRecordRepository recordRepo,
CapturingProcessingAttemptRepository attemptRepo) {
this.recordRepo = recordRepo;
this.attemptRepo = attemptRepo;
}
@Override
public void executeInTransaction(Consumer<TransactionOperations> operations) {
this.lastOperations = operations;
if (failOnExecute) {
throw new DocumentPersistenceException("Simulated transaction failure");
}
// Execute the operations with mock transaction operations that delegate to repos
TransactionOperations mockOps = new TransactionOperations() {
@Override
public void saveProcessingAttempt(ProcessingAttempt attempt) {
attemptRepo.savedAttempts.add(attempt);
}
@Override
public void createDocumentRecord(DocumentRecord record) {
recordRepo.createdRecords.add(record);
}
@Override
public void updateDocumentRecord(DocumentRecord record) {
recordRepo.updatedRecords.add(record);
}
};
operations.accept(mockOps);
}
}
}

View File

@@ -17,6 +17,7 @@ import de.gecheckt.pdf.umbenenner.application.port.out.RunLockPort;
import de.gecheckt.pdf.umbenenner.application.port.out.RunLockUnavailableException;
import de.gecheckt.pdf.umbenenner.application.port.out.SourceDocumentAccessException;
import de.gecheckt.pdf.umbenenner.application.port.out.SourceDocumentCandidatesPort;
import de.gecheckt.pdf.umbenenner.application.port.out.UnitOfWorkPort;
import de.gecheckt.pdf.umbenenner.application.service.M4DocumentProcessor;
import de.gecheckt.pdf.umbenenner.domain.model.BatchRunContext;
import de.gecheckt.pdf.umbenenner.domain.model.DocumentFingerprint;
@@ -39,6 +40,7 @@ import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import static org.junit.jupiter.api.Assertions.*;
@@ -615,7 +617,7 @@ class BatchRunProcessingUseCaseTest {
*/
private static class NoOpM4DocumentProcessor extends M4DocumentProcessor {
NoOpM4DocumentProcessor() {
super(new NoOpDocumentRecordRepository(), new NoOpProcessingAttemptRepository());
super(new NoOpDocumentRecordRepository(), new NoOpProcessingAttemptRepository(), new NoOpUnitOfWorkPort());
}
}
@@ -626,7 +628,7 @@ class BatchRunProcessingUseCaseTest {
private int processCallCount = 0;
TrackingM4DocumentProcessor() {
super(new NoOpDocumentRecordRepository(), new NoOpProcessingAttemptRepository());
super(new NoOpDocumentRecordRepository(), new NoOpProcessingAttemptRepository(), new NoOpUnitOfWorkPort());
}
@Override
@@ -692,4 +694,28 @@ class BatchRunProcessingUseCaseTest {
return List.of();
}
}
/** No-op UnitOfWorkPort for use in test M4DocumentProcessor instances. */
private static class NoOpUnitOfWorkPort implements UnitOfWorkPort {
@Override
public void executeInTransaction(Consumer<TransactionOperations> operations) {
// No-op - just execute the operations directly without transaction
operations.accept(new TransactionOperations() {
@Override
public void saveProcessingAttempt(ProcessingAttempt attempt) {
// No-op
}
@Override
public void createDocumentRecord(DocumentRecord record) {
// No-op
}
@Override
public void updateDocumentRecord(DocumentRecord record) {
// No-op
}
});
}
}
}