diff --git a/pdf-umbenenner-application/src/main/java/de/gecheckt/pdf/umbenenner/application/service/DocumentProcessingCoordinator.java b/pdf-umbenenner-application/src/main/java/de/gecheckt/pdf/umbenenner/application/service/DocumentProcessingCoordinator.java index 9eb859d..fcde2b1 100644 --- a/pdf-umbenenner-application/src/main/java/de/gecheckt/pdf/umbenenner/application/service/DocumentProcessingCoordinator.java +++ b/pdf-umbenenner-application/src/main/java/de/gecheckt/pdf/umbenenner/application/service/DocumentProcessingCoordinator.java @@ -27,6 +27,7 @@ import org.apache.logging.log4j.Logger; import java.time.Instant; import java.util.Objects; +import java.util.function.Consumer; import java.util.function.Function; /** @@ -282,23 +283,12 @@ public class DocumentProcessingCoordinator { false // not retryable ); + DocumentRecord skipRecord = buildSkipRecord(existingRecord, candidate, now); + // Write attempt and master record atomically unitOfWorkPort.executeInTransaction(txOps -> { txOps.saveProcessingAttempt(skipAttempt); - - // Update master record: only updatedAt changes; status and counters stay the same - DocumentRecord updatedRecord = new DocumentRecord( - existingRecord.fingerprint(), - new SourceDocumentLocator(candidate.locator().value()), - candidate.uniqueIdentifier(), - existingRecord.overallStatus(), // terminal status unchanged - existingRecord.failureCounters(), // counters unchanged for skip - existingRecord.lastFailureInstant(), - existingRecord.lastSuccessInstant(), - existingRecord.createdAt(), - now // updatedAt = now - ); - txOps.updateDocumentRecord(updatedRecord); + txOps.updateDocumentRecord(skipRecord); }); LOG.debug("Skip attempt #{} persisted for '{}' with status {}.", @@ -314,16 +304,7 @@ public class DocumentProcessingCoordinator { // New document path // ------------------------------------------------------------------------- - /** - * Processes a newly discovered document (no existing master record) and persists - * both the 
attempt and the new master record. - * - * @param candidate the candidate being processed - * @param fingerprint the document fingerprint - * @param pipelineOutcome the pipeline result - * @param context the current batch run context - * @param attemptStart the start instant of this processing attempt - */ + /** Maps the pipeline outcome for a new document and persists attempt + new master record. */ private void processAndPersistNewDocument( SourceDocumentCandidate candidate, DocumentFingerprint fingerprint, @@ -332,63 +313,17 @@ public class DocumentProcessingCoordinator { Instant attemptStart) { Instant now = Instant.now(); - - // Map outcome to status/counters for a brand-new document ProcessingOutcome outcome = mapOutcomeForNewDocument(pipelineOutcome); - - try { - // Attempt number is always 1 for a new document - int attemptNumber = processingAttemptRepository.loadNextAttemptNumber(fingerprint); - - ProcessingAttempt attempt = buildAttempt( - fingerprint, context, attemptNumber, attemptStart, now, outcome); - - // Create the new master record - DocumentRecord newRecord = new DocumentRecord( - fingerprint, - new SourceDocumentLocator(candidate.locator().value()), - candidate.uniqueIdentifier(), - outcome.overallStatus(), - outcome.counters(), - outcome.overallStatus() == ProcessingStatus.SUCCESS ? null : now, // lastFailureInstant - outcome.overallStatus() == ProcessingStatus.SUCCESS ? 
now : null, // lastSuccessInstant - now, // createdAt - now // updatedAt - ); - - // Persist attempt and master record atomically - unitOfWorkPort.executeInTransaction(txOps -> { - txOps.saveProcessingAttempt(attempt); - txOps.createDocumentRecord(newRecord); - }); - - LOG.info("New document '{}' processed: status={}, contentErrors={}, transientErrors={}.", - candidate.uniqueIdentifier(), - outcome.overallStatus(), - outcome.counters().contentErrorCount(), - outcome.counters().transientErrorCount()); - - } catch (DocumentPersistenceException e) { - LOG.error("Failed to persist processing result for new document '{}': {}", - candidate.uniqueIdentifier(), e.getMessage(), e); - } + DocumentRecord newRecord = buildNewDocumentRecord(fingerprint, candidate, outcome, now); + persistAttemptAndRecord(candidate, fingerprint, context, attemptStart, now, outcome, + txOps -> txOps.createDocumentRecord(newRecord)); } // ------------------------------------------------------------------------- // Known processable document path // ------------------------------------------------------------------------- - /** - * Processes a known but non-terminal document and updates both the attempt history - * and the master record. - * - * @param candidate the candidate being processed - * @param fingerprint the document fingerprint - * @param pipelineOutcome the pipeline result - * @param existingRecord the current master record (not terminal) - * @param context the current batch run context - * @param attemptStart the start instant of this processing attempt - */ + /** Maps the pipeline outcome for a known document and persists attempt + updated master record. 
*/ private void processAndPersistKnownDocument( SourceDocumentCandidate candidate, DocumentFingerprint fingerprint, @@ -398,47 +333,10 @@ public class DocumentProcessingCoordinator { Instant attemptStart) { Instant now = Instant.now(); - - // Map outcome to status/counters, taking existing counters into account ProcessingOutcome outcome = mapOutcomeForKnownDocument(pipelineOutcome, existingRecord.failureCounters()); - - try { - int attemptNumber = processingAttemptRepository.loadNextAttemptNumber(fingerprint); - - ProcessingAttempt attempt = buildAttempt( - fingerprint, context, attemptNumber, attemptStart, now, outcome); - - // Update the master record with new status, counters and timestamps - DocumentRecord updatedRecord = new DocumentRecord( - existingRecord.fingerprint(), - new SourceDocumentLocator(candidate.locator().value()), - candidate.uniqueIdentifier(), - outcome.overallStatus(), - outcome.counters(), - outcome.overallStatus() == ProcessingStatus.SUCCESS - ? existingRecord.lastFailureInstant() : now, - outcome.overallStatus() == ProcessingStatus.SUCCESS - ? 
now : existingRecord.lastSuccessInstant(), - existingRecord.createdAt(), - now // updatedAt - ); - - // Persist attempt and master record atomically - unitOfWorkPort.executeInTransaction(txOps -> { - txOps.saveProcessingAttempt(attempt); - txOps.updateDocumentRecord(updatedRecord); - }); - - LOG.info("Known document '{}' processed: status={}, contentErrors={}, transientErrors={}.", - candidate.uniqueIdentifier(), - outcome.overallStatus(), - outcome.counters().contentErrorCount(), - outcome.counters().transientErrorCount()); - - } catch (DocumentPersistenceException e) { - LOG.error("Failed to persist processing result for known document '{}': {}", - candidate.uniqueIdentifier(), e.getMessage(), e); - } + DocumentRecord updatedRecord = buildUpdatedDocumentRecord(existingRecord, candidate, outcome, now); + persistAttemptAndRecord(candidate, fingerprint, context, attemptStart, now, outcome, + txOps -> txOps.updateDocumentRecord(updatedRecord)); } // ------------------------------------------------------------------------- @@ -485,7 +383,6 @@ public class DocumentProcessingCoordinator { return switch (pipelineOutcome) { case de.gecheckt.pdf.umbenenner.domain.model.PreCheckPassed ignored -> { // success: document passed all pre-checks - // In scope (no KI, no target copy), PreCheckPassed is the terminal success yield new ProcessingOutcome( ProcessingStatus.SUCCESS, existingCounters, // counters unchanged on success @@ -526,6 +423,109 @@ public class DocumentProcessingCoordinator { }; } + // ------------------------------------------------------------------------- + // Record assembly helpers + // ------------------------------------------------------------------------- + + /** Builds the master record for a new document: {@code createdAt} and {@code updatedAt} are both {@code now}; on {@code SUCCESS} only {@code lastSuccessInstant} is set to {@code now}, otherwise only {@code lastFailureInstant} is. */ private DocumentRecord buildNewDocumentRecord( + DocumentFingerprint fingerprint, + SourceDocumentCandidate candidate, + ProcessingOutcome outcome, + Instant now) { + boolean success = outcome.overallStatus() == ProcessingStatus.SUCCESS; + return new DocumentRecord( + fingerprint, + new
SourceDocumentLocator(candidate.locator().value()), + candidate.uniqueIdentifier(), + outcome.overallStatus(), + outcome.counters(), + success ? null : now, // lastFailureInstant + success ? now : null, // lastSuccessInstant + now, // createdAt + now // updatedAt + ); + } + + /** Builds the updated master record for a known document: status and counters come from {@code outcome}; on {@code SUCCESS} {@code lastSuccessInstant} becomes {@code now}, otherwise {@code lastFailureInstant} does; the other instant and {@code createdAt} are carried over from {@code existingRecord}. */ private DocumentRecord buildUpdatedDocumentRecord( + DocumentRecord existingRecord, + SourceDocumentCandidate candidate, + ProcessingOutcome outcome, + Instant now) { + boolean success = outcome.overallStatus() == ProcessingStatus.SUCCESS; + return new DocumentRecord( + existingRecord.fingerprint(), + new SourceDocumentLocator(candidate.locator().value()), + candidate.uniqueIdentifier(), + outcome.overallStatus(), + outcome.counters(), + success ? existingRecord.lastFailureInstant() : now, // lastFailureInstant + success ? now : existingRecord.lastSuccessInstant(), // lastSuccessInstant + existingRecord.createdAt(), // createdAt + now // updatedAt + ); + } + + /** Builds a skip record: fingerprint, status, counters, last failure/success instants and {@code createdAt} are carried over from {@code existingRecord}; locator and identifier are taken from {@code candidate}; {@code updatedAt} is set to {@code now}. */ + private DocumentRecord buildSkipRecord( + DocumentRecord existingRecord, + SourceDocumentCandidate candidate, + Instant now) { + return new DocumentRecord( + existingRecord.fingerprint(), + new SourceDocumentLocator(candidate.locator().value()), + candidate.uniqueIdentifier(), + existingRecord.overallStatus(), + existingRecord.failureCounters(), + existingRecord.lastFailureInstant(), + existingRecord.lastSuccessInstant(), + existingRecord.createdAt(), + now // updatedAt + ); + } + + // ------------------------------------------------------------------------- + // Common persistence flow (non-skip paths) + // ------------------------------------------------------------------------- + + /** + * Loads the next attempt number, builds and persists the attempt together with the + * document record atomically, then logs the result. + *
+ * {@code recordWriter} performs either {@code createDocumentRecord} or
+ * {@code updateDocumentRecord} depending on whether the document is new or known.
+ * All persistence failures are caught and logged; the batch run continues.
+ */
+ private void persistAttemptAndRecord(
+ SourceDocumentCandidate candidate,
+ DocumentFingerprint fingerprint,
+ BatchRunContext context,
+ Instant attemptStart,
+ Instant now,
+ ProcessingOutcome outcome,
+ Consumer