Optimierung: Zustandsübergangslogik aus DocumentProcessingCoordinator
herausgelöst
This commit is contained in:
@@ -313,7 +313,7 @@ public class DocumentProcessingCoordinator {
|
|||||||
Instant attemptStart) {
|
Instant attemptStart) {
|
||||||
|
|
||||||
Instant now = Instant.now();
|
Instant now = Instant.now();
|
||||||
ProcessingOutcome outcome = mapOutcomeForNewDocument(pipelineOutcome);
|
ProcessingOutcomeTransition.ProcessingOutcome outcome = mapOutcomeForNewDocument(pipelineOutcome);
|
||||||
DocumentRecord newRecord = buildNewDocumentRecord(fingerprint, candidate, outcome, now);
|
DocumentRecord newRecord = buildNewDocumentRecord(fingerprint, candidate, outcome, now);
|
||||||
persistAttemptAndRecord(candidate, fingerprint, context, attemptStart, now, outcome,
|
persistAttemptAndRecord(candidate, fingerprint, context, attemptStart, now, outcome,
|
||||||
txOps -> txOps.createDocumentRecord(newRecord));
|
txOps -> txOps.createDocumentRecord(newRecord));
|
||||||
@@ -333,7 +333,7 @@ public class DocumentProcessingCoordinator {
|
|||||||
Instant attemptStart) {
|
Instant attemptStart) {
|
||||||
|
|
||||||
Instant now = Instant.now();
|
Instant now = Instant.now();
|
||||||
ProcessingOutcome outcome = mapOutcomeForKnownDocument(pipelineOutcome, existingRecord.failureCounters());
|
ProcessingOutcomeTransition.ProcessingOutcome outcome = mapOutcomeForKnownDocument(pipelineOutcome, existingRecord.failureCounters());
|
||||||
DocumentRecord updatedRecord = buildUpdatedDocumentRecord(existingRecord, candidate, outcome, now);
|
DocumentRecord updatedRecord = buildUpdatedDocumentRecord(existingRecord, candidate, outcome, now);
|
||||||
persistAttemptAndRecord(candidate, fingerprint, context, attemptStart, now, outcome,
|
persistAttemptAndRecord(candidate, fingerprint, context, attemptStart, now, outcome,
|
||||||
txOps -> txOps.updateDocumentRecord(updatedRecord));
|
txOps -> txOps.updateDocumentRecord(updatedRecord));
|
||||||
@@ -350,77 +350,23 @@ public class DocumentProcessingCoordinator {
|
|||||||
* @param pipelineOutcome the pipeline result
|
* @param pipelineOutcome the pipeline result
|
||||||
* @return the outcome with status, counters and retryable flag
|
* @return the outcome with status, counters and retryable flag
|
||||||
*/
|
*/
|
||||||
private ProcessingOutcome mapOutcomeForNewDocument(DocumentProcessingOutcome pipelineOutcome) {
|
private ProcessingOutcomeTransition.ProcessingOutcome mapOutcomeForNewDocument(
|
||||||
return mapOutcomeForKnownDocument(pipelineOutcome, FailureCounters.zero());
|
DocumentProcessingOutcome pipelineOutcome) {
|
||||||
|
return ProcessingOutcomeTransition.forNewDocument(pipelineOutcome);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Maps an outcome to status, counters, and retryable flag, taking the
|
* Maps an outcome to status, counters, and retryable flag, taking the
|
||||||
* existing failure counters into account.
|
* existing failure counters into account.
|
||||||
* <p>
|
|
||||||
* <strong>Minimal rules applied here:</strong>
|
|
||||||
* <ul>
|
|
||||||
* <li>success → {@link ProcessingStatus#SUCCESS}, counters unchanged,
|
|
||||||
* {@code retryable=false}.</li>
|
|
||||||
* <li>deterministic content error (first occurrence, contentErrorCount == 0) →
|
|
||||||
* {@link ProcessingStatus#FAILED_RETRYABLE}, contentErrorCount +1,
|
|
||||||
* {@code retryable=true}.</li>
|
|
||||||
* <li>deterministic content error (second occurrence, contentErrorCount >= 1) →
|
|
||||||
* {@link ProcessingStatus#FAILED_FINAL}, contentErrorCount +1,
|
|
||||||
* {@code retryable=false}.</li>
|
|
||||||
* <li>technical error → {@link ProcessingStatus#FAILED_RETRYABLE},
|
|
||||||
* transientErrorCount +1, {@code retryable=true}.</li>
|
|
||||||
* </ul>
|
|
||||||
*
|
*
|
||||||
* @param pipelineOutcome the pipeline result
|
* @param pipelineOutcome the pipeline result
|
||||||
* @param existingCounters the current failure counters from the master record
|
* @param existingCounters the current failure counters from the master record
|
||||||
* @return the outcome with updated status, counters and retryable flag
|
* @return the outcome with updated status, counters and retryable flag
|
||||||
*/
|
*/
|
||||||
private ProcessingOutcome mapOutcomeForKnownDocument(
|
private ProcessingOutcomeTransition.ProcessingOutcome mapOutcomeForKnownDocument(
|
||||||
DocumentProcessingOutcome pipelineOutcome,
|
DocumentProcessingOutcome pipelineOutcome,
|
||||||
FailureCounters existingCounters) {
|
FailureCounters existingCounters) {
|
||||||
|
return ProcessingOutcomeTransition.forKnownDocument(pipelineOutcome, existingCounters);
|
||||||
return switch (pipelineOutcome) {
|
|
||||||
case de.gecheckt.pdf.umbenenner.domain.model.PreCheckPassed ignored -> {
|
|
||||||
// success: document passed all pre-checks
|
|
||||||
yield new ProcessingOutcome(
|
|
||||||
ProcessingStatus.SUCCESS,
|
|
||||||
existingCounters, // counters unchanged on success
|
|
||||||
false // not retryable
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
case PreCheckFailed contentError -> {
|
|
||||||
// Deterministic content error: apply the 1-retry rule
|
|
||||||
FailureCounters updatedCounters = existingCounters.withIncrementedContentErrorCount();
|
|
||||||
boolean isFirstOccurrence = existingCounters.contentErrorCount() == 0;
|
|
||||||
|
|
||||||
if (isFirstOccurrence) {
|
|
||||||
// First content error → FAILED_RETRYABLE
|
|
||||||
yield new ProcessingOutcome(
|
|
||||||
ProcessingStatus.FAILED_RETRYABLE,
|
|
||||||
updatedCounters,
|
|
||||||
true
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
// Second (or later) content error → FAILED_FINAL
|
|
||||||
yield new ProcessingOutcome(
|
|
||||||
ProcessingStatus.FAILED_FINAL,
|
|
||||||
updatedCounters,
|
|
||||||
false
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
case TechnicalDocumentError technicalError -> {
|
|
||||||
// Technical error after fingerprinting: always FAILED_RETRYABLE, increment transient counter
|
|
||||||
yield new ProcessingOutcome(
|
|
||||||
ProcessingStatus.FAILED_RETRYABLE,
|
|
||||||
existingCounters.withIncrementedTransientErrorCount(),
|
|
||||||
true
|
|
||||||
);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
@@ -430,7 +376,7 @@ public class DocumentProcessingCoordinator {
|
|||||||
private DocumentRecord buildNewDocumentRecord(
|
private DocumentRecord buildNewDocumentRecord(
|
||||||
DocumentFingerprint fingerprint,
|
DocumentFingerprint fingerprint,
|
||||||
SourceDocumentCandidate candidate,
|
SourceDocumentCandidate candidate,
|
||||||
ProcessingOutcome outcome,
|
ProcessingOutcomeTransition.ProcessingOutcome outcome,
|
||||||
Instant now) {
|
Instant now) {
|
||||||
boolean success = outcome.overallStatus() == ProcessingStatus.SUCCESS;
|
boolean success = outcome.overallStatus() == ProcessingStatus.SUCCESS;
|
||||||
return new DocumentRecord(
|
return new DocumentRecord(
|
||||||
@@ -449,7 +395,7 @@ public class DocumentProcessingCoordinator {
|
|||||||
private DocumentRecord buildUpdatedDocumentRecord(
|
private DocumentRecord buildUpdatedDocumentRecord(
|
||||||
DocumentRecord existingRecord,
|
DocumentRecord existingRecord,
|
||||||
SourceDocumentCandidate candidate,
|
SourceDocumentCandidate candidate,
|
||||||
ProcessingOutcome outcome,
|
ProcessingOutcomeTransition.ProcessingOutcome outcome,
|
||||||
Instant now) {
|
Instant now) {
|
||||||
boolean success = outcome.overallStatus() == ProcessingStatus.SUCCESS;
|
boolean success = outcome.overallStatus() == ProcessingStatus.SUCCESS;
|
||||||
return new DocumentRecord(
|
return new DocumentRecord(
|
||||||
@@ -501,7 +447,7 @@ public class DocumentProcessingCoordinator {
|
|||||||
BatchRunContext context,
|
BatchRunContext context,
|
||||||
Instant attemptStart,
|
Instant attemptStart,
|
||||||
Instant now,
|
Instant now,
|
||||||
ProcessingOutcome outcome,
|
ProcessingOutcomeTransition.ProcessingOutcome outcome,
|
||||||
Consumer<UnitOfWorkPort.TransactionOperations> recordWriter) {
|
Consumer<UnitOfWorkPort.TransactionOperations> recordWriter) {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@@ -547,7 +493,7 @@ public class DocumentProcessingCoordinator {
|
|||||||
int attemptNumber,
|
int attemptNumber,
|
||||||
Instant startedAt,
|
Instant startedAt,
|
||||||
Instant endedAt,
|
Instant endedAt,
|
||||||
ProcessingOutcome outcome) {
|
ProcessingOutcomeTransition.ProcessingOutcome outcome) {
|
||||||
|
|
||||||
String failureClass = null;
|
String failureClass = null;
|
||||||
String failureMessage = null;
|
String failureMessage = null;
|
||||||
@@ -577,7 +523,7 @@ public class DocumentProcessingCoordinator {
|
|||||||
* @param outcome the outcome
|
* @param outcome the outcome
|
||||||
* @return a non-null failure message string
|
* @return a non-null failure message string
|
||||||
*/
|
*/
|
||||||
private String buildFailureMessage(ProcessingOutcome outcome) {
|
private String buildFailureMessage(ProcessingOutcomeTransition.ProcessingOutcome outcome) {
|
||||||
return switch (outcome.overallStatus()) {
|
return switch (outcome.overallStatus()) {
|
||||||
case FAILED_RETRYABLE -> "Processing failed (retryable). "
|
case FAILED_RETRYABLE -> "Processing failed (retryable). "
|
||||||
+ "ContentErrors=" + outcome.counters().contentErrorCount()
|
+ "ContentErrors=" + outcome.counters().contentErrorCount()
|
||||||
@@ -589,23 +535,4 @@ public class DocumentProcessingCoordinator {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
// -------------------------------------------------------------------------
|
|
||||||
// Internal value type: outcome
|
|
||||||
// -------------------------------------------------------------------------
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Internal value type carrying the status, updated counters, and retryable flag
|
|
||||||
* after mapping from an outcome.
|
|
||||||
* <p>
|
|
||||||
* Tightly scoped to {@link DocumentProcessingCoordinator}; not exposed outside this class.
|
|
||||||
*
|
|
||||||
* @param overallStatus the overall status to persist
|
|
||||||
* @param counters the updated failure counters to persist
|
|
||||||
* @param retryable whether the failure is retryable in a later run
|
|
||||||
*/
|
|
||||||
private record ProcessingOutcome(
|
|
||||||
ProcessingStatus overallStatus,
|
|
||||||
FailureCounters counters,
|
|
||||||
boolean retryable) {
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
@@ -0,0 +1,122 @@
|
|||||||
|
package de.gecheckt.pdf.umbenenner.application.service;
|
||||||
|
|
||||||
|
import de.gecheckt.pdf.umbenenner.application.port.out.FailureCounters;
|
||||||
|
import de.gecheckt.pdf.umbenenner.domain.model.DocumentProcessingOutcome;
|
||||||
|
import de.gecheckt.pdf.umbenenner.domain.model.PreCheckFailed;
|
||||||
|
import de.gecheckt.pdf.umbenenner.domain.model.ProcessingStatus;
|
||||||
|
import de.gecheckt.pdf.umbenenner.domain.model.TechnicalDocumentError;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pure status and counter transition policy for document processing outcomes.
|
||||||
|
* <p>
|
||||||
|
* This class encapsulates the deterministic rules for mapping a pipeline outcome
|
||||||
|
* (success, content error, or technical error) to a processing status, updated
|
||||||
|
* failure counters, and retryability flag.
|
||||||
|
* <p>
|
||||||
|
* The transition logic is independent of persistence, orchestration, or any
|
||||||
|
* infrastructure concern. It is purely declarative and stateless.
|
||||||
|
*
|
||||||
|
* <h2>Transition rules</h2>
|
||||||
|
* <ul>
|
||||||
|
* <li><strong>Success:</strong> Status becomes {@link ProcessingStatus#SUCCESS},
|
||||||
|
* counters remain unchanged, {@code retryable=false}.</li>
|
||||||
|
* <li><strong>Deterministic content error (first occurrence):</strong>
|
||||||
|
* Status becomes {@link ProcessingStatus#FAILED_RETRYABLE},
|
||||||
|
* content error counter incremented by 1, {@code retryable=true}.</li>
|
||||||
|
* <li><strong>Deterministic content error (second or later occurrence):</strong>
|
||||||
|
* Status becomes {@link ProcessingStatus#FAILED_FINAL},
|
||||||
|
* content error counter incremented by 1, {@code retryable=false}.</li>
|
||||||
|
* <li><strong>Technical error:</strong> Status becomes {@link ProcessingStatus#FAILED_RETRYABLE},
|
||||||
|
* transient error counter incremented by 1, {@code retryable=true}.</li>
|
||||||
|
* </ul>
|
||||||
|
*/
|
||||||
|
final class ProcessingOutcomeTransition {
|
||||||
|
|
||||||
|
private ProcessingOutcomeTransition() {
|
||||||
|
// Static utility class; no instances
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Maps a pipeline outcome to a processing outcome for a brand-new document.
|
||||||
|
* <p>
|
||||||
|
* For new documents, all failure counters start at zero.
|
||||||
|
*
|
||||||
|
* @param pipelineOutcome the outcome from the extraction and pre-check pipeline
|
||||||
|
* @return the mapped outcome with status, counters, and retryability
|
||||||
|
*/
|
||||||
|
static ProcessingOutcome forNewDocument(DocumentProcessingOutcome pipelineOutcome) {
|
||||||
|
return forKnownDocument(pipelineOutcome, FailureCounters.zero());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Maps a pipeline outcome to a processing outcome, considering the existing
|
||||||
|
* failure counter state from a known document's history.
|
||||||
|
* <p>
|
||||||
|
* This method applies the deterministic transition rules to produce an updated
|
||||||
|
* status, counters, and retryable flag.
|
||||||
|
*
|
||||||
|
* @param pipelineOutcome the outcome from the extraction and pre-check pipeline
|
||||||
|
* @param existingCounters the current failure counter values from the document's master record
|
||||||
|
* @return the mapped outcome with updated status, counters, and retryability
|
||||||
|
*/
|
||||||
|
static ProcessingOutcome forKnownDocument(
|
||||||
|
DocumentProcessingOutcome pipelineOutcome,
|
||||||
|
FailureCounters existingCounters) {
|
||||||
|
|
||||||
|
return switch (pipelineOutcome) {
|
||||||
|
case de.gecheckt.pdf.umbenenner.domain.model.PreCheckPassed ignored -> {
|
||||||
|
// Success: document passed all pre-checks
|
||||||
|
yield new ProcessingOutcome(
|
||||||
|
ProcessingStatus.SUCCESS,
|
||||||
|
existingCounters, // counters unchanged on success
|
||||||
|
false // not retryable
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
case PreCheckFailed contentError -> {
|
||||||
|
// Deterministic content error: apply the 1-retry rule
|
||||||
|
FailureCounters updatedCounters = existingCounters.withIncrementedContentErrorCount();
|
||||||
|
boolean isFirstOccurrence = existingCounters.contentErrorCount() == 0;
|
||||||
|
|
||||||
|
if (isFirstOccurrence) {
|
||||||
|
// First content error → FAILED_RETRYABLE
|
||||||
|
yield new ProcessingOutcome(
|
||||||
|
ProcessingStatus.FAILED_RETRYABLE,
|
||||||
|
updatedCounters,
|
||||||
|
true
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
// Second (or later) content error → FAILED_FINAL
|
||||||
|
yield new ProcessingOutcome(
|
||||||
|
ProcessingStatus.FAILED_FINAL,
|
||||||
|
updatedCounters,
|
||||||
|
false
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
case TechnicalDocumentError technicalError -> {
|
||||||
|
// Technical error after fingerprinting: always FAILED_RETRYABLE, increment transient counter
|
||||||
|
yield new ProcessingOutcome(
|
||||||
|
ProcessingStatus.FAILED_RETRYABLE,
|
||||||
|
existingCounters.withIncrementedTransientErrorCount(),
|
||||||
|
true
|
||||||
|
);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Value type carrying the status, updated counters, and retryable flag
|
||||||
|
* after transition from a pipeline outcome.
|
||||||
|
*
|
||||||
|
* @param overallStatus the overall processing status to persist
|
||||||
|
* @param counters the updated failure counters to persist
|
||||||
|
* @param retryable whether a failure is retryable in a later run
|
||||||
|
*/
|
||||||
|
record ProcessingOutcome(
|
||||||
|
ProcessingStatus overallStatus,
|
||||||
|
FailureCounters counters,
|
||||||
|
boolean retryable) {
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user