Nachbearbeitung: Dokumentbezogene Persistenzfehler korrekt im
Batch-Ergebnis berücksichtigt
This commit is contained in:
@@ -129,8 +129,10 @@ public class DocumentProcessingCoordinator {
|
||||
* must not be null
|
||||
* @param attemptStart the instant at which processing of this candidate began;
|
||||
* must not be null
|
||||
* @return true if processing and persistence succeeded for this document, false if a
|
||||
* persistence failure occurred
|
||||
*/
|
||||
public void process(
|
||||
public boolean process(
|
||||
SourceDocumentCandidate candidate,
|
||||
DocumentFingerprint fingerprint,
|
||||
DocumentProcessingOutcome outcome,
|
||||
@@ -143,7 +145,7 @@ public class DocumentProcessingCoordinator {
|
||||
Objects.requireNonNull(context, "context must not be null");
|
||||
Objects.requireNonNull(attemptStart, "attemptStart must not be null");
|
||||
|
||||
processDeferredOutcome(candidate, fingerprint, context, attemptStart, ignored -> outcome);
|
||||
return processDeferredOutcome(candidate, fingerprint, context, attemptStart, ignored -> outcome);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -172,8 +174,10 @@ public class DocumentProcessingCoordinator {
|
||||
* must not be null
|
||||
* @param pipelineExecutor functional interface that executes the extraction and pre-check
|
||||
* pipeline when needed; must not be null
|
||||
* @return true if processing and persistence succeeded for this document, false if a
|
||||
* persistence failure occurred (lookup, attempt write, or record write)
|
||||
*/
|
||||
public void processDeferredOutcome(
|
||||
public boolean processDeferredOutcome(
|
||||
SourceDocumentCandidate candidate,
|
||||
DocumentFingerprint fingerprint,
|
||||
BatchRunContext context,
|
||||
@@ -194,16 +198,16 @@ public class DocumentProcessingCoordinator {
|
||||
if (lookupResult instanceof PersistenceLookupTechnicalFailure failure) {
|
||||
logger.error("Cannot process '{}': master record lookup failed: {}",
|
||||
candidate.uniqueIdentifier(), failure.errorMessage());
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
// Step 3: Determine the action based on the lookup result
|
||||
switch (lookupResult) {
|
||||
return switch (lookupResult) {
|
||||
case DocumentTerminalSuccess terminalSuccess -> {
|
||||
// Document already successfully processed → skip
|
||||
logger.info("Skipping '{}': already successfully processed (fingerprint: {}).",
|
||||
candidate.uniqueIdentifier(), fingerprint.sha256Hex());
|
||||
persistSkipAttempt(
|
||||
yield persistSkipAttempt(
|
||||
candidate, fingerprint, terminalSuccess.record(),
|
||||
ProcessingStatus.SKIPPED_ALREADY_PROCESSED,
|
||||
context, attemptStart);
|
||||
@@ -213,7 +217,7 @@ public class DocumentProcessingCoordinator {
|
||||
// Document finally failed → skip
|
||||
logger.info("Skipping '{}': already finally failed (fingerprint: {}).",
|
||||
candidate.uniqueIdentifier(), fingerprint.sha256Hex());
|
||||
persistSkipAttempt(
|
||||
yield persistSkipAttempt(
|
||||
candidate, fingerprint, terminalFailure.record(),
|
||||
ProcessingStatus.SKIPPED_FINAL_FAILURE,
|
||||
context, attemptStart);
|
||||
@@ -222,22 +226,24 @@ public class DocumentProcessingCoordinator {
|
||||
case DocumentUnknown ignored -> {
|
||||
// New document – execute pipeline and process
|
||||
DocumentProcessingOutcome outcome = pipelineExecutor.apply(candidate);
|
||||
processAndPersistNewDocument(candidate, fingerprint, outcome, context, attemptStart);
|
||||
yield processAndPersistNewDocument(candidate, fingerprint, outcome, context, attemptStart);
|
||||
}
|
||||
|
||||
case DocumentKnownProcessable knownProcessable -> {
|
||||
// Known but not terminal – execute pipeline and process
|
||||
DocumentProcessingOutcome outcome = pipelineExecutor.apply(candidate);
|
||||
processAndPersistKnownDocument(
|
||||
yield processAndPersistKnownDocument(
|
||||
candidate, fingerprint, outcome, knownProcessable.record(),
|
||||
context, attemptStart);
|
||||
}
|
||||
|
||||
default ->
|
||||
default -> {
|
||||
// Exhaustive sealed hierarchy; this branch is unreachable
|
||||
logger.error("Unexpected lookup result type for '{}': {}",
|
||||
candidate.uniqueIdentifier(), lookupResult.getClass().getSimpleName());
|
||||
}
|
||||
yield false;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
@@ -257,8 +263,9 @@ public class DocumentProcessingCoordinator {
|
||||
* or {@link ProcessingStatus#SKIPPED_FINAL_FAILURE})
|
||||
* @param context the current batch run context
|
||||
* @param attemptStart the start instant of this processing attempt
|
||||
* @return true if persistence succeeded, false if a persistence exception occurred
|
||||
*/
|
||||
private void persistSkipAttempt(
|
||||
private boolean persistSkipAttempt(
|
||||
SourceDocumentCandidate candidate,
|
||||
DocumentFingerprint fingerprint,
|
||||
DocumentRecord existingRecord,
|
||||
@@ -293,10 +300,12 @@ public class DocumentProcessingCoordinator {
|
||||
|
||||
logger.debug("Skip attempt #{} persisted for '{}' with status {}.",
|
||||
attemptNumber, candidate.uniqueIdentifier(), skipStatus);
|
||||
return true;
|
||||
|
||||
} catch (DocumentPersistenceException e) {
|
||||
logger.error("Failed to persist skip attempt for '{}': {}",
|
||||
candidate.uniqueIdentifier(), e.getMessage(), e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -305,7 +314,7 @@ public class DocumentProcessingCoordinator {
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/** Maps the pipeline outcome for a new document and persists attempt + new master record. */
|
||||
private void processAndPersistNewDocument(
|
||||
private boolean processAndPersistNewDocument(
|
||||
SourceDocumentCandidate candidate,
|
||||
DocumentFingerprint fingerprint,
|
||||
DocumentProcessingOutcome pipelineOutcome,
|
||||
@@ -315,7 +324,7 @@ public class DocumentProcessingCoordinator {
|
||||
Instant now = Instant.now();
|
||||
ProcessingOutcomeTransition.ProcessingOutcome outcome = mapOutcomeForNewDocument(pipelineOutcome);
|
||||
DocumentRecord newRecord = buildNewDocumentRecord(fingerprint, candidate, outcome, now);
|
||||
persistAttemptAndRecord(candidate, fingerprint, context, attemptStart, now, outcome,
|
||||
return persistAttemptAndRecord(candidate, fingerprint, context, attemptStart, now, outcome,
|
||||
txOps -> txOps.createDocumentRecord(newRecord));
|
||||
}
|
||||
|
||||
@@ -324,7 +333,7 @@ public class DocumentProcessingCoordinator {
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/** Maps the pipeline outcome for a known document and persists attempt + updated master record. */
|
||||
private void processAndPersistKnownDocument(
|
||||
private boolean processAndPersistKnownDocument(
|
||||
SourceDocumentCandidate candidate,
|
||||
DocumentFingerprint fingerprint,
|
||||
DocumentProcessingOutcome pipelineOutcome,
|
||||
@@ -335,7 +344,7 @@ public class DocumentProcessingCoordinator {
|
||||
Instant now = Instant.now();
|
||||
ProcessingOutcomeTransition.ProcessingOutcome outcome = mapOutcomeForKnownDocument(pipelineOutcome, existingRecord.failureCounters());
|
||||
DocumentRecord updatedRecord = buildUpdatedDocumentRecord(existingRecord, candidate, outcome, now);
|
||||
persistAttemptAndRecord(candidate, fingerprint, context, attemptStart, now, outcome,
|
||||
return persistAttemptAndRecord(candidate, fingerprint, context, attemptStart, now, outcome,
|
||||
txOps -> txOps.updateDocumentRecord(updatedRecord));
|
||||
}
|
||||
|
||||
@@ -440,8 +449,10 @@ public class DocumentProcessingCoordinator {
|
||||
* {@code recordWriter} performs either {@code createDocumentRecord} or
|
||||
* {@code updateDocumentRecord} depending on whether the document is new or known.
|
||||
* All persistence failures are caught and logged; the batch run continues.
|
||||
*
|
||||
* @return true if persistence succeeded, false if a persistence exception occurred
|
||||
*/
|
||||
private void persistAttemptAndRecord(
|
||||
private boolean persistAttemptAndRecord(
|
||||
SourceDocumentCandidate candidate,
|
||||
DocumentFingerprint fingerprint,
|
||||
BatchRunContext context,
|
||||
@@ -465,10 +476,12 @@ public class DocumentProcessingCoordinator {
|
||||
outcome.overallStatus(),
|
||||
outcome.counters().contentErrorCount(),
|
||||
outcome.counters().transientErrorCount());
|
||||
return true;
|
||||
|
||||
} catch (DocumentPersistenceException e) {
|
||||
logger.error("Failed to persist processing result for '{}': {}",
|
||||
candidate.uniqueIdentifier(), e.getMessage(), e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -158,9 +158,14 @@ public class DefaultBatchRunProcessingUseCase implements BatchRunProcessingUseCa
|
||||
|
||||
/**
|
||||
* Loads candidates and processes them one by one.
|
||||
* <p>
|
||||
* Tracks whether any document-level persistence failures occur during processing.
|
||||
* A persistence failure for a single document causes the overall batch outcome
|
||||
* to be FAILURE instead of SUCCESS.
|
||||
*
|
||||
* @param context the current batch run context
|
||||
* @return SUCCESS if all candidates were processed, FAILURE if source access fails
|
||||
* @return SUCCESS if all candidates were processed without persistence failures,
|
||||
* FAILURE if source access fails or any document-level persistence failure occurred
|
||||
*/
|
||||
private BatchRunOutcome processCandidates(BatchRunContext context) {
|
||||
List<SourceDocumentCandidate> candidates;
|
||||
@@ -172,13 +177,24 @@ public class DefaultBatchRunProcessingUseCase implements BatchRunProcessingUseCa
|
||||
}
|
||||
logger.info("Found {} PDF candidate(s) in source folder.", candidates.size());
|
||||
|
||||
// Track whether any document-level persistence failures occurred
|
||||
boolean anyPersistenceFailure = false;
|
||||
|
||||
// Process each candidate
|
||||
for (SourceDocumentCandidate candidate : candidates) {
|
||||
processCandidate(candidate, context);
|
||||
if (!processCandidate(candidate, context)) {
|
||||
anyPersistenceFailure = true;
|
||||
}
|
||||
}
|
||||
|
||||
logger.info("Batch run completed. Processed {} candidate(s). RunId: {}",
|
||||
candidates.size(), context.runId());
|
||||
|
||||
if (anyPersistenceFailure) {
|
||||
logger.warn("Batch run completed with document-level persistence failure(s).");
|
||||
return BatchRunOutcome.FAILURE;
|
||||
}
|
||||
|
||||
return BatchRunOutcome.SUCCESS;
|
||||
}
|
||||
|
||||
@@ -209,7 +225,7 @@ public class DefaultBatchRunProcessingUseCase implements BatchRunProcessingUseCa
|
||||
* <li>Record the attempt start instant.</li>
|
||||
* <li>Compute the SHA-256 fingerprint of the candidate file content.</li>
|
||||
* <li>If fingerprint computation fails: log as non-identifiable run event and
|
||||
* return — no SQLite record is created.</li>
|
||||
* return true — no SQLite record is created, but no persistence failure occurred.</li>
|
||||
* <li>Load document master record.</li>
|
||||
* <li>If already {@code SUCCESS} → persist skip attempt with
|
||||
* {@code SKIPPED_ALREADY_PROCESSED}.</li>
|
||||
@@ -226,21 +242,23 @@ public class DefaultBatchRunProcessingUseCase implements BatchRunProcessingUseCa
|
||||
*
|
||||
* @param candidate the candidate to process
|
||||
* @param context the current batch run context
|
||||
* @return true if the candidate was processed without persistence failures (fingerprint
|
||||
* errors return true; persistence failures return false)
|
||||
*/
|
||||
private void processCandidate(SourceDocumentCandidate candidate, BatchRunContext context) {
|
||||
private boolean processCandidate(SourceDocumentCandidate candidate, BatchRunContext context) {
|
||||
logger.debug("Processing candidate: {}", candidate.uniqueIdentifier());
|
||||
|
||||
Instant attemptStart = Instant.now();
|
||||
FingerprintResult fingerprintResult = fingerprintPort.computeFingerprint(candidate);
|
||||
|
||||
switch (fingerprintResult) {
|
||||
return switch (fingerprintResult) {
|
||||
case FingerprintTechnicalError fingerprintError -> {
|
||||
handleFingerprintError(candidate, fingerprintError);
|
||||
yield true; // fingerprint errors are not persistence failures
|
||||
}
|
||||
case FingerprintSuccess fingerprintSuccess -> {
|
||||
case FingerprintSuccess fingerprintSuccess ->
|
||||
handleFingerprintSuccess(candidate, fingerprintSuccess, context, attemptStart);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -264,8 +282,9 @@ public class DefaultBatchRunProcessingUseCase implements BatchRunProcessingUseCa
|
||||
* @param fingerprintSuccess the successful fingerprint result
|
||||
* @param context the batch run context
|
||||
* @param attemptStart the instant when processing started
|
||||
* @return true if processing and persistence succeeded, false if a persistence failure occurred
|
||||
*/
|
||||
private void handleFingerprintSuccess(
|
||||
private boolean handleFingerprintSuccess(
|
||||
SourceDocumentCandidate candidate,
|
||||
FingerprintSuccess fingerprintSuccess,
|
||||
BatchRunContext context,
|
||||
@@ -274,7 +293,7 @@ public class DefaultBatchRunProcessingUseCase implements BatchRunProcessingUseCa
|
||||
logger.debug("Fingerprint computed for '{}': {}",
|
||||
candidate.uniqueIdentifier(), fingerprint.sha256Hex());
|
||||
|
||||
documentProcessingCoordinator.processDeferredOutcome(
|
||||
return documentProcessingCoordinator.processDeferredOutcome(
|
||||
candidate,
|
||||
fingerprint,
|
||||
context,
|
||||
|
||||
@@ -424,6 +424,98 @@ class BatchRunProcessingUseCaseTest {
|
||||
assertEquals(3, processor.processCallCount(), "processor should be called once per candidate");
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Document-level persistence failure handling
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Regression test: when a document-level persistence failure occurs,
|
||||
* the batch outcome must be FAILURE, not SUCCESS.
|
||||
*/
|
||||
@Test
|
||||
void execute_documentPersistenceFailure_batchOutcomeIsFailure() throws Exception {
|
||||
MockRunLockPort lockPort = new MockRunLockPort();
|
||||
RuntimeConfiguration config = buildConfig(tempDir);
|
||||
|
||||
SourceDocumentCandidate candidate = makeCandidate("document.pdf");
|
||||
PdfExtractionSuccess success = new PdfExtractionSuccess("Invoice text", new PdfPageCount(1));
|
||||
FixedCandidatesPort candidatesPort = new FixedCandidatesPort(List.of(candidate));
|
||||
FixedExtractionPort extractionPort = new FixedExtractionPort(success);
|
||||
|
||||
// Use a coordinator that always fails persistence
|
||||
DocumentProcessingCoordinator failingProcessor = new DocumentProcessingCoordinator(
|
||||
new NoOpDocumentRecordRepository(), new NoOpProcessingAttemptRepository(),
|
||||
new NoOpUnitOfWorkPort(), new NoOpProcessingLogger()) {
|
||||
@Override
|
||||
public boolean processDeferredOutcome(
|
||||
de.gecheckt.pdf.umbenenner.domain.model.SourceDocumentCandidate candidate,
|
||||
de.gecheckt.pdf.umbenenner.domain.model.DocumentFingerprint fingerprint,
|
||||
de.gecheckt.pdf.umbenenner.domain.model.BatchRunContext context,
|
||||
java.time.Instant attemptStart,
|
||||
java.util.function.Function<de.gecheckt.pdf.umbenenner.domain.model.SourceDocumentCandidate, de.gecheckt.pdf.umbenenner.domain.model.DocumentProcessingOutcome> pipelineExecutor) {
|
||||
// Always report persistence failure
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
DefaultBatchRunProcessingUseCase useCase = buildUseCase(
|
||||
config, lockPort, candidatesPort, extractionPort,
|
||||
new AlwaysSuccessFingerprintPort(), failingProcessor);
|
||||
BatchRunContext context = new BatchRunContext(new RunId("persist-fail"), Instant.now());
|
||||
|
||||
BatchRunOutcome outcome = useCase.execute(context);
|
||||
|
||||
assertTrue(outcome.isFailure(), "Document persistence failure should yield FAILURE outcome");
|
||||
assertFalse(outcome.isSuccess(), "Batch must not succeed when document persistence failed");
|
||||
}
|
||||
|
||||
/**
|
||||
* Regression test: mixed batch where one document succeeds and one has persistence failure.
|
||||
* The batch outcome must be FAILURE due to the persistence failure.
|
||||
*/
|
||||
@Test
|
||||
void execute_mixedBatch_oneCandidateSuccess_oneDocumentPersistenceFails_batchIsFailure() throws Exception {
|
||||
MockRunLockPort lockPort = new MockRunLockPort();
|
||||
RuntimeConfiguration config = buildConfig(tempDir);
|
||||
|
||||
SourceDocumentCandidate goodCandidate = makeCandidate("good.pdf");
|
||||
SourceDocumentCandidate failCandidate = makeCandidate("fails.pdf");
|
||||
|
||||
PdfExtractionSuccess success = new PdfExtractionSuccess("Invoice text", new PdfPageCount(1));
|
||||
FixedCandidatesPort candidatesPort = new FixedCandidatesPort(List.of(goodCandidate, failCandidate));
|
||||
FixedExtractionPort extractionPort = new FixedExtractionPort(success);
|
||||
|
||||
// Coordinator that succeeds for first document, fails persistence for second
|
||||
DocumentProcessingCoordinator selectiveFailingProcessor = new DocumentProcessingCoordinator(
|
||||
new NoOpDocumentRecordRepository(), new NoOpProcessingAttemptRepository(),
|
||||
new NoOpUnitOfWorkPort(), new NoOpProcessingLogger()) {
|
||||
private int callCount = 0;
|
||||
|
||||
@Override
|
||||
public boolean processDeferredOutcome(
|
||||
de.gecheckt.pdf.umbenenner.domain.model.SourceDocumentCandidate candidate,
|
||||
de.gecheckt.pdf.umbenenner.domain.model.DocumentFingerprint fingerprint,
|
||||
de.gecheckt.pdf.umbenenner.domain.model.BatchRunContext context,
|
||||
java.time.Instant attemptStart,
|
||||
java.util.function.Function<de.gecheckt.pdf.umbenenner.domain.model.SourceDocumentCandidate, de.gecheckt.pdf.umbenenner.domain.model.DocumentProcessingOutcome> pipelineExecutor) {
|
||||
callCount++;
|
||||
// First document succeeds, second fails persistence
|
||||
return callCount == 1;
|
||||
}
|
||||
};
|
||||
|
||||
DefaultBatchRunProcessingUseCase useCase = buildUseCase(
|
||||
config, lockPort, candidatesPort, extractionPort,
|
||||
new AlwaysSuccessFingerprintPort(), selectiveFailingProcessor);
|
||||
BatchRunContext context = new BatchRunContext(new RunId("mixed-persist-fail"), Instant.now());
|
||||
|
||||
BatchRunOutcome outcome = useCase.execute(context);
|
||||
|
||||
assertTrue(outcome.isFailure(),
|
||||
"Batch must fail when any document has a persistence failure, even if others succeeded");
|
||||
assertFalse(outcome.isSuccess(), "Cannot be SUCCESS when persistence failed for any document");
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// -------------------------------------------------------------------------
|
||||
@@ -615,7 +707,7 @@ class BatchRunProcessingUseCaseTest {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(
|
||||
public boolean process(
|
||||
de.gecheckt.pdf.umbenenner.domain.model.SourceDocumentCandidate candidate,
|
||||
de.gecheckt.pdf.umbenenner.domain.model.DocumentFingerprint fingerprint,
|
||||
de.gecheckt.pdf.umbenenner.domain.model.DocumentProcessingOutcome outcome,
|
||||
@@ -623,11 +715,11 @@ class BatchRunProcessingUseCaseTest {
|
||||
java.time.Instant attemptStart) {
|
||||
processCallCount++;
|
||||
// Delegate to super so the real logic runs (with no-op repos)
|
||||
super.process(candidate, fingerprint, outcome, context, attemptStart);
|
||||
return super.process(candidate, fingerprint, outcome, context, attemptStart);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processDeferredOutcome(
|
||||
public boolean processDeferredOutcome(
|
||||
de.gecheckt.pdf.umbenenner.domain.model.SourceDocumentCandidate candidate,
|
||||
de.gecheckt.pdf.umbenenner.domain.model.DocumentFingerprint fingerprint,
|
||||
de.gecheckt.pdf.umbenenner.domain.model.BatchRunContext context,
|
||||
@@ -635,7 +727,7 @@ class BatchRunProcessingUseCaseTest {
|
||||
java.util.function.Function<de.gecheckt.pdf.umbenenner.domain.model.SourceDocumentCandidate, de.gecheckt.pdf.umbenenner.domain.model.DocumentProcessingOutcome> pipelineExecutor) {
|
||||
processCallCount++;
|
||||
// Delegate to super so the real logic runs (with no-op repos)
|
||||
super.processDeferredOutcome(candidate, fingerprint, context, attemptStart, pipelineExecutor);
|
||||
return super.processDeferredOutcome(candidate, fingerprint, context, attemptStart, pipelineExecutor);
|
||||
}
|
||||
|
||||
int processCallCount() { return processCallCount; }
|
||||
|
||||
Reference in New Issue
Block a user