M4 AP-006 Idempotenz- und Persistenzlogik integrieren
This commit is contained in:
@@ -0,0 +1,558 @@
|
||||
package de.gecheckt.pdf.umbenenner.application.service;
|
||||
|
||||
import de.gecheckt.pdf.umbenenner.application.port.out.DocumentKnownProcessable;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.out.DocumentPersistenceException;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.out.DocumentRecord;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.out.DocumentRecordLookupResult;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.out.DocumentRecordRepository;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.out.DocumentTerminalFinalFailure;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.out.DocumentTerminalSuccess;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.out.DocumentUnknown;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.out.FailureCounters;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.out.PersistenceLookupTechnicalFailure;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.out.ProcessingAttempt;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.out.ProcessingAttemptRepository;
|
||||
import de.gecheckt.pdf.umbenenner.domain.model.BatchRunContext;
|
||||
import de.gecheckt.pdf.umbenenner.domain.model.DocumentFingerprint;
|
||||
import de.gecheckt.pdf.umbenenner.domain.model.DocumentProcessingOutcome;
|
||||
import de.gecheckt.pdf.umbenenner.domain.model.PreCheckFailed;
|
||||
import de.gecheckt.pdf.umbenenner.domain.model.ProcessingStatus;
|
||||
import de.gecheckt.pdf.umbenenner.domain.model.SourceDocumentCandidate;
|
||||
import de.gecheckt.pdf.umbenenner.domain.model.SourceDocumentLocator;
|
||||
import de.gecheckt.pdf.umbenenner.domain.model.TechnicalDocumentError;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Application-level service that implements the M4 per-document processing logic.
|
||||
* <p>
|
||||
* This service is the single authoritative place for the M4 decision rules:
|
||||
* idempotency checks, status/counter mapping, and consistent two-level persistence.
|
||||
* It is intentionally tightly scoped to AP-006 and contains no M5+ logic.
|
||||
*
|
||||
* <h2>M4 processing order per candidate</h2>
|
||||
* <ol>
|
||||
* <li>Load the document master record by fingerprint.</li>
|
||||
* <li>If the overall status is {@link ProcessingStatus#SUCCESS} → create and persist
|
||||
* a skip attempt with {@link ProcessingStatus#SKIPPED_ALREADY_PROCESSED}.</li>
|
||||
* <li>If the overall status is {@link ProcessingStatus#FAILED_FINAL} → create and persist
|
||||
* a skip attempt with {@link ProcessingStatus#SKIPPED_FINAL_FAILURE}.</li>
|
||||
* <li>Otherwise execute the M3 flow (already done by the caller) and map the result
|
||||
* into M4 status, counters and retryable flag.</li>
|
||||
* <li>Persist exactly one historised processing attempt for the identified document.</li>
|
||||
* <li>Persist the updated document master record.</li>
|
||||
* </ol>
|
||||
*
|
||||
* <h2>M4 minimal rules</h2>
|
||||
* <ul>
|
||||
* <li>Already successful documents are skipped in later runs.</li>
|
||||
* <li>Already finally failed documents are skipped in later runs.</li>
|
||||
* <li>First historised deterministic content failure from M3 →
|
||||
* {@link ProcessingStatus#FAILED_RETRYABLE}, content error counter becomes 1,
|
||||
* {@code retryable=true}.</li>
|
||||
* <li>Second historised deterministic content failure in a later run →
|
||||
* {@link ProcessingStatus#FAILED_FINAL}, content error counter becomes 2,
|
||||
* {@code retryable=false}.</li>
|
||||
* <li>Document-related technical failures after successful fingerprinting remain
|
||||
* {@link ProcessingStatus#FAILED_RETRYABLE}, increment transient error counter,
|
||||
* {@code retryable=true}.</li>
|
||||
* <li>Skip events do not change error counters.</li>
|
||||
* </ul>
|
||||
*
|
||||
* <h2>Persistence consistency</h2>
|
||||
* <p>
|
||||
* For every identified document, both the processing attempt and the master record are
|
||||
* written in sequence. If either write fails, the failure is logged and the batch run
|
||||
* continues with the next candidate. No partial state is intentionally left; if the
|
||||
* attempt write succeeds but the master record write fails, the inconsistency is bounded
|
||||
* to that one document and is logged clearly. True transactionality across two separate
|
||||
* repository calls is not available without a larger architectural change; this is
|
||||
* documented as a known limitation of the M4 scope.
|
||||
*
|
||||
* <h2>Pre-fingerprint failures</h2>
|
||||
* <p>
|
||||
* Failures that occur before a successful fingerprint is available are <em>not</em>
|
||||
* historised in SQLite. They are handled by the caller and logged as non-identifiable
|
||||
* run events.
|
||||
*
|
||||
* @since M4-AP-006
|
||||
*/
|
||||
public class M4DocumentProcessor {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(M4DocumentProcessor.class);
|
||||
|
||||
private final DocumentRecordRepository documentRecordRepository;
|
||||
private final ProcessingAttemptRepository processingAttemptRepository;
|
||||
|
||||
/**
|
||||
* Creates the M4 document processor with the required persistence ports.
|
||||
*
|
||||
* @param documentRecordRepository port for reading and writing the document master record;
|
||||
* must not be null
|
||||
* @param processingAttemptRepository port for writing and reading the attempt history;
|
||||
* must not be null
|
||||
* @throws NullPointerException if any parameter is null
|
||||
*/
|
||||
public M4DocumentProcessor(
|
||||
DocumentRecordRepository documentRecordRepository,
|
||||
ProcessingAttemptRepository processingAttemptRepository) {
|
||||
this.documentRecordRepository =
|
||||
Objects.requireNonNull(documentRecordRepository, "documentRecordRepository must not be null");
|
||||
this.processingAttemptRepository =
|
||||
Objects.requireNonNull(processingAttemptRepository, "processingAttemptRepository must not be null");
|
||||
}
|
||||
|
||||
/**
|
||||
* Applies the full M4 processing logic for one identified document candidate.
|
||||
* <p>
|
||||
* The caller must have already computed a valid {@link DocumentFingerprint} for the
|
||||
* candidate. The M3 outcome (from the PDF extraction and pre-check pipeline) is
|
||||
* provided as {@code m3Outcome} and is used only when the document is not in a
|
||||
* terminal state.
|
||||
* <p>
|
||||
* This method never throws. All persistence failures are caught, logged, and
|
||||
* treated as controlled per-document failures so the batch run can continue.
|
||||
*
|
||||
* @param candidate the source document candidate being processed; must not be null
|
||||
* @param fingerprint the successfully computed fingerprint for this candidate;
|
||||
* must not be null
|
||||
* @param m3Outcome the result of the M3 pipeline (PDF extraction + pre-checks);
|
||||
* must not be null
|
||||
* @param context the current batch run context (for run ID and timing);
|
||||
* must not be null
|
||||
* @param attemptStart the instant at which processing of this candidate began;
|
||||
* must not be null
|
||||
*/
|
||||
public void process(
|
||||
SourceDocumentCandidate candidate,
|
||||
DocumentFingerprint fingerprint,
|
||||
DocumentProcessingOutcome m3Outcome,
|
||||
BatchRunContext context,
|
||||
Instant attemptStart) {
|
||||
|
||||
Objects.requireNonNull(candidate, "candidate must not be null");
|
||||
Objects.requireNonNull(fingerprint, "fingerprint must not be null");
|
||||
Objects.requireNonNull(m3Outcome, "m3Outcome must not be null");
|
||||
Objects.requireNonNull(context, "context must not be null");
|
||||
Objects.requireNonNull(attemptStart, "attemptStart must not be null");
|
||||
|
||||
// Step 1: Load the document master record
|
||||
DocumentRecordLookupResult lookupResult =
|
||||
documentRecordRepository.findByFingerprint(fingerprint);
|
||||
|
||||
// Step 2: Handle persistence lookup failure – cannot safely proceed
|
||||
if (lookupResult instanceof PersistenceLookupTechnicalFailure failure) {
|
||||
LOG.error("Cannot process '{}': master record lookup failed: {}",
|
||||
candidate.uniqueIdentifier(), failure.errorMessage());
|
||||
return;
|
||||
}
|
||||
|
||||
// Step 3: Determine the action based on the lookup result
|
||||
switch (lookupResult) {
|
||||
case DocumentTerminalSuccess terminalSuccess -> {
|
||||
// Document already successfully processed → skip
|
||||
LOG.info("Skipping '{}': already successfully processed (fingerprint: {}).",
|
||||
candidate.uniqueIdentifier(), fingerprint.sha256Hex());
|
||||
persistSkipAttempt(
|
||||
candidate, fingerprint, terminalSuccess.record(),
|
||||
ProcessingStatus.SKIPPED_ALREADY_PROCESSED,
|
||||
context, attemptStart);
|
||||
}
|
||||
|
||||
case DocumentTerminalFinalFailure terminalFailure -> {
|
||||
// Document finally failed → skip
|
||||
LOG.info("Skipping '{}': already finally failed (fingerprint: {}).",
|
||||
candidate.uniqueIdentifier(), fingerprint.sha256Hex());
|
||||
persistSkipAttempt(
|
||||
candidate, fingerprint, terminalFailure.record(),
|
||||
ProcessingStatus.SKIPPED_FINAL_FAILURE,
|
||||
context, attemptStart);
|
||||
}
|
||||
|
||||
case DocumentUnknown ignored -> {
|
||||
// New document – process and create a new master record
|
||||
processAndPersistNewDocument(candidate, fingerprint, m3Outcome, context, attemptStart);
|
||||
}
|
||||
|
||||
case DocumentKnownProcessable knownProcessable -> {
|
||||
// Known but not terminal – process and update the existing master record
|
||||
processAndPersistKnownDocument(
|
||||
candidate, fingerprint, m3Outcome, knownProcessable.record(),
|
||||
context, attemptStart);
|
||||
}
|
||||
|
||||
default ->
|
||||
// Exhaustive sealed hierarchy; this branch is unreachable
|
||||
LOG.error("Unexpected lookup result type for '{}': {}",
|
||||
candidate.uniqueIdentifier(), lookupResult.getClass().getSimpleName());
|
||||
}
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Skip path
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Persists a skip attempt and updates the master record's {@code updatedAt} timestamp.
|
||||
* <p>
|
||||
* Skip events do not change any failure counter. The master record's overall status
|
||||
* remains unchanged (terminal).
|
||||
*
|
||||
* @param candidate the candidate being skipped
|
||||
* @param fingerprint the document fingerprint
|
||||
* @param existingRecord the current master record (already terminal)
|
||||
* @param skipStatus the skip status to record ({@link ProcessingStatus#SKIPPED_ALREADY_PROCESSED}
|
||||
* or {@link ProcessingStatus#SKIPPED_FINAL_FAILURE})
|
||||
* @param context the current batch run context
|
||||
* @param attemptStart the start instant of this processing attempt
|
||||
*/
|
||||
private void persistSkipAttempt(
|
||||
SourceDocumentCandidate candidate,
|
||||
DocumentFingerprint fingerprint,
|
||||
DocumentRecord existingRecord,
|
||||
ProcessingStatus skipStatus,
|
||||
BatchRunContext context,
|
||||
Instant attemptStart) {
|
||||
|
||||
Instant now = Instant.now();
|
||||
|
||||
try {
|
||||
int attemptNumber = processingAttemptRepository.loadNextAttemptNumber(fingerprint);
|
||||
|
||||
ProcessingAttempt skipAttempt = new ProcessingAttempt(
|
||||
fingerprint,
|
||||
context.runId(),
|
||||
attemptNumber,
|
||||
attemptStart,
|
||||
now,
|
||||
skipStatus,
|
||||
null, // no failure class for skip
|
||||
null, // no failure message for skip
|
||||
false // not retryable
|
||||
);
|
||||
|
||||
// Write attempt first, then update master record
|
||||
processingAttemptRepository.save(skipAttempt);
|
||||
|
||||
// Update master record: only updatedAt changes; status and counters stay the same
|
||||
DocumentRecord updatedRecord = new DocumentRecord(
|
||||
existingRecord.fingerprint(),
|
||||
new SourceDocumentLocator(candidate.locator().value()),
|
||||
candidate.uniqueIdentifier(),
|
||||
existingRecord.overallStatus(), // terminal status unchanged
|
||||
existingRecord.failureCounters(), // counters unchanged for skip
|
||||
existingRecord.lastFailureInstant(),
|
||||
existingRecord.lastSuccessInstant(),
|
||||
existingRecord.createdAt(),
|
||||
now // updatedAt = now
|
||||
);
|
||||
documentRecordRepository.update(updatedRecord);
|
||||
|
||||
LOG.debug("Skip attempt #{} persisted for '{}' with status {}.",
|
||||
attemptNumber, candidate.uniqueIdentifier(), skipStatus);
|
||||
|
||||
} catch (DocumentPersistenceException e) {
|
||||
LOG.error("Failed to persist skip attempt for '{}': {}",
|
||||
candidate.uniqueIdentifier(), e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// New document path
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Processes a newly discovered document (no existing master record) and persists
|
||||
* both the attempt and the new master record.
|
||||
*
|
||||
* @param candidate the candidate being processed
|
||||
* @param fingerprint the document fingerprint
|
||||
* @param m3Outcome the M3 pipeline result
|
||||
* @param context the current batch run context
|
||||
* @param attemptStart the start instant of this processing attempt
|
||||
*/
|
||||
private void processAndPersistNewDocument(
|
||||
SourceDocumentCandidate candidate,
|
||||
DocumentFingerprint fingerprint,
|
||||
DocumentProcessingOutcome m3Outcome,
|
||||
BatchRunContext context,
|
||||
Instant attemptStart) {
|
||||
|
||||
Instant now = Instant.now();
|
||||
|
||||
// Map M3 outcome to M4 status/counters for a brand-new document
|
||||
M4Outcome m4 = mapM3OutcomeForNewDocument(m3Outcome);
|
||||
|
||||
try {
|
||||
// Attempt number is always 1 for a new document
|
||||
int attemptNumber = processingAttemptRepository.loadNextAttemptNumber(fingerprint);
|
||||
|
||||
ProcessingAttempt attempt = buildAttempt(
|
||||
fingerprint, context, attemptNumber, attemptStart, now, m4);
|
||||
|
||||
// Create the new master record
|
||||
DocumentRecord newRecord = new DocumentRecord(
|
||||
fingerprint,
|
||||
new SourceDocumentLocator(candidate.locator().value()),
|
||||
candidate.uniqueIdentifier(),
|
||||
m4.overallStatus(),
|
||||
m4.counters(),
|
||||
m4.overallStatus() == ProcessingStatus.SUCCESS ? null : now, // lastFailureInstant
|
||||
m4.overallStatus() == ProcessingStatus.SUCCESS ? now : null, // lastSuccessInstant
|
||||
now, // createdAt
|
||||
now // updatedAt
|
||||
);
|
||||
|
||||
// Persist attempt first, then master record
|
||||
processingAttemptRepository.save(attempt);
|
||||
documentRecordRepository.create(newRecord);
|
||||
|
||||
LOG.info("New document '{}' processed: status={}, contentErrors={}, transientErrors={}.",
|
||||
candidate.uniqueIdentifier(),
|
||||
m4.overallStatus(),
|
||||
m4.counters().contentErrorCount(),
|
||||
m4.counters().transientErrorCount());
|
||||
|
||||
} catch (DocumentPersistenceException e) {
|
||||
LOG.error("Failed to persist processing result for new document '{}': {}",
|
||||
candidate.uniqueIdentifier(), e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Known processable document path
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Processes a known but non-terminal document and updates both the attempt history
|
||||
* and the master record.
|
||||
*
|
||||
* @param candidate the candidate being processed
|
||||
* @param fingerprint the document fingerprint
|
||||
* @param m3Outcome the M3 pipeline result
|
||||
* @param existingRecord the current master record (not terminal)
|
||||
* @param context the current batch run context
|
||||
* @param attemptStart the start instant of this processing attempt
|
||||
*/
|
||||
private void processAndPersistKnownDocument(
|
||||
SourceDocumentCandidate candidate,
|
||||
DocumentFingerprint fingerprint,
|
||||
DocumentProcessingOutcome m3Outcome,
|
||||
DocumentRecord existingRecord,
|
||||
BatchRunContext context,
|
||||
Instant attemptStart) {
|
||||
|
||||
Instant now = Instant.now();
|
||||
|
||||
// Map M3 outcome to M4 status/counters, taking existing counters into account
|
||||
M4Outcome m4 = mapM3OutcomeForKnownDocument(m3Outcome, existingRecord.failureCounters());
|
||||
|
||||
try {
|
||||
int attemptNumber = processingAttemptRepository.loadNextAttemptNumber(fingerprint);
|
||||
|
||||
ProcessingAttempt attempt = buildAttempt(
|
||||
fingerprint, context, attemptNumber, attemptStart, now, m4);
|
||||
|
||||
// Update the master record with new status, counters and timestamps
|
||||
DocumentRecord updatedRecord = new DocumentRecord(
|
||||
existingRecord.fingerprint(),
|
||||
new SourceDocumentLocator(candidate.locator().value()),
|
||||
candidate.uniqueIdentifier(),
|
||||
m4.overallStatus(),
|
||||
m4.counters(),
|
||||
m4.overallStatus() == ProcessingStatus.SUCCESS
|
||||
? existingRecord.lastFailureInstant() : now,
|
||||
m4.overallStatus() == ProcessingStatus.SUCCESS
|
||||
? now : existingRecord.lastSuccessInstant(),
|
||||
existingRecord.createdAt(),
|
||||
now // updatedAt
|
||||
);
|
||||
|
||||
// Persist attempt first, then master record
|
||||
processingAttemptRepository.save(attempt);
|
||||
documentRecordRepository.update(updatedRecord);
|
||||
|
||||
LOG.info("Known document '{}' processed: status={}, contentErrors={}, transientErrors={}.",
|
||||
candidate.uniqueIdentifier(),
|
||||
m4.overallStatus(),
|
||||
m4.counters().contentErrorCount(),
|
||||
m4.counters().transientErrorCount());
|
||||
|
||||
} catch (DocumentPersistenceException e) {
|
||||
LOG.error("Failed to persist processing result for known document '{}': {}",
|
||||
candidate.uniqueIdentifier(), e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// M3 → M4 outcome mapping
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Maps an M3 outcome to M4 status, counters, and retryable flag for a brand-new
|
||||
* document (no prior history, counters start at zero).
|
||||
*
|
||||
* @param m3Outcome the M3 pipeline result
|
||||
* @return the M4 outcome with status, counters and retryable flag
|
||||
*/
|
||||
private M4Outcome mapM3OutcomeForNewDocument(DocumentProcessingOutcome m3Outcome) {
|
||||
return mapM3OutcomeForKnownDocument(m3Outcome, FailureCounters.zero());
|
||||
}
|
||||
|
||||
/**
|
||||
* Maps an M3 outcome to M4 status, counters, and retryable flag, taking the
|
||||
* existing failure counters into account.
|
||||
* <p>
|
||||
* <strong>M4 minimal rules applied here:</strong>
|
||||
* <ul>
|
||||
* <li>M3 success → {@link ProcessingStatus#SUCCESS}, counters unchanged,
|
||||
* {@code retryable=false}.</li>
|
||||
* <li>M3 deterministic content error (first occurrence, contentErrorCount == 0) →
|
||||
* {@link ProcessingStatus#FAILED_RETRYABLE}, contentErrorCount +1,
|
||||
* {@code retryable=true}.</li>
|
||||
* <li>M3 deterministic content error (second occurrence, contentErrorCount >= 1) →
|
||||
* {@link ProcessingStatus#FAILED_FINAL}, contentErrorCount +1,
|
||||
* {@code retryable=false}.</li>
|
||||
* <li>M3 technical error → {@link ProcessingStatus#FAILED_RETRYABLE},
|
||||
* transientErrorCount +1, {@code retryable=true}.</li>
|
||||
* </ul>
|
||||
*
|
||||
* @param m3Outcome the M3 pipeline result
|
||||
* @param existingCounters the current failure counters from the master record
|
||||
* @return the M4 outcome with updated status, counters and retryable flag
|
||||
*/
|
||||
private M4Outcome mapM3OutcomeForKnownDocument(
|
||||
DocumentProcessingOutcome m3Outcome,
|
||||
FailureCounters existingCounters) {
|
||||
|
||||
return switch (m3Outcome) {
|
||||
case de.gecheckt.pdf.umbenenner.domain.model.PreCheckPassed ignored -> {
|
||||
// M3 success: document passed all pre-checks
|
||||
// In M4 scope (no KI, no target copy), PreCheckPassed is the terminal success
|
||||
yield new M4Outcome(
|
||||
ProcessingStatus.SUCCESS,
|
||||
existingCounters, // counters unchanged on success
|
||||
false // not retryable
|
||||
);
|
||||
}
|
||||
|
||||
case PreCheckFailed contentError -> {
|
||||
// Deterministic content error: apply the 1-retry rule
|
||||
FailureCounters updatedCounters = existingCounters.withIncrementedContentErrorCount();
|
||||
boolean isFirstOccurrence = existingCounters.contentErrorCount() == 0;
|
||||
|
||||
if (isFirstOccurrence) {
|
||||
// First content error → FAILED_RETRYABLE
|
||||
yield new M4Outcome(
|
||||
ProcessingStatus.FAILED_RETRYABLE,
|
||||
updatedCounters,
|
||||
true
|
||||
);
|
||||
} else {
|
||||
// Second (or later) content error → FAILED_FINAL
|
||||
yield new M4Outcome(
|
||||
ProcessingStatus.FAILED_FINAL,
|
||||
updatedCounters,
|
||||
false
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
case TechnicalDocumentError technicalError -> {
|
||||
// Technical error after fingerprinting: always FAILED_RETRYABLE, increment transient counter
|
||||
yield new M4Outcome(
|
||||
ProcessingStatus.FAILED_RETRYABLE,
|
||||
existingCounters.withIncrementedTransientErrorCount(),
|
||||
true
|
||||
);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Helper: build ProcessingAttempt
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Constructs a {@link ProcessingAttempt} from the given parameters and M4 outcome.
|
||||
*
|
||||
* @param fingerprint the document fingerprint
|
||||
* @param context the current batch run context
|
||||
* @param attemptNumber the monotonic attempt number
|
||||
* @param startedAt the start instant of this attempt
|
||||
* @param endedAt the end instant of this attempt
|
||||
* @param m4 the M4 outcome (status, counters, retryable)
|
||||
* @return the constructed processing attempt
|
||||
*/
|
||||
private ProcessingAttempt buildAttempt(
|
||||
DocumentFingerprint fingerprint,
|
||||
BatchRunContext context,
|
||||
int attemptNumber,
|
||||
Instant startedAt,
|
||||
Instant endedAt,
|
||||
M4Outcome m4) {
|
||||
|
||||
String failureClass = null;
|
||||
String failureMessage = null;
|
||||
|
||||
if (m4.overallStatus() == ProcessingStatus.FAILED_RETRYABLE
|
||||
|| m4.overallStatus() == ProcessingStatus.FAILED_FINAL) {
|
||||
failureClass = m4.overallStatus().name();
|
||||
failureMessage = buildFailureMessage(m4);
|
||||
}
|
||||
|
||||
return new ProcessingAttempt(
|
||||
fingerprint,
|
||||
context.runId(),
|
||||
attemptNumber,
|
||||
startedAt,
|
||||
endedAt,
|
||||
m4.overallStatus(),
|
||||
failureClass,
|
||||
failureMessage,
|
||||
m4.retryable()
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds a human-readable failure message from the M4 outcome.
|
||||
*
|
||||
* @param m4 the M4 outcome
|
||||
* @return a non-null failure message string
|
||||
*/
|
||||
private String buildFailureMessage(M4Outcome m4) {
|
||||
return switch (m4.overallStatus()) {
|
||||
case FAILED_RETRYABLE -> "Processing failed (retryable). "
|
||||
+ "ContentErrors=" + m4.counters().contentErrorCount()
|
||||
+ ", TransientErrors=" + m4.counters().transientErrorCount();
|
||||
case FAILED_FINAL -> "Processing failed finally (not retryable). "
|
||||
+ "ContentErrors=" + m4.counters().contentErrorCount()
|
||||
+ ", TransientErrors=" + m4.counters().transientErrorCount();
|
||||
default -> m4.overallStatus().name();
|
||||
};
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Internal value type: M4 outcome
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Internal value type carrying the M4 status, updated counters, and retryable flag
|
||||
* after mapping from an M3 outcome.
|
||||
* <p>
|
||||
* Tightly scoped to {@link M4DocumentProcessor}; not exposed outside this class.
|
||||
*
|
||||
* @param overallStatus the M4 overall status to persist
|
||||
* @param counters the updated failure counters to persist
|
||||
* @param retryable whether the failure is retryable in a later run
|
||||
*/
|
||||
private record M4Outcome(
|
||||
ProcessingStatus overallStatus,
|
||||
FailureCounters counters,
|
||||
boolean retryable) {
|
||||
}
|
||||
}
|
||||
@@ -1,28 +1,65 @@
|
||||
/**
|
||||
* Application-level services for business logic evaluation.
|
||||
* Application-level services for business logic evaluation and M4 orchestration.
|
||||
* <p>
|
||||
* This package contains stateless, pure-logic services that evaluate document content
|
||||
* and apply business rules. Services in this package:
|
||||
* This package contains stateless, pure-logic services that evaluate document content,
|
||||
* apply business rules, and orchestrate the M4 per-document processing flow.
|
||||
* Services in this package:
|
||||
* <ul>
|
||||
* <li>Do not manage state or resources</li>
|
||||
* <li>Do not depend on infrastructure (database, filesystem, network)</li>
|
||||
* <li>Do not depend on infrastructure (database, filesystem, network) directly;
|
||||
* they interact with infrastructure exclusively through outbound ports</li>
|
||||
* <li>Can be tested with simple unit tests and in-memory mocks</li>
|
||||
* <li>Are reused by multiple use cases or adapters</li>
|
||||
* </ul>
|
||||
*
|
||||
* Current services:
|
||||
* <h2>Current services</h2>
|
||||
* <ul>
|
||||
* <li>{@link de.gecheckt.pdf.umbenenner.application.service.PreCheckEvaluator} — Pre-check evaluation</li>
|
||||
* <li>{@link de.gecheckt.pdf.umbenenner.application.service.DocumentProcessingService} — Complete document processing pipeline orchestration</li>
|
||||
* <li>{@link de.gecheckt.pdf.umbenenner.application.service.PreCheckEvaluator}
|
||||
* — Pre-check evaluation (M3)</li>
|
||||
* <li>{@link de.gecheckt.pdf.umbenenner.application.service.DocumentProcessingService}
|
||||
* — Complete M3 document processing pipeline orchestration</li>
|
||||
* <li>{@link de.gecheckt.pdf.umbenenner.application.service.M4DocumentProcessor}
|
||||
* — M4 per-document idempotency, status/counter mapping and consistent
|
||||
* two-level persistence (AP-006)</li>
|
||||
* </ul>
|
||||
*
|
||||
* Document Processing Pipeline:
|
||||
* The {@link de.gecheckt.pdf.umbenenner.application.service.DocumentProcessingService} coordinates
|
||||
* the complete processing workflow:
|
||||
* <h2>M4 processing flow ({@code M4DocumentProcessor})</h2>
|
||||
* <p>
|
||||
* The {@link de.gecheckt.pdf.umbenenner.application.service.M4DocumentProcessor}
|
||||
* implements the verbindliche M4 processing order per candidate:
|
||||
* <ol>
|
||||
* <li>Convert technical PDF extraction results to processing outcomes</li>
|
||||
* <li>Route successful extractions through pre-check validation</li>
|
||||
* <li>Classify extraction and pre-check failures with appropriate error types</li>
|
||||
* <li>Load the document master record by fingerprint.</li>
|
||||
* <li>If overall status is {@code SUCCESS} → persist a skip attempt with
|
||||
* {@code SKIPPED_ALREADY_PROCESSED}; do not change counters.</li>
|
||||
* <li>If overall status is {@code FAILED_FINAL} → persist a skip attempt with
|
||||
* {@code SKIPPED_FINAL_FAILURE}; do not change counters.</li>
|
||||
* <li>Otherwise map the M3 outcome into M4 status, counters and retryable flag
|
||||
* using the M4 minimal rules.</li>
|
||||
* <li>Persist exactly one historised processing attempt.</li>
|
||||
* <li>Persist the updated document master record.</li>
|
||||
* </ol>
|
||||
*
|
||||
* <h2>M4 minimal rules (status and counter semantics)</h2>
|
||||
* <ul>
|
||||
* <li>First deterministic content error → {@code FAILED_RETRYABLE},
|
||||
* content error counter +1, {@code retryable=true}.</li>
|
||||
* <li>Second deterministic content error → {@code FAILED_FINAL},
|
||||
* content error counter +1 (cumulative = 2), {@code retryable=false}.</li>
|
||||
* <li>Technical error after fingerprinting → {@code FAILED_RETRYABLE},
|
||||
* transient error counter +1, {@code retryable=true}.</li>
|
||||
* <li>Skip events do not change any failure counter.</li>
|
||||
* </ul>
|
||||
*
|
||||
* <h2>Persistence consistency</h2>
|
||||
* <p>
|
||||
* For every identified document, the processing attempt and the master record are
|
||||
* written in sequence. If either write fails, the failure is caught and logged;
|
||||
* the batch run continues with the next candidate. True transactionality across
|
||||
* two separate repository calls is not available in the M4 scope; this is a known
|
||||
* and documented limitation.
|
||||
*
|
||||
* <h2>Pre-fingerprint failures</h2>
|
||||
* <p>
|
||||
* Failures that occur before a successful fingerprint is available are not handled
|
||||
* by this package. They are handled by the use case and are not historised in SQLite.
|
||||
*/
|
||||
package de.gecheckt.pdf.umbenenner.application.service;
|
||||
|
||||
@@ -3,54 +3,80 @@ package de.gecheckt.pdf.umbenenner.application.usecase;
|
||||
import de.gecheckt.pdf.umbenenner.application.config.StartConfiguration;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.in.BatchRunOutcome;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.in.BatchRunProcessingUseCase;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.out.FingerprintPort;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.out.FingerprintResult;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.out.FingerprintSuccess;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.out.FingerprintTechnicalError;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.out.PdfTextExtractionPort;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.out.RunLockPort;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.out.RunLockUnavailableException;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.out.SourceDocumentAccessException;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.out.SourceDocumentCandidatesPort;
|
||||
import de.gecheckt.pdf.umbenenner.application.service.DocumentProcessingService;
|
||||
import de.gecheckt.pdf.umbenenner.application.service.M4DocumentProcessor;
|
||||
import de.gecheckt.pdf.umbenenner.domain.model.BatchRunContext;
|
||||
import de.gecheckt.pdf.umbenenner.domain.model.PreCheckFailed;
|
||||
import de.gecheckt.pdf.umbenenner.domain.model.PreCheckPassed;
|
||||
import de.gecheckt.pdf.umbenenner.domain.model.TechnicalDocumentError;
|
||||
import de.gecheckt.pdf.umbenenner.domain.model.PdfExtractionContentError;
|
||||
import de.gecheckt.pdf.umbenenner.domain.model.DocumentFingerprint;
|
||||
import de.gecheckt.pdf.umbenenner.domain.model.DocumentProcessingOutcome;
|
||||
import de.gecheckt.pdf.umbenenner.domain.model.PdfExtractionResult;
|
||||
import de.gecheckt.pdf.umbenenner.domain.model.PdfExtractionSuccess;
|
||||
import de.gecheckt.pdf.umbenenner.domain.model.PdfExtractionTechnicalError;
|
||||
import de.gecheckt.pdf.umbenenner.domain.model.SourceDocumentCandidate;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Batch processing implementation of {@link BatchRunProcessingUseCase}.
|
||||
* <p>
|
||||
* Orchestrates the complete batch processing workflow:
|
||||
* Orchestrates the complete M4 batch processing workflow per candidate:
|
||||
* <ol>
|
||||
* <li>Acquire exclusive run lock to prevent concurrent instances</li>
|
||||
* <li>Scan source folder for PDF candidates</li>
|
||||
* <li>For each candidate: extract text and page count, run pre-checks</li>
|
||||
* <li>Log per-document decision; end each document controlled without KI or target copy</li>
|
||||
* <li>Release lock and return structured outcome for Bootstrap exit code mapping</li>
|
||||
* <li>Acquire exclusive run lock to prevent concurrent instances.</li>
|
||||
* <li>Scan source folder for PDF candidates.</li>
|
||||
* <li>For each candidate, execute the M4 processing order:
|
||||
* <ol type="a">
|
||||
* <li>Compute fingerprint.</li>
|
||||
* <li>Load document master record.</li>
|
||||
* <li>If already {@code SUCCESS} → persist skip attempt with
|
||||
* {@code SKIPPED_ALREADY_PROCESSED}.</li>
|
||||
* <li>If already {@code FAILED_FINAL} → persist skip attempt with
|
||||
* {@code SKIPPED_FINAL_FAILURE}.</li>
|
||||
* <li>Otherwise execute the M3 pipeline (extraction + pre-checks).</li>
|
||||
* <li>Map M3 result into M4 status, counters and retryable flag.</li>
|
||||
* <li>Persist exactly one historised processing attempt.</li>
|
||||
* <li>Persist the updated document master record.</li>
|
||||
* </ol>
|
||||
* </li>
|
||||
* <li>Release lock and return structured outcome for Bootstrap exit code mapping.</li>
|
||||
* </ol>
|
||||
*
|
||||
* <h2>Idempotency</h2>
|
||||
* <p>
|
||||
* Processing boundary:
|
||||
* <ul>
|
||||
* <li>Documents that pass pre-checks end controlled and are ready for further processing (KI, persistence, copy)</li>
|
||||
* <li>Documents with deterministic content errors (no usable text, page limit exceeded) end controlled</li>
|
||||
* <li>Documents with technical extraction errors end controlled; they do not abort the overall run</li>
|
||||
* <li>If the source folder itself is inaccessible, the run fails with {@link BatchRunOutcome#FAILURE}</li>
|
||||
* </ul>
|
||||
* Documents are identified exclusively by their SHA-256 content fingerprint. A document
|
||||
* whose overall status is {@code SUCCESS} or {@code FAILED_FINAL} is skipped in every
|
||||
* subsequent run; only a skip attempt is historised.
|
||||
*
|
||||
* <h2>Pre-fingerprint failures</h2>
|
||||
* <p>
|
||||
* Non-Goals (not implemented):
|
||||
* If the fingerprint computation fails (e.g. the file is no longer readable), the
|
||||
* candidate is logged as a non-identifiable run event and is <em>not</em> historised
|
||||
* in SQLite. The batch run continues with the next candidate.
|
||||
*
|
||||
* <h2>Persistence consistency</h2>
|
||||
* <p>
|
||||
* For every identified document, the processing attempt and the master record are
|
||||
* written in sequence by {@link M4DocumentProcessor}. Persistence failures for a single
|
||||
* document are caught and logged; the batch run continues with the remaining candidates.
|
||||
*
|
||||
* <h2>Non-Goals (not implemented in M4)</h2>
|
||||
* <ul>
|
||||
* <li>No fingerprinting or SQLite persistence</li>
|
||||
* <li>No KI/AI integration or prompt loading</li>
|
||||
* <li>No filename generation or target file copy</li>
|
||||
* <li>No cross-run retry logic</li>
|
||||
* <li>No KI/AI integration or prompt loading.</li>
|
||||
* <li>No filename generation or target file copy.</li>
|
||||
* <li>No M5+ retry rules for KI or target copy failures.</li>
|
||||
* </ul>
|
||||
*
|
||||
* @since M3-AP-004 (extended in M4-AP-006)
|
||||
*/
|
||||
public class DefaultBatchRunProcessingUseCase implements BatchRunProcessingUseCase {
|
||||
|
||||
@@ -60,28 +86,44 @@ public class DefaultBatchRunProcessingUseCase implements BatchRunProcessingUseCa
|
||||
private final RunLockPort runLockPort;
|
||||
private final SourceDocumentCandidatesPort sourceDocumentCandidatesPort;
|
||||
private final PdfTextExtractionPort pdfTextExtractionPort;
|
||||
private final FingerprintPort fingerprintPort;
|
||||
private final M4DocumentProcessor m4DocumentProcessor;
|
||||
|
||||
/**
|
||||
* Creates the batch use case with the already-loaded startup configuration and all required ports.
|
||||
* Creates the batch use case with the already-loaded startup configuration and all
|
||||
* required ports for the M4 flow.
|
||||
* <p>
|
||||
* The configuration is loaded and validated by Bootstrap before use case creation;
|
||||
* the use case receives the result directly and does not re-read it.
|
||||
* the use case receives the result directly and does not re-read the properties file.
|
||||
*
|
||||
* @param configuration the validated startup configuration
|
||||
* @param runLockPort for exclusive run locking
|
||||
* @param sourceDocumentCandidatesPort for loading PDF candidates from the source folder
|
||||
* @param pdfTextExtractionPort for extracting text and page count from a single PDF
|
||||
* @param configuration the validated startup configuration; must not be null
|
||||
* @param runLockPort for exclusive run locking; must not be null
|
||||
* @param sourceDocumentCandidatesPort for loading PDF candidates from the source folder;
|
||||
* must not be null
|
||||
* @param pdfTextExtractionPort for extracting text and page count from a single PDF;
|
||||
* must not be null
|
||||
* @param fingerprintPort for computing the SHA-256 fingerprint of a candidate;
|
||||
* must not be null
|
||||
* @param m4DocumentProcessor for applying M4 decision logic and persisting results;
|
||||
* must not be null
|
||||
* @throws NullPointerException if any parameter is null
|
||||
*/
|
||||
public DefaultBatchRunProcessingUseCase(
|
||||
StartConfiguration configuration,
|
||||
RunLockPort runLockPort,
|
||||
SourceDocumentCandidatesPort sourceDocumentCandidatesPort,
|
||||
PdfTextExtractionPort pdfTextExtractionPort) {
|
||||
this.configuration = configuration;
|
||||
this.runLockPort = runLockPort;
|
||||
this.sourceDocumentCandidatesPort = sourceDocumentCandidatesPort;
|
||||
this.pdfTextExtractionPort = pdfTextExtractionPort;
|
||||
PdfTextExtractionPort pdfTextExtractionPort,
|
||||
FingerprintPort fingerprintPort,
|
||||
M4DocumentProcessor m4DocumentProcessor) {
|
||||
this.configuration = Objects.requireNonNull(configuration, "configuration must not be null");
|
||||
this.runLockPort = Objects.requireNonNull(runLockPort, "runLockPort must not be null");
|
||||
this.sourceDocumentCandidatesPort = Objects.requireNonNull(
|
||||
sourceDocumentCandidatesPort, "sourceDocumentCandidatesPort must not be null");
|
||||
this.pdfTextExtractionPort = Objects.requireNonNull(
|
||||
pdfTextExtractionPort, "pdfTextExtractionPort must not be null");
|
||||
this.fingerprintPort = Objects.requireNonNull(fingerprintPort, "fingerprintPort must not be null");
|
||||
this.m4DocumentProcessor = Objects.requireNonNull(
|
||||
m4DocumentProcessor, "m4DocumentProcessor must not be null");
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -96,12 +138,15 @@ public class DefaultBatchRunProcessingUseCase implements BatchRunProcessingUseCa
|
||||
lockAcquired = true;
|
||||
LOG.debug("Run lock acquired successfully.");
|
||||
} catch (RunLockUnavailableException e) {
|
||||
LOG.warn("Run lock not available – another instance is already running. This instance terminates immediately.");
|
||||
LOG.warn("Run lock not available – another instance is already running. "
|
||||
+ "This instance terminates immediately.");
|
||||
return BatchRunOutcome.LOCK_UNAVAILABLE;
|
||||
}
|
||||
|
||||
LOG.debug("Configuration in use: source={}, target={}", configuration.sourceFolder(), configuration.targetFolder());
|
||||
LOG.info("Batch run started. RunId: {}, Start: {}", context.runId(), context.startInstant());
|
||||
LOG.debug("Configuration in use: source={}, target={}",
|
||||
configuration.sourceFolder(), configuration.targetFolder());
|
||||
LOG.info("Batch run started. RunId: {}, Start: {}",
|
||||
context.runId(), context.startInstant());
|
||||
|
||||
// Step 2: Load PDF candidates from source folder
|
||||
List<SourceDocumentCandidate> candidates;
|
||||
@@ -113,12 +158,13 @@ public class DefaultBatchRunProcessingUseCase implements BatchRunProcessingUseCa
|
||||
}
|
||||
LOG.info("Found {} PDF candidate(s) in source folder.", candidates.size());
|
||||
|
||||
// Step 3: Process each candidate through the pipeline
|
||||
// Step 3: Process each candidate through the M4 pipeline
|
||||
for (SourceDocumentCandidate candidate : candidates) {
|
||||
processCandidate(candidate);
|
||||
processCandidate(candidate, context);
|
||||
}
|
||||
|
||||
LOG.info("Batch run completed. Processed {} candidate(s). RunId: {}", candidates.size(), context.runId());
|
||||
LOG.info("Batch run completed. Processed {} candidate(s). RunId: {}",
|
||||
candidates.size(), context.runId());
|
||||
return BatchRunOutcome.SUCCESS;
|
||||
|
||||
} catch (Exception e) {
|
||||
@@ -126,8 +172,8 @@ public class DefaultBatchRunProcessingUseCase implements BatchRunProcessingUseCa
|
||||
return BatchRunOutcome.FAILURE;
|
||||
} finally {
|
||||
// Release the run lock only if it was successfully acquired.
|
||||
// If acquire() threw RunLockUnavailableException, the lock belongs to another instance
|
||||
// and must not be deleted by this instance.
|
||||
// If acquire() threw RunLockUnavailableException, the lock belongs to another
|
||||
// instance and must not be deleted by this instance.
|
||||
if (lockAcquired) {
|
||||
try {
|
||||
runLockPort.release();
|
||||
@@ -140,56 +186,105 @@ public class DefaultBatchRunProcessingUseCase implements BatchRunProcessingUseCa
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes a single PDF candidate through the complete pipeline.
|
||||
* Processes a single PDF candidate through the complete M4 pipeline.
|
||||
* <p>
|
||||
* Processing steps per document:
|
||||
* M4 processing order:
|
||||
* <ol>
|
||||
* <li>Log candidate recognition</li>
|
||||
* <li>Extract text and page count from the PDF via {@link PdfTextExtractionPort}</li>
|
||||
* <li>Process extraction result through pre-checks via {@link DocumentProcessingService}</li>
|
||||
* <li>Log extraction outcome and final decision</li>
|
||||
* <li>Record the attempt start instant.</li>
|
||||
* <li>Compute the SHA-256 fingerprint of the candidate file content.</li>
|
||||
* <li>If fingerprint computation fails: log as non-identifiable run event and
|
||||
* return — no SQLite record is created.</li>
|
||||
* <li>Execute the M3 pipeline (PDF extraction + pre-checks).</li>
|
||||
* <li>Delegate to {@link M4DocumentProcessor} for idempotency check, status/counter
|
||||
* mapping, and consistent two-level persistence.</li>
|
||||
* </ol>
|
||||
* <p>
|
||||
* Per-document errors (extraction failure, technical error, pre-check failure) do not abort the overall
|
||||
* batch run. Each candidate ends controlled regardless of its outcome.
|
||||
* <p>
|
||||
* Processing boundary: no KI call, no persistence, no filename generation,
|
||||
* no target file copy is initiated here, even for candidates that pass all pre-checks.
|
||||
* Per-document errors do not abort the overall batch run. Each candidate ends
|
||||
* controlled regardless of its outcome.
|
||||
*
|
||||
* @param candidate the candidate to process
|
||||
* @param context the current batch run context
|
||||
*/
|
||||
private void processCandidate(SourceDocumentCandidate candidate) {
|
||||
private void processCandidate(SourceDocumentCandidate candidate, BatchRunContext context) {
|
||||
LOG.debug("Processing candidate: {}", candidate.uniqueIdentifier());
|
||||
|
||||
PdfExtractionResult extractionResult = pdfTextExtractionPort.extractTextAndPageCount(candidate);
|
||||
// Record the attempt start instant before any work begins
|
||||
Instant attemptStart = Instant.now();
|
||||
|
||||
// Step M4-1: Compute fingerprint
|
||||
FingerprintResult fingerprintResult = fingerprintPort.computeFingerprint(candidate);
|
||||
|
||||
switch (fingerprintResult) {
|
||||
case FingerprintTechnicalError fingerprintError -> {
|
||||
// Pre-fingerprint failure: not historised in SQLite
|
||||
LOG.warn("Fingerprint computation failed for '{}': {} — candidate skipped (not historised).",
|
||||
candidate.uniqueIdentifier(), fingerprintError.errorMessage());
|
||||
return;
|
||||
}
|
||||
|
||||
case FingerprintSuccess fingerprintSuccess -> {
|
||||
DocumentFingerprint fingerprint = fingerprintSuccess.fingerprint();
|
||||
LOG.debug("Fingerprint computed for '{}': {}",
|
||||
candidate.uniqueIdentifier(), fingerprint.sha256Hex());
|
||||
|
||||
// Step M4-2..M4-8: Execute M3 pipeline and delegate M4 logic to the processor
|
||||
// The M3 pipeline runs only if the document is not in a terminal state;
|
||||
// M4DocumentProcessor handles the terminal check internally.
|
||||
// We run M3 eagerly here and pass the result; M4DocumentProcessor will
|
||||
// ignore it for terminal documents.
|
||||
DocumentProcessingOutcome m3Outcome = runM3Pipeline(candidate);
|
||||
|
||||
// Delegate idempotency check, status mapping, and persistence to M4DocumentProcessor
|
||||
m4DocumentProcessor.process(candidate, fingerprint, m3Outcome, context, attemptStart);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs the M3 pipeline (PDF text extraction + pre-checks) for the given candidate.
|
||||
* <p>
|
||||
* This method is called after a successful fingerprint computation. The result is
|
||||
* passed to {@link M4DocumentProcessor}, which applies it only when the document is
|
||||
* not in a terminal state.
|
||||
*
|
||||
* @param candidate the candidate to run through the M3 pipeline
|
||||
* @return the M3 pipeline outcome (pre-check passed, pre-check failed, or technical error)
|
||||
*/
|
||||
private DocumentProcessingOutcome runM3Pipeline(SourceDocumentCandidate candidate) {
|
||||
PdfExtractionResult extractionResult =
|
||||
pdfTextExtractionPort.extractTextAndPageCount(candidate);
|
||||
|
||||
// Log extraction outcome
|
||||
switch (extractionResult) {
|
||||
case PdfExtractionSuccess success ->
|
||||
case de.gecheckt.pdf.umbenenner.domain.model.PdfExtractionSuccess success ->
|
||||
LOG.debug("PDF extraction successful for '{}'. Pages: {}, Text length: {} chars.",
|
||||
candidate.uniqueIdentifier(), success.pageCount().value(), success.extractedText().length());
|
||||
case PdfExtractionContentError contentError ->
|
||||
candidate.uniqueIdentifier(),
|
||||
success.pageCount().value(),
|
||||
success.extractedText().length());
|
||||
case de.gecheckt.pdf.umbenenner.domain.model.PdfExtractionContentError contentError ->
|
||||
LOG.debug("PDF content extraction failed for '{}' (content problem): {}",
|
||||
candidate.uniqueIdentifier(), contentError.reason());
|
||||
case PdfExtractionTechnicalError technicalError ->
|
||||
case de.gecheckt.pdf.umbenenner.domain.model.PdfExtractionTechnicalError technicalError ->
|
||||
LOG.debug("PDF extraction technical error for '{}': {}",
|
||||
candidate.uniqueIdentifier(), technicalError.errorMessage());
|
||||
}
|
||||
|
||||
// Process through complete pipeline
|
||||
var outcome = DocumentProcessingService.processDocument(candidate, extractionResult, configuration);
|
||||
DocumentProcessingOutcome outcome =
|
||||
DocumentProcessingService.processDocument(candidate, extractionResult, configuration);
|
||||
|
||||
// Log processing outcome
|
||||
// Log M3 outcome
|
||||
switch (outcome) {
|
||||
case PreCheckPassed passed ->
|
||||
LOG.info("Pre-checks PASSED for '{}'. Candidate ready for further processing.",
|
||||
case de.gecheckt.pdf.umbenenner.domain.model.PreCheckPassed passed ->
|
||||
LOG.info("Pre-checks PASSED for '{}'. Candidate ready for M4 persistence.",
|
||||
candidate.uniqueIdentifier());
|
||||
case PreCheckFailed failed ->
|
||||
LOG.info("Pre-checks FAILED for '{}': {} (Deterministic content error – may retry in later run).",
|
||||
case de.gecheckt.pdf.umbenenner.domain.model.PreCheckFailed failed ->
|
||||
LOG.info("Pre-checks FAILED for '{}': {} (Deterministic content error).",
|
||||
candidate.uniqueIdentifier(), failed.failureReasonDescription());
|
||||
case TechnicalDocumentError technicalError ->
|
||||
LOG.warn("Processing FAILED for '{}': {} (Technical error – may retry in later run).",
|
||||
case de.gecheckt.pdf.umbenenner.domain.model.TechnicalDocumentError technicalError ->
|
||||
LOG.warn("Processing FAILED for '{}': {} (Technical error – retryable).",
|
||||
candidate.uniqueIdentifier(), technicalError.errorMessage());
|
||||
}
|
||||
|
||||
return outcome;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,9 +4,25 @@
|
||||
* Implementations:
|
||||
* <ul>
|
||||
* <li>{@link de.gecheckt.pdf.umbenenner.application.usecase.DefaultBatchRunProcessingUseCase}
|
||||
* — Production implementation with run lock and controlled batch cycle</li>
|
||||
* — Production implementation with run lock, M4 fingerprint-based idempotency,
|
||||
* and consistent two-level persistence (extended in M4-AP-006)</li>
|
||||
* </ul>
|
||||
* <p>
|
||||
* <h2>M4 processing order (AP-006)</h2>
|
||||
* <p>
|
||||
* For each candidate, {@link de.gecheckt.pdf.umbenenner.application.usecase.DefaultBatchRunProcessingUseCase}
|
||||
* enforces this order:
|
||||
* <ol>
|
||||
* <li>Compute SHA-256 fingerprint of the candidate file content.</li>
|
||||
* <li>If fingerprint computation fails: log as non-identifiable run event;
|
||||
* do NOT write any SQLite record; continue with next candidate.</li>
|
||||
* <li>Run the M3 pipeline (PDF extraction + pre-checks).</li>
|
||||
* <li>Delegate to {@link de.gecheckt.pdf.umbenenner.application.service.M4DocumentProcessor}
|
||||
* for idempotency check, status/counter mapping, and consistent persistence.</li>
|
||||
* </ol>
|
||||
* <p>
|
||||
* All implementations are infrastructure-agnostic and interact only through ports.
|
||||
*
|
||||
* @since M2 (extended in M4-AP-006)
|
||||
*/
|
||||
package de.gecheckt.pdf.umbenenner.application.usecase;
|
||||
package de.gecheckt.pdf.umbenenner.application.usecase;
|
||||
|
||||
Reference in New Issue
Block a user