Optimierung: DefaultBatchRunProcessingUseCase moderat gestrafft
This commit is contained in:
@@ -131,7 +131,7 @@ public class DefaultBatchRunProcessingUseCase implements BatchRunProcessingUseCa
|
|||||||
boolean lockAcquired = false;
|
boolean lockAcquired = false;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Step 1: Acquire exclusive run lock (prevents concurrent instances)
|
// Attempt to acquire the exclusive run lock
|
||||||
try {
|
try {
|
||||||
runLockPort.acquire();
|
runLockPort.acquire();
|
||||||
lockAcquired = true;
|
lockAcquired = true;
|
||||||
@@ -147,39 +147,58 @@ public class DefaultBatchRunProcessingUseCase implements BatchRunProcessingUseCa
|
|||||||
LOG.info("Batch run started. RunId: {}, Start: {}",
|
LOG.info("Batch run started. RunId: {}, Start: {}",
|
||||||
context.runId(), context.startInstant());
|
context.runId(), context.startInstant());
|
||||||
|
|
||||||
// Step 2: Load PDF candidates from source folder
|
// Load and process all candidates
|
||||||
List<SourceDocumentCandidate> candidates;
|
return processCandidates(context);
|
||||||
try {
|
|
||||||
candidates = sourceDocumentCandidatesPort.loadCandidates();
|
|
||||||
} catch (SourceDocumentAccessException e) {
|
|
||||||
LOG.error("Cannot access source folder: {}", e.getMessage(), e);
|
|
||||||
return BatchRunOutcome.FAILURE;
|
|
||||||
}
|
|
||||||
LOG.info("Found {} PDF candidate(s) in source folder.", candidates.size());
|
|
||||||
|
|
||||||
// Step 3: Process each candidate through the pipeline
|
|
||||||
for (SourceDocumentCandidate candidate : candidates) {
|
|
||||||
processCandidate(candidate, context);
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG.info("Batch run completed. Processed {} candidate(s). RunId: {}",
|
|
||||||
candidates.size(), context.runId());
|
|
||||||
return BatchRunOutcome.SUCCESS;
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Unexpected error during batch processing", e);
|
LOG.error("Unexpected error during batch processing", e);
|
||||||
return BatchRunOutcome.FAILURE;
|
return BatchRunOutcome.FAILURE;
|
||||||
} finally {
|
} finally {
|
||||||
// Release the run lock only if it was successfully acquired.
|
releaseLockIfAcquired(lockAcquired);
|
||||||
// If acquire() threw RunLockUnavailableException, the lock belongs to another
|
}
|
||||||
// instance and must not be deleted by this instance.
|
}
|
||||||
if (lockAcquired) {
|
|
||||||
try {
|
/**
|
||||||
runLockPort.release();
|
* Loads candidates and processes them one by one.
|
||||||
LOG.debug("Run lock released.");
|
*
|
||||||
} catch (Exception e) {
|
* @param context the current batch run context
|
||||||
LOG.warn("Warning: Failed to release run lock.", e);
|
* @return SUCCESS if all candidates were processed, FAILURE if source access fails
|
||||||
}
|
*/
|
||||||
|
private BatchRunOutcome processCandidates(BatchRunContext context) {
|
||||||
|
List<SourceDocumentCandidate> candidates;
|
||||||
|
try {
|
||||||
|
candidates = sourceDocumentCandidatesPort.loadCandidates();
|
||||||
|
} catch (SourceDocumentAccessException e) {
|
||||||
|
LOG.error("Cannot access source folder: {}", e.getMessage(), e);
|
||||||
|
return BatchRunOutcome.FAILURE;
|
||||||
|
}
|
||||||
|
LOG.info("Found {} PDF candidate(s) in source folder.", candidates.size());
|
||||||
|
|
||||||
|
// Process each candidate
|
||||||
|
for (SourceDocumentCandidate candidate : candidates) {
|
||||||
|
processCandidate(candidate, context);
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("Batch run completed. Processed {} candidate(s). RunId: {}",
|
||||||
|
candidates.size(), context.runId());
|
||||||
|
return BatchRunOutcome.SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Releases the run lock if it was previously acquired.
|
||||||
|
* <p>
|
||||||
|
* The lock is only released if it was successfully acquired. If acquire() failed
|
||||||
|
* (another instance holds the lock), this instance must not attempt to release it.
|
||||||
|
*
|
||||||
|
* @param lockAcquired whether the lock was acquired by this instance
|
||||||
|
*/
|
||||||
|
private void releaseLockIfAcquired(boolean lockAcquired) {
|
||||||
|
if (lockAcquired) {
|
||||||
|
try {
|
||||||
|
runLockPort.release();
|
||||||
|
LOG.debug("Run lock released.");
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("Warning: Failed to release run lock.", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -213,38 +232,58 @@ public class DefaultBatchRunProcessingUseCase implements BatchRunProcessingUseCa
|
|||||||
private void processCandidate(SourceDocumentCandidate candidate, BatchRunContext context) {
|
private void processCandidate(SourceDocumentCandidate candidate, BatchRunContext context) {
|
||||||
LOG.debug("Processing candidate: {}", candidate.uniqueIdentifier());
|
LOG.debug("Processing candidate: {}", candidate.uniqueIdentifier());
|
||||||
|
|
||||||
// Record the attempt start instant before any work begins
|
|
||||||
Instant attemptStart = Instant.now();
|
Instant attemptStart = Instant.now();
|
||||||
|
|
||||||
// Step 1: Compute fingerprint
|
|
||||||
FingerprintResult fingerprintResult = fingerprintPort.computeFingerprint(candidate);
|
FingerprintResult fingerprintResult = fingerprintPort.computeFingerprint(candidate);
|
||||||
|
|
||||||
switch (fingerprintResult) {
|
switch (fingerprintResult) {
|
||||||
case FingerprintTechnicalError fingerprintError -> {
|
case FingerprintTechnicalError fingerprintError -> {
|
||||||
// Pre-fingerprint failure: not historised in SQLite
|
handleFingerprintError(candidate, fingerprintError);
|
||||||
LOG.warn("Fingerprint computation failed for '{}': {} — candidate skipped (not historised).",
|
|
||||||
candidate.uniqueIdentifier(), fingerprintError.errorMessage());
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
case FingerprintSuccess fingerprintSuccess -> {
|
case FingerprintSuccess fingerprintSuccess -> {
|
||||||
DocumentFingerprint fingerprint = fingerprintSuccess.fingerprint();
|
handleFingerprintSuccess(candidate, fingerprintSuccess, context, attemptStart);
|
||||||
LOG.debug("Fingerprint computed for '{}': {}",
|
|
||||||
candidate.uniqueIdentifier(), fingerprint.sha256Hex());
|
|
||||||
|
|
||||||
// Delegate the complete processing logic to the processor
|
|
||||||
// The processor handles loading document master record, checking terminal status,
|
|
||||||
// executing pipeline only when needed, and persisting results consistently
|
|
||||||
documentProcessingCoordinator.processDeferredOutcome(
|
|
||||||
candidate,
|
|
||||||
fingerprint,
|
|
||||||
context,
|
|
||||||
attemptStart,
|
|
||||||
this::runExtractionPipeline);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handles a fingerprint computation error by logging it as a non-identifiable event.
|
||||||
|
* No SQLite record is created for this candidate.
|
||||||
|
*
|
||||||
|
* @param candidate the candidate that could not be fingerprinted
|
||||||
|
* @param error the fingerprint error
|
||||||
|
*/
|
||||||
|
private void handleFingerprintError(SourceDocumentCandidate candidate, FingerprintTechnicalError error) {
|
||||||
|
LOG.warn("Fingerprint computation failed for '{}': {} — candidate skipped (not historised).",
|
||||||
|
candidate.uniqueIdentifier(), error.errorMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handles a successful fingerprint computation by delegating to the processor.
|
||||||
|
* The processor loads the document master record, checks terminal status, executes
|
||||||
|
* the pipeline as needed, and persists results consistently.
|
||||||
|
*
|
||||||
|
* @param candidate the candidate to process
|
||||||
|
* @param fingerprintSuccess the successful fingerprint result
|
||||||
|
* @param context the batch run context
|
||||||
|
* @param attemptStart the instant when processing started
|
||||||
|
*/
|
||||||
|
private void handleFingerprintSuccess(
|
||||||
|
SourceDocumentCandidate candidate,
|
||||||
|
FingerprintSuccess fingerprintSuccess,
|
||||||
|
BatchRunContext context,
|
||||||
|
Instant attemptStart) {
|
||||||
|
DocumentFingerprint fingerprint = fingerprintSuccess.fingerprint();
|
||||||
|
LOG.debug("Fingerprint computed for '{}': {}",
|
||||||
|
candidate.uniqueIdentifier(), fingerprint.sha256Hex());
|
||||||
|
|
||||||
|
documentProcessingCoordinator.processDeferredOutcome(
|
||||||
|
candidate,
|
||||||
|
fingerprint,
|
||||||
|
context,
|
||||||
|
attemptStart,
|
||||||
|
this::runExtractionPipeline);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Runs the pipeline (PDF text extraction + pre-checks) for the given candidate.
|
* Runs the pipeline (PDF text extraction + pre-checks) for the given candidate.
|
||||||
* <p>
|
* <p>
|
||||||
@@ -259,8 +298,24 @@ public class DefaultBatchRunProcessingUseCase implements BatchRunProcessingUseCa
|
|||||||
PdfExtractionResult extractionResult =
|
PdfExtractionResult extractionResult =
|
||||||
pdfTextExtractionPort.extractTextAndPageCount(candidate);
|
pdfTextExtractionPort.extractTextAndPageCount(candidate);
|
||||||
|
|
||||||
// Log extraction outcome
|
logExtractionResult(candidate, extractionResult);
|
||||||
switch (extractionResult) {
|
|
||||||
|
DocumentProcessingOutcome outcome =
|
||||||
|
DocumentProcessingService.processDocument(candidate, extractionResult, configuration);
|
||||||
|
|
||||||
|
logProcessingOutcome(candidate, outcome);
|
||||||
|
|
||||||
|
return outcome;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Logs the PDF extraction result for a candidate.
|
||||||
|
*
|
||||||
|
* @param candidate the candidate being processed
|
||||||
|
* @param result the extraction result to log
|
||||||
|
*/
|
||||||
|
private void logExtractionResult(SourceDocumentCandidate candidate, PdfExtractionResult result) {
|
||||||
|
switch (result) {
|
||||||
case de.gecheckt.pdf.umbenenner.domain.model.PdfExtractionSuccess success -> {
|
case de.gecheckt.pdf.umbenenner.domain.model.PdfExtractionSuccess success -> {
|
||||||
LOG.debug("PDF extraction successful for '{}'. Pages: {}, Text length: {} chars.",
|
LOG.debug("PDF extraction successful for '{}'. Pages: {}, Text length: {} chars.",
|
||||||
candidate.uniqueIdentifier(),
|
candidate.uniqueIdentifier(),
|
||||||
@@ -279,11 +334,15 @@ public class DefaultBatchRunProcessingUseCase implements BatchRunProcessingUseCa
|
|||||||
// Handle any other cases
|
// Handle any other cases
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
DocumentProcessingOutcome outcome =
|
/**
|
||||||
DocumentProcessingService.processDocument(candidate, extractionResult, configuration);
|
* Logs the processing outcome for a candidate.
|
||||||
|
*
|
||||||
// Log outcome
|
* @param candidate the candidate being processed
|
||||||
|
* @param outcome the processing outcome to log
|
||||||
|
*/
|
||||||
|
private void logProcessingOutcome(SourceDocumentCandidate candidate, DocumentProcessingOutcome outcome) {
|
||||||
switch (outcome) {
|
switch (outcome) {
|
||||||
case de.gecheckt.pdf.umbenenner.domain.model.PreCheckPassed passed -> {
|
case de.gecheckt.pdf.umbenenner.domain.model.PreCheckPassed passed -> {
|
||||||
LOG.info("Pre-checks PASSED for '{}'. Candidate ready for persistence.",
|
LOG.info("Pre-checks PASSED for '{}'. Candidate ready for persistence.",
|
||||||
@@ -301,7 +360,5 @@ public class DefaultBatchRunProcessingUseCase implements BatchRunProcessingUseCa
|
|||||||
// Handle any other cases
|
// Handle any other cases
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return outcome;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -290,7 +290,7 @@ class BatchRunProcessingUseCaseTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
// M4-specific: fingerprint failure → not historised
|
// Fingerprint failure handling: not historised
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
Reference in New Issue
Block a user