From 3ab10a89f0b28aaafe575edc2631994a64ca1f29 Mon Sep 17 00:00:00 2001 From: Marcus van Elst Date: Sat, 4 Apr 2026 12:40:00 +0200 Subject: [PATCH] Nachbearbeitung: DocumentProcessingCoordinator weiter strukturell vereinfacht --- .../DocumentProcessingCoordinator.java | 230 +++++++++--------- 1 file changed, 115 insertions(+), 115 deletions(-) diff --git a/pdf-umbenenner-application/src/main/java/de/gecheckt/pdf/umbenenner/application/service/DocumentProcessingCoordinator.java b/pdf-umbenenner-application/src/main/java/de/gecheckt/pdf/umbenenner/application/service/DocumentProcessingCoordinator.java index 9eb859d..fcde2b1 100644 --- a/pdf-umbenenner-application/src/main/java/de/gecheckt/pdf/umbenenner/application/service/DocumentProcessingCoordinator.java +++ b/pdf-umbenenner-application/src/main/java/de/gecheckt/pdf/umbenenner/application/service/DocumentProcessingCoordinator.java @@ -27,6 +27,7 @@ import org.apache.logging.log4j.Logger; import java.time.Instant; import java.util.Objects; +import java.util.function.Consumer; import java.util.function.Function; /** @@ -282,23 +283,12 @@ public class DocumentProcessingCoordinator { false // not retryable ); + DocumentRecord skipRecord = buildSkipRecord(existingRecord, candidate, now); + // Write attempt and master record atomically unitOfWorkPort.executeInTransaction(txOps -> { txOps.saveProcessingAttempt(skipAttempt); - - // Update master record: only updatedAt changes; status and counters stay the same - DocumentRecord updatedRecord = new DocumentRecord( - existingRecord.fingerprint(), - new SourceDocumentLocator(candidate.locator().value()), - candidate.uniqueIdentifier(), - existingRecord.overallStatus(), // terminal status unchanged - existingRecord.failureCounters(), // counters unchanged for skip - existingRecord.lastFailureInstant(), - existingRecord.lastSuccessInstant(), - existingRecord.createdAt(), - now // updatedAt = now - ); - txOps.updateDocumentRecord(updatedRecord); + txOps.updateDocumentRecord(skipRecord); }); LOG.debug("Skip attempt #{} persisted for '{}' with status {}.", @@ -314,16 +304,7 @@ public class DocumentProcessingCoordinator { // New document path // ------------------------------------------------------------------------- - /** - * Processes a newly discovered document (no existing master record) and persists - * both the attempt and the new master record. - * - * @param candidate the candidate being processed - * @param fingerprint the document fingerprint - * @param pipelineOutcome the pipeline result - * @param context the current batch run context - * @param attemptStart the start instant of this processing attempt - */ + /** Maps the pipeline outcome for a new document and persists attempt + new master record. */ private void processAndPersistNewDocument( SourceDocumentCandidate candidate, DocumentFingerprint fingerprint, @@ -332,63 +313,17 @@ public class DocumentProcessingCoordinator { Instant attemptStart) { Instant now = Instant.now(); - - // Map outcome to status/counters for a brand-new document ProcessingOutcome outcome = mapOutcomeForNewDocument(pipelineOutcome); - - try { - // Attempt number is always 1 for a new document - int attemptNumber = processingAttemptRepository.loadNextAttemptNumber(fingerprint); - - ProcessingAttempt attempt = buildAttempt( - fingerprint, context, attemptNumber, attemptStart, now, outcome); - - // Create the new master record - DocumentRecord newRecord = new DocumentRecord( - fingerprint, - new SourceDocumentLocator(candidate.locator().value()), - candidate.uniqueIdentifier(), - outcome.overallStatus(), - outcome.counters(), - outcome.overallStatus() == ProcessingStatus.SUCCESS ? null : now, // lastFailureInstant - outcome.overallStatus() == ProcessingStatus.SUCCESS ? now : null, // lastSuccessInstant - now, // createdAt - now // updatedAt - ); - - // Persist attempt and master record atomically - unitOfWorkPort.executeInTransaction(txOps -> { - txOps.saveProcessingAttempt(attempt); - txOps.createDocumentRecord(newRecord); - }); - - LOG.info("New document '{}' processed: status={}, contentErrors={}, transientErrors={}.", - candidate.uniqueIdentifier(), - outcome.overallStatus(), - outcome.counters().contentErrorCount(), - outcome.counters().transientErrorCount()); - - } catch (DocumentPersistenceException e) { - LOG.error("Failed to persist processing result for new document '{}': {}", - candidate.uniqueIdentifier(), e.getMessage(), e); - } + DocumentRecord newRecord = buildNewDocumentRecord(fingerprint, candidate, outcome, now); + persistAttemptAndRecord(candidate, fingerprint, context, attemptStart, now, outcome, + txOps -> txOps.createDocumentRecord(newRecord)); } // ------------------------------------------------------------------------- // Known processable document path // ------------------------------------------------------------------------- - /** - * Processes a known but non-terminal document and updates both the attempt history - * and the master record. - * - * @param candidate the candidate being processed - * @param fingerprint the document fingerprint - * @param pipelineOutcome the pipeline result - * @param existingRecord the current master record (not terminal) - * @param context the current batch run context - * @param attemptStart the start instant of this processing attempt - */ + /** Maps the pipeline outcome for a known document and persists attempt + updated master record. */ private void processAndPersistKnownDocument( SourceDocumentCandidate candidate, DocumentFingerprint fingerprint, @@ -398,47 +333,10 @@ public class DocumentProcessingCoordinator { Instant attemptStart) { Instant now = Instant.now(); - - // Map outcome to status/counters, taking existing counters into account ProcessingOutcome outcome = mapOutcomeForKnownDocument(pipelineOutcome, existingRecord.failureCounters()); - - try { - int attemptNumber = processingAttemptRepository.loadNextAttemptNumber(fingerprint); - - ProcessingAttempt attempt = buildAttempt( - fingerprint, context, attemptNumber, attemptStart, now, outcome); - - // Update the master record with new status, counters and timestamps - DocumentRecord updatedRecord = new DocumentRecord( - existingRecord.fingerprint(), - new SourceDocumentLocator(candidate.locator().value()), - candidate.uniqueIdentifier(), - outcome.overallStatus(), - outcome.counters(), - outcome.overallStatus() == ProcessingStatus.SUCCESS - ? existingRecord.lastFailureInstant() : now, - outcome.overallStatus() == ProcessingStatus.SUCCESS - ? now : existingRecord.lastSuccessInstant(), - existingRecord.createdAt(), - now // updatedAt - ); - - // Persist attempt and master record atomically - unitOfWorkPort.executeInTransaction(txOps -> { - txOps.saveProcessingAttempt(attempt); - txOps.updateDocumentRecord(updatedRecord); - }); - - LOG.info("Known document '{}' processed: status={}, contentErrors={}, transientErrors={}.", - candidate.uniqueIdentifier(), - outcome.overallStatus(), - outcome.counters().contentErrorCount(), - outcome.counters().transientErrorCount()); - - } catch (DocumentPersistenceException e) { - LOG.error("Failed to persist processing result for known document '{}': {}", - candidate.uniqueIdentifier(), e.getMessage(), e); - } + DocumentRecord updatedRecord = buildUpdatedDocumentRecord(existingRecord, candidate, outcome, now); + persistAttemptAndRecord(candidate, fingerprint, context, attemptStart, now, outcome, + txOps -> txOps.updateDocumentRecord(updatedRecord)); } // ------------------------------------------------------------------------- @@ -485,7 +383,6 @@ public class DocumentProcessingCoordinator { return switch (pipelineOutcome) { case de.gecheckt.pdf.umbenenner.domain.model.PreCheckPassed ignored -> { // success: document passed all pre-checks - // In scope (no KI, no target copy), PreCheckPassed is the terminal success yield new ProcessingOutcome( ProcessingStatus.SUCCESS, existingCounters, // counters unchanged on success @@ -526,6 +423,109 @@ public class DocumentProcessingCoordinator { }; } + // ------------------------------------------------------------------------- + // Record assembly helpers + // ------------------------------------------------------------------------- + + private DocumentRecord buildNewDocumentRecord( + DocumentFingerprint fingerprint, + SourceDocumentCandidate candidate, + ProcessingOutcome outcome, + Instant now) { + boolean success = outcome.overallStatus() == ProcessingStatus.SUCCESS; + return new DocumentRecord( + fingerprint, + new SourceDocumentLocator(candidate.locator().value()), + candidate.uniqueIdentifier(), + outcome.overallStatus(), + outcome.counters(), + success ? null : now, // lastFailureInstant + success ? now : null, // lastSuccessInstant + now, // createdAt + now // updatedAt + ); + } + + private DocumentRecord buildUpdatedDocumentRecord( + DocumentRecord existingRecord, + SourceDocumentCandidate candidate, + ProcessingOutcome outcome, + Instant now) { + boolean success = outcome.overallStatus() == ProcessingStatus.SUCCESS; + return new DocumentRecord( + existingRecord.fingerprint(), + new SourceDocumentLocator(candidate.locator().value()), + candidate.uniqueIdentifier(), + outcome.overallStatus(), + outcome.counters(), + success ? existingRecord.lastFailureInstant() : now, + success ? now : existingRecord.lastSuccessInstant(), + existingRecord.createdAt(), + now // updatedAt + ); + } + + /** Builds a skip record: only {@code updatedAt} advances; status and counters are unchanged. */ + private DocumentRecord buildSkipRecord( + DocumentRecord existingRecord, + SourceDocumentCandidate candidate, + Instant now) { + return new DocumentRecord( + existingRecord.fingerprint(), + new SourceDocumentLocator(candidate.locator().value()), + candidate.uniqueIdentifier(), + existingRecord.overallStatus(), + existingRecord.failureCounters(), + existingRecord.lastFailureInstant(), + existingRecord.lastSuccessInstant(), + existingRecord.createdAt(), + now // updatedAt + ); + } + + // ------------------------------------------------------------------------- + // Common persistence flow (non-skip paths) + // ------------------------------------------------------------------------- + + /** + * Loads the next attempt number, builds and persists the attempt together with the + * document record atomically, then logs the result. + *

+ * {@code recordWriter} performs either {@code createDocumentRecord} or + * {@code updateDocumentRecord} depending on whether the document is new or known. + * All persistence failures are caught and logged; the batch run continues. + */ + private void persistAttemptAndRecord( + SourceDocumentCandidate candidate, + DocumentFingerprint fingerprint, + BatchRunContext context, + Instant attemptStart, + Instant now, + ProcessingOutcome outcome, + Consumer recordWriter) { + + try { + int attemptNumber = processingAttemptRepository.loadNextAttemptNumber(fingerprint); + ProcessingAttempt attempt = + buildAttempt(fingerprint, context, attemptNumber, attemptStart, now, outcome); + + unitOfWorkPort.executeInTransaction(txOps -> { + txOps.saveProcessingAttempt(attempt); + recordWriter.accept(txOps); + }); + + LOG.info("Document '{}' processed: status={}, contentErrors={}, transientErrors={}.", + candidate.uniqueIdentifier(), + outcome.overallStatus(), + outcome.counters().contentErrorCount(), + outcome.counters().transientErrorCount()); + + } catch (DocumentPersistenceException e) { + LOG.error("Failed to persist processing result for '{}': {}", + candidate.uniqueIdentifier(), e.getMessage(), e); + } + } + // ------------------------------------------------------------------------- // Helper: build ProcessingAttempt // -------------------------------------------------------------------------