From dc306dbf307098cc3f4bba5979c642036211742c Mon Sep 17 00:00:00 2001
From: Rob Rudin
Date: Wed, 10 Dec 2025 14:32:06 -0500
Subject: [PATCH] MLE-25782 Refactor: Explicit naming of writeSet objects

Was going crazy trying to figure out if a "writeSet" was a BatchWriteSet
or DocumentWriteSet. Now using more explicit names.
---
 .../datamovement/impl/BatchWriteSet.java      | 14 +++++---
 .../client/datamovement/impl/BatchWriter.java | 32 +++++++++----------
 .../datamovement/impl/WriteBatcherImpl.java   | 22 ++++++-------
 3 files changed, 36 insertions(+), 32 deletions(-)

diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriteSet.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriteSet.java
index 8cd7593a6..0c08fdd7b 100644
--- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriteSet.java
+++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriteSet.java
@@ -12,10 +12,14 @@
 
 import java.util.function.Consumer;
 
+/**
+ * Mutable class that captures the documents to be written. Documents are added via calls to "getDocumentWriteSet()", where the
+ * DocumentWriteSet is empty when this class is constructed.
+ */
 class BatchWriteSet {
 
   private final WriteBatcher batcher;
-  private final DocumentWriteSet writeSet;
+  private final DocumentWriteSet documentWriteSet;
   private final long batchNumber;
   private final DatabaseClient client;
   private final ServerTransform transform;
@@ -27,15 +31,15 @@ class BatchWriteSet {
 
   BatchWriteSet(WriteBatcher batcher, DatabaseClient hostClient, ServerTransform transform, String temporalCollection, long batchNumber) {
     this.batcher = batcher;
-    this.writeSet = hostClient.newDocumentManager().newWriteSet();
+    this.documentWriteSet = hostClient.newDocumentManager().newWriteSet();
     this.client = hostClient;
     this.transform = transform;
    this.temporalCollection = temporalCollection;
     this.batchNumber = batchNumber;
   }
 
-  public DocumentWriteSet getWriteSet() {
-    return writeSet;
+  public DocumentWriteSet getDocumentWriteSet() {
+    return documentWriteSet;
   }
 
   public long getBatchNumber() {
@@ -82,7 +86,7 @@ public WriteBatch getBatchOfWriteEvents() {
       .withJobWritesSoFar(itemsSoFar)
       .withJobTicket(batcher.getJobTicket());
 
-    WriteEvent[] writeEvents = getWriteSet().stream()
+    WriteEvent[] writeEvents = getDocumentWriteSet().stream()
       .map(writeOperation ->
         new WriteEventImpl()
           .withTargetUri(writeOperation.getUri())
diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriter.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriter.java
index c3d1e6ffe..037a781f3 100644
--- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriter.java
+++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriter.java
@@ -16,41 +16,41 @@
 class BatchWriter implements Runnable {
 
   private static Logger logger = LoggerFactory.getLogger(WriteBatcherImpl.class);
-  private final BatchWriteSet writeSet;
+  private final BatchWriteSet batchWriteSet;
 
-  BatchWriter(BatchWriteSet writeSet) {
-    if (writeSet.getWriteSet().size() == 0) {
+  BatchWriter(BatchWriteSet batchWriteSet) {
+    if (batchWriteSet.getDocumentWriteSet().size() == 0) {
       throw new IllegalStateException("Attempt to write an empty batch");
     }
-    this.writeSet = writeSet;
+    this.batchWriteSet = batchWriteSet;
   }
 
   @Override
   public void run() {
     try {
-      logger.trace("begin write batch {} to forest on host \"{}\"", writeSet.getBatchNumber(), writeSet.getClient().getHost());
-      if (writeSet.getTemporalCollection() == null) {
-        writeSet.getClient().newDocumentManager().write(
-          writeSet.getWriteSet(), writeSet.getTransform(), null
+      logger.trace("begin write batch {} to forest on host \"{}\"", batchWriteSet.getBatchNumber(), batchWriteSet.getClient().getHost());
+      if (batchWriteSet.getTemporalCollection() == null) {
+        batchWriteSet.getClient().newDocumentManager().write(
+          batchWriteSet.getDocumentWriteSet(), batchWriteSet.getTransform(), null
         );
       } else {
         // to get access to the TemporalDocumentManager write overload we need to instantiate
         // a JSONDocumentManager or XMLDocumentManager, but we don't want to make assumptions about content
         // format, so we'll set the default content format to unknown
-        XMLDocumentManager docMgr = writeSet.getClient().newXMLDocumentManager();
+        XMLDocumentManager docMgr = batchWriteSet.getClient().newXMLDocumentManager();
         docMgr.setContentFormat(Format.UNKNOWN);
         docMgr.write(
-          writeSet.getWriteSet(), writeSet.getTransform(), null, writeSet.getTemporalCollection()
+          batchWriteSet.getDocumentWriteSet(), batchWriteSet.getTransform(), null, batchWriteSet.getTemporalCollection()
         );
       }
       closeAllHandles();
-      Runnable onSuccess = writeSet.getOnSuccess();
+      Runnable onSuccess = batchWriteSet.getOnSuccess();
       if (onSuccess != null) {
         onSuccess.run();
       }
     } catch (Throwable t) {
-      logger.trace("failed batch sent to forest on host \"{}\"", writeSet.getClient().getHost());
-      Consumer onFailure = writeSet.getOnFailure();
+      logger.trace("failed batch sent to forest on host \"{}\"", batchWriteSet.getClient().getHost());
+      Consumer onFailure = batchWriteSet.getOnFailure();
       if (onFailure != null) {
         onFailure.accept(t);
       }
@@ -59,7 +59,7 @@ public void run() {
 
   private void closeAllHandles() throws Throwable {
     Throwable lastThrowable = null;
-    for (DocumentWriteOperation doc : writeSet.getWriteSet()) {
+    for (DocumentWriteOperation doc : batchWriteSet.getDocumentWriteSet()) {
       try {
         if (doc.getContent() instanceof Closeable) {
           ((Closeable) doc.getContent()).close();
@@ -75,7 +75,7 @@ private void closeAllHandles() throws Throwable {
     if (lastThrowable != null) throw lastThrowable;
   }
 
-  public BatchWriteSet getWriteSet() {
-    return writeSet;
+  public BatchWriteSet getBatchWriteSet() {
+    return batchWriteSet;
   }
 }
diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/WriteBatcherImpl.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/WriteBatcherImpl.java
index 424eaff13..e7ae80d9c 100644
--- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/WriteBatcherImpl.java
+++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/WriteBatcherImpl.java
@@ -204,7 +204,7 @@ public WriteBatcher add(DocumentWriteOperation writeOperation) {
       BatchWriteSet writeSet = newBatchWriteSet();
       int minBatchSize = 0;
       if(defaultMetadata != null) {
-        writeSet.getWriteSet().add(new DocumentWriteOperationImpl(OperationType.METADATA_DEFAULT, null, defaultMetadata, null));
+        writeSet.getDocumentWriteSet().add(new DocumentWriteOperationImpl(OperationType.METADATA_DEFAULT, null, defaultMetadata, null));
         minBatchSize = 1;
       }
       for (int i=0; i < getBatchSize(); i++ ) {
@@ -213,9 +213,9 @@ public WriteBatcher add(DocumentWriteOperation writeOperation) {
           // strange, there should have been a full batch of docs in the queue...
           break;
         }
-        writeSet.getWriteSet().add(doc);
+        writeSet.getDocumentWriteSet().add(doc);
       }
-      if ( writeSet.getWriteSet().size() > minBatchSize ) {
+      if ( writeSet.getDocumentWriteSet().size() > minBatchSize ) {
         threadPool.submit( new BatchWriter(writeSet) );
       }
     }
@@ -326,7 +326,7 @@ private void retry(WriteBatch batch, boolean callFailListeners) {
       });
     }
     for (WriteEvent doc : batch.getItems()) {
-      writeSet.getWriteSet().add(doc.getTargetUri(), doc.getMetadata(), doc.getContent());
+      writeSet.getDocumentWriteSet().add(doc.getTargetUri(), doc.getMetadata(), doc.getContent());
     }
     BatchWriter runnable = new BatchWriter(writeSet);
     runnable.run();
@@ -392,12 +392,12 @@ private void flush(boolean waitForCompletion) {
       }
       BatchWriteSet writeSet = newBatchWriteSet();
       if(defaultMetadata != null) {
-        writeSet.getWriteSet().add(new DocumentWriteOperationImpl(OperationType.METADATA_DEFAULT, null, defaultMetadata, null));
+        writeSet.getDocumentWriteSet().add(new DocumentWriteOperationImpl(OperationType.METADATA_DEFAULT, null, defaultMetadata, null));
       }
       int j=0;
       for ( ; j < getBatchSize() && iter.hasNext(); j++ ) {
         DocumentWriteOperation doc = iter.next();
-        writeSet.getWriteSet().add(doc);
+        writeSet.getDocumentWriteSet().add(doc);
       }
       threadPool.submit( new BatchWriter(writeSet) );
     }
@@ -406,7 +406,7 @@ private void flush(boolean waitForCompletion) {
   }
 
   private void sendSuccessToListeners(BatchWriteSet batchWriteSet) {
-    batchWriteSet.setItemsSoFar(itemsSoFar.addAndGet(batchWriteSet.getWriteSet().size()));
+    batchWriteSet.setItemsSoFar(itemsSoFar.addAndGet(batchWriteSet.getDocumentWriteSet().size()));
     WriteBatch batch = batchWriteSet.getBatchOfWriteEvents();
     for ( WriteBatchListener successListener : successListeners ) {
       try {
@@ -606,16 +606,16 @@ public synchronized WriteBatcher withForestConfig(ForestConfiguration forestConf
     for ( Runnable task : tasks ) {
       if ( task instanceof BatchWriter ) {
         BatchWriter writerTask = (BatchWriter) task;
-        if ( removedHostInfos.containsKey(writerTask.getWriteSet().getClient().getHost()) ) {
+        if ( removedHostInfos.containsKey(writerTask.getBatchWriteSet().getClient().getHost()) ) {
           // this batch was targeting a host that's no longer on the list
           // if we re-add these docs they'll now be in batches that target acceptable hosts
-          BatchWriteSet writeSet = newBatchWriteSet(writerTask.getWriteSet().getBatchNumber());
+          BatchWriteSet writeSet = newBatchWriteSet(writerTask.getBatchWriteSet().getBatchNumber());
           writeSet.onFailure(throwable -> {
             if ( throwable instanceof RuntimeException ) throw (RuntimeException) throwable;
             else throw new DataMovementException("Failed to retry batch after failover", throwable);
           });
-          for ( WriteEvent doc : writerTask.getWriteSet().getBatchOfWriteEvents().getItems() ) {
-            writeSet.getWriteSet().add(doc.getTargetUri(), doc.getMetadata(), doc.getContent());
+          for ( WriteEvent doc : writerTask.getBatchWriteSet().getBatchOfWriteEvents().getItems() ) {
+            writeSet.getDocumentWriteSet().add(doc.getTargetUri(), doc.getMetadata(), doc.getContent());
           }
           BatchWriter retryWriterTask = new BatchWriter(writeSet);
           Runnable fretryWriterTask = (Runnable) threadPool.submit(retryWriterTask);