Nachbearbeitung: DocumentProcessingCoordinator weiter strukturell
vereinfacht
This commit is contained in:
@@ -27,6 +27,7 @@ import org.apache.logging.log4j.Logger;
|
|||||||
|
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -282,23 +283,12 @@ public class DocumentProcessingCoordinator {
|
|||||||
false // not retryable
|
false // not retryable
|
||||||
);
|
);
|
||||||
|
|
||||||
|
DocumentRecord skipRecord = buildSkipRecord(existingRecord, candidate, now);
|
||||||
|
|
||||||
// Write attempt and master record atomically
|
// Write attempt and master record atomically
|
||||||
unitOfWorkPort.executeInTransaction(txOps -> {
|
unitOfWorkPort.executeInTransaction(txOps -> {
|
||||||
txOps.saveProcessingAttempt(skipAttempt);
|
txOps.saveProcessingAttempt(skipAttempt);
|
||||||
|
txOps.updateDocumentRecord(skipRecord);
|
||||||
// Update master record: only updatedAt changes; status and counters stay the same
|
|
||||||
DocumentRecord updatedRecord = new DocumentRecord(
|
|
||||||
existingRecord.fingerprint(),
|
|
||||||
new SourceDocumentLocator(candidate.locator().value()),
|
|
||||||
candidate.uniqueIdentifier(),
|
|
||||||
existingRecord.overallStatus(), // terminal status unchanged
|
|
||||||
existingRecord.failureCounters(), // counters unchanged for skip
|
|
||||||
existingRecord.lastFailureInstant(),
|
|
||||||
existingRecord.lastSuccessInstant(),
|
|
||||||
existingRecord.createdAt(),
|
|
||||||
now // updatedAt = now
|
|
||||||
);
|
|
||||||
txOps.updateDocumentRecord(updatedRecord);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
LOG.debug("Skip attempt #{} persisted for '{}' with status {}.",
|
LOG.debug("Skip attempt #{} persisted for '{}' with status {}.",
|
||||||
@@ -314,16 +304,7 @@ public class DocumentProcessingCoordinator {
|
|||||||
// New document path
|
// New document path
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
/**
|
/** Maps the pipeline outcome for a new document and persists attempt + new master record. */
|
||||||
* Processes a newly discovered document (no existing master record) and persists
|
|
||||||
* both the attempt and the new master record.
|
|
||||||
*
|
|
||||||
* @param candidate the candidate being processed
|
|
||||||
* @param fingerprint the document fingerprint
|
|
||||||
* @param pipelineOutcome the pipeline result
|
|
||||||
* @param context the current batch run context
|
|
||||||
* @param attemptStart the start instant of this processing attempt
|
|
||||||
*/
|
|
||||||
private void processAndPersistNewDocument(
|
private void processAndPersistNewDocument(
|
||||||
SourceDocumentCandidate candidate,
|
SourceDocumentCandidate candidate,
|
||||||
DocumentFingerprint fingerprint,
|
DocumentFingerprint fingerprint,
|
||||||
@@ -332,63 +313,17 @@ public class DocumentProcessingCoordinator {
|
|||||||
Instant attemptStart) {
|
Instant attemptStart) {
|
||||||
|
|
||||||
Instant now = Instant.now();
|
Instant now = Instant.now();
|
||||||
|
|
||||||
// Map outcome to status/counters for a brand-new document
|
|
||||||
ProcessingOutcome outcome = mapOutcomeForNewDocument(pipelineOutcome);
|
ProcessingOutcome outcome = mapOutcomeForNewDocument(pipelineOutcome);
|
||||||
|
DocumentRecord newRecord = buildNewDocumentRecord(fingerprint, candidate, outcome, now);
|
||||||
try {
|
persistAttemptAndRecord(candidate, fingerprint, context, attemptStart, now, outcome,
|
||||||
// Attempt number is always 1 for a new document
|
txOps -> txOps.createDocumentRecord(newRecord));
|
||||||
int attemptNumber = processingAttemptRepository.loadNextAttemptNumber(fingerprint);
|
|
||||||
|
|
||||||
ProcessingAttempt attempt = buildAttempt(
|
|
||||||
fingerprint, context, attemptNumber, attemptStart, now, outcome);
|
|
||||||
|
|
||||||
// Create the new master record
|
|
||||||
DocumentRecord newRecord = new DocumentRecord(
|
|
||||||
fingerprint,
|
|
||||||
new SourceDocumentLocator(candidate.locator().value()),
|
|
||||||
candidate.uniqueIdentifier(),
|
|
||||||
outcome.overallStatus(),
|
|
||||||
outcome.counters(),
|
|
||||||
outcome.overallStatus() == ProcessingStatus.SUCCESS ? null : now, // lastFailureInstant
|
|
||||||
outcome.overallStatus() == ProcessingStatus.SUCCESS ? now : null, // lastSuccessInstant
|
|
||||||
now, // createdAt
|
|
||||||
now // updatedAt
|
|
||||||
);
|
|
||||||
|
|
||||||
// Persist attempt and master record atomically
|
|
||||||
unitOfWorkPort.executeInTransaction(txOps -> {
|
|
||||||
txOps.saveProcessingAttempt(attempt);
|
|
||||||
txOps.createDocumentRecord(newRecord);
|
|
||||||
});
|
|
||||||
|
|
||||||
LOG.info("New document '{}' processed: status={}, contentErrors={}, transientErrors={}.",
|
|
||||||
candidate.uniqueIdentifier(),
|
|
||||||
outcome.overallStatus(),
|
|
||||||
outcome.counters().contentErrorCount(),
|
|
||||||
outcome.counters().transientErrorCount());
|
|
||||||
|
|
||||||
} catch (DocumentPersistenceException e) {
|
|
||||||
LOG.error("Failed to persist processing result for new document '{}': {}",
|
|
||||||
candidate.uniqueIdentifier(), e.getMessage(), e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
// Known processable document path
|
// Known processable document path
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
/**
|
/** Maps the pipeline outcome for a known document and persists attempt + updated master record. */
|
||||||
* Processes a known but non-terminal document and updates both the attempt history
|
|
||||||
* and the master record.
|
|
||||||
*
|
|
||||||
* @param candidate the candidate being processed
|
|
||||||
* @param fingerprint the document fingerprint
|
|
||||||
* @param pipelineOutcome the pipeline result
|
|
||||||
* @param existingRecord the current master record (not terminal)
|
|
||||||
* @param context the current batch run context
|
|
||||||
* @param attemptStart the start instant of this processing attempt
|
|
||||||
*/
|
|
||||||
private void processAndPersistKnownDocument(
|
private void processAndPersistKnownDocument(
|
||||||
SourceDocumentCandidate candidate,
|
SourceDocumentCandidate candidate,
|
||||||
DocumentFingerprint fingerprint,
|
DocumentFingerprint fingerprint,
|
||||||
@@ -398,47 +333,10 @@ public class DocumentProcessingCoordinator {
|
|||||||
Instant attemptStart) {
|
Instant attemptStart) {
|
||||||
|
|
||||||
Instant now = Instant.now();
|
Instant now = Instant.now();
|
||||||
|
|
||||||
// Map outcome to status/counters, taking existing counters into account
|
|
||||||
ProcessingOutcome outcome = mapOutcomeForKnownDocument(pipelineOutcome, existingRecord.failureCounters());
|
ProcessingOutcome outcome = mapOutcomeForKnownDocument(pipelineOutcome, existingRecord.failureCounters());
|
||||||
|
DocumentRecord updatedRecord = buildUpdatedDocumentRecord(existingRecord, candidate, outcome, now);
|
||||||
try {
|
persistAttemptAndRecord(candidate, fingerprint, context, attemptStart, now, outcome,
|
||||||
int attemptNumber = processingAttemptRepository.loadNextAttemptNumber(fingerprint);
|
txOps -> txOps.updateDocumentRecord(updatedRecord));
|
||||||
|
|
||||||
ProcessingAttempt attempt = buildAttempt(
|
|
||||||
fingerprint, context, attemptNumber, attemptStart, now, outcome);
|
|
||||||
|
|
||||||
// Update the master record with new status, counters and timestamps
|
|
||||||
DocumentRecord updatedRecord = new DocumentRecord(
|
|
||||||
existingRecord.fingerprint(),
|
|
||||||
new SourceDocumentLocator(candidate.locator().value()),
|
|
||||||
candidate.uniqueIdentifier(),
|
|
||||||
outcome.overallStatus(),
|
|
||||||
outcome.counters(),
|
|
||||||
outcome.overallStatus() == ProcessingStatus.SUCCESS
|
|
||||||
? existingRecord.lastFailureInstant() : now,
|
|
||||||
outcome.overallStatus() == ProcessingStatus.SUCCESS
|
|
||||||
? now : existingRecord.lastSuccessInstant(),
|
|
||||||
existingRecord.createdAt(),
|
|
||||||
now // updatedAt
|
|
||||||
);
|
|
||||||
|
|
||||||
// Persist attempt and master record atomically
|
|
||||||
unitOfWorkPort.executeInTransaction(txOps -> {
|
|
||||||
txOps.saveProcessingAttempt(attempt);
|
|
||||||
txOps.updateDocumentRecord(updatedRecord);
|
|
||||||
});
|
|
||||||
|
|
||||||
LOG.info("Known document '{}' processed: status={}, contentErrors={}, transientErrors={}.",
|
|
||||||
candidate.uniqueIdentifier(),
|
|
||||||
outcome.overallStatus(),
|
|
||||||
outcome.counters().contentErrorCount(),
|
|
||||||
outcome.counters().transientErrorCount());
|
|
||||||
|
|
||||||
} catch (DocumentPersistenceException e) {
|
|
||||||
LOG.error("Failed to persist processing result for known document '{}': {}",
|
|
||||||
candidate.uniqueIdentifier(), e.getMessage(), e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
@@ -485,7 +383,6 @@ public class DocumentProcessingCoordinator {
|
|||||||
return switch (pipelineOutcome) {
|
return switch (pipelineOutcome) {
|
||||||
case de.gecheckt.pdf.umbenenner.domain.model.PreCheckPassed ignored -> {
|
case de.gecheckt.pdf.umbenenner.domain.model.PreCheckPassed ignored -> {
|
||||||
// success: document passed all pre-checks
|
// success: document passed all pre-checks
|
||||||
// In scope (no KI, no target copy), PreCheckPassed is the terminal success
|
|
||||||
yield new ProcessingOutcome(
|
yield new ProcessingOutcome(
|
||||||
ProcessingStatus.SUCCESS,
|
ProcessingStatus.SUCCESS,
|
||||||
existingCounters, // counters unchanged on success
|
existingCounters, // counters unchanged on success
|
||||||
@@ -526,6 +423,109 @@ public class DocumentProcessingCoordinator {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// Record assembly helpers
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
private DocumentRecord buildNewDocumentRecord(
|
||||||
|
DocumentFingerprint fingerprint,
|
||||||
|
SourceDocumentCandidate candidate,
|
||||||
|
ProcessingOutcome outcome,
|
||||||
|
Instant now) {
|
||||||
|
boolean success = outcome.overallStatus() == ProcessingStatus.SUCCESS;
|
||||||
|
return new DocumentRecord(
|
||||||
|
fingerprint,
|
||||||
|
new SourceDocumentLocator(candidate.locator().value()),
|
||||||
|
candidate.uniqueIdentifier(),
|
||||||
|
outcome.overallStatus(),
|
||||||
|
outcome.counters(),
|
||||||
|
success ? null : now, // lastFailureInstant
|
||||||
|
success ? now : null, // lastSuccessInstant
|
||||||
|
now, // createdAt
|
||||||
|
now // updatedAt
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private DocumentRecord buildUpdatedDocumentRecord(
|
||||||
|
DocumentRecord existingRecord,
|
||||||
|
SourceDocumentCandidate candidate,
|
||||||
|
ProcessingOutcome outcome,
|
||||||
|
Instant now) {
|
||||||
|
boolean success = outcome.overallStatus() == ProcessingStatus.SUCCESS;
|
||||||
|
return new DocumentRecord(
|
||||||
|
existingRecord.fingerprint(),
|
||||||
|
new SourceDocumentLocator(candidate.locator().value()),
|
||||||
|
candidate.uniqueIdentifier(),
|
||||||
|
outcome.overallStatus(),
|
||||||
|
outcome.counters(),
|
||||||
|
success ? existingRecord.lastFailureInstant() : now,
|
||||||
|
success ? now : existingRecord.lastSuccessInstant(),
|
||||||
|
existingRecord.createdAt(),
|
||||||
|
now // updatedAt
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Builds a skip record: locator and unique identifier are taken from the current candidate and {@code updatedAt} advances; every other field is copied from the existing record unchanged. */
|
||||||
|
private DocumentRecord buildSkipRecord(
|
||||||
|
DocumentRecord existingRecord,
|
||||||
|
SourceDocumentCandidate candidate,
|
||||||
|
Instant now) {
|
||||||
|
return new DocumentRecord(
|
||||||
|
existingRecord.fingerprint(),
|
||||||
|
new SourceDocumentLocator(candidate.locator().value()),
|
||||||
|
candidate.uniqueIdentifier(),
|
||||||
|
existingRecord.overallStatus(),
|
||||||
|
existingRecord.failureCounters(),
|
||||||
|
existingRecord.lastFailureInstant(),
|
||||||
|
existingRecord.lastSuccessInstant(),
|
||||||
|
existingRecord.createdAt(),
|
||||||
|
now // updatedAt
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// Common persistence flow (non-skip paths)
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Loads the next attempt number, builds and persists the attempt together with the
|
||||||
|
* document record atomically, then logs the result.
|
||||||
|
* <p>
|
||||||
|
* {@code recordWriter} performs either {@code createDocumentRecord} or
|
||||||
|
* {@code updateDocumentRecord} depending on whether the document is new or known.
|
||||||
|
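* A {@code DocumentPersistenceException} thrown while loading the attempt number or inside the transaction is caught and logged here instead of being rethrown; other exception types are not handled by this method.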
|
||||||
|
*/
|
||||||
|
private void persistAttemptAndRecord(
|
||||||
|
SourceDocumentCandidate candidate,
|
||||||
|
DocumentFingerprint fingerprint,
|
||||||
|
BatchRunContext context,
|
||||||
|
Instant attemptStart,
|
||||||
|
Instant now,
|
||||||
|
ProcessingOutcome outcome,
|
||||||
|
Consumer<UnitOfWorkPort.TransactionOperations> recordWriter) {
|
||||||
|
|
||||||
|
try {
|
||||||
|
int attemptNumber = processingAttemptRepository.loadNextAttemptNumber(fingerprint);
|
||||||
|
ProcessingAttempt attempt =
|
||||||
|
buildAttempt(fingerprint, context, attemptNumber, attemptStart, now, outcome);
|
||||||
|
|
||||||
|
unitOfWorkPort.executeInTransaction(txOps -> {
|
||||||
|
txOps.saveProcessingAttempt(attempt);
|
||||||
|
recordWriter.accept(txOps);
|
||||||
|
});
|
||||||
|
|
||||||
|
LOG.info("Document '{}' processed: status={}, contentErrors={}, transientErrors={}.",
|
||||||
|
candidate.uniqueIdentifier(),
|
||||||
|
outcome.overallStatus(),
|
||||||
|
outcome.counters().contentErrorCount(),
|
||||||
|
outcome.counters().transientErrorCount());
|
||||||
|
|
||||||
|
} catch (DocumentPersistenceException e) {
|
||||||
|
LOG.error("Failed to persist processing result for '{}': {}",
|
||||||
|
candidate.uniqueIdentifier(), e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
// Helper: build ProcessingAttempt
|
// Helper: build ProcessingAttempt
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
|
|||||||
Reference in New Issue
Block a user