From 5bf5f48b90e11a1292114db0cfc4b60036103e7b Mon Sep 17 00:00:00 2001
From: Rob Rudin
Date: Wed, 10 Dec 2025 11:16:34 -0500
Subject: [PATCH] MLE-25782 Some refactoring of BatchWriter

Moved BatchWriter into its own class so it's easier and cleaner to
prototype an incremental check listener.

Also made BatchWriteSet cleaner by making a number of its fields final.
No change in functionality, just code cleanup.
---
 .../datamovement/impl/BatchWriteSet.java      | 198 ++++++++----------
 .../client/datamovement/impl/BatchWriter.java |  81 +++++++
 .../datamovement/impl/WriteBatcherImpl.java   |  80 +------
 3 files changed, 171 insertions(+), 188 deletions(-)
 create mode 100644 marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriter.java

diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriteSet.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriteSet.java
index ce4426563..8cd7593a6 100644
--- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriteSet.java
+++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriteSet.java
@@ -3,121 +3,95 @@
  */
 package com.marklogic.client.datamovement.impl;
 
-import java.util.function.Consumer;
-
 import com.marklogic.client.DatabaseClient;
-import com.marklogic.client.document.DocumentWriteSet;
-import com.marklogic.client.document.ServerTransform;
 import com.marklogic.client.datamovement.WriteBatch;
 import com.marklogic.client.datamovement.WriteBatcher;
 import com.marklogic.client.datamovement.WriteEvent;
+import com.marklogic.client.document.DocumentWriteSet;
+import com.marklogic.client.document.ServerTransform;
+
+import java.util.function.Consumer;
 
-public class BatchWriteSet {
-  private WriteBatcher batcher;
-  private DocumentWriteSet writeSet;
-  private long batchNumber;
-  private long itemsSoFar;
-  private DatabaseClient client;
-  private ServerTransform transform;
-  private String temporalCollection;
-  private Runnable onSuccess;
-  private Consumer<Throwable> onFailure;
-  private Runnable onBeforeWrite;
-
-  public BatchWriteSet(WriteBatcher batcher, DocumentWriteSet writeSet, DatabaseClient client,
-    ServerTransform transform, String temporalCollection)
-  {
-    this.batcher = batcher;
-    this.writeSet = writeSet;
-    this.client = client;
-    this.transform = transform;
-    this.temporalCollection = temporalCollection;
-  }
-
-  public DocumentWriteSet getWriteSet() {
-    return writeSet;
-  }
-
-  public void setWriteSet(DocumentWriteSet writeSet) {
-    this.writeSet = writeSet;
-  }
-
-  public long getBatchNumber() {
-    return batchNumber;
-  }
-
-  public void setBatchNumber(long batchNumber) {
-    this.batchNumber = batchNumber;
-  }
-
-  public void setItemsSoFar(long itemsSoFar) {
-    this.itemsSoFar = itemsSoFar;
-  }
-
-  public DatabaseClient getClient() {
-    return client;
-  }
-
-  public void setClient(DatabaseClient client) {
-    this.client = client;
-  }
-
-  public ServerTransform getTransform() {
-    return transform;
-  }
-
-  public void setTransform(ServerTransform transform) {
-    this.transform = transform;
-  }
-
-  public String getTemporalCollection() {
-    return temporalCollection;
-  }
-
-  public void setTemporalCollection(String temporalCollection) {
-    this.temporalCollection = temporalCollection;
-  }
-
-  public Runnable getOnSuccess() {
-    return onSuccess;
-  }
-
-  public void onSuccess(Runnable onSuccess) {
-    this.onSuccess = onSuccess;
-  }
-
-  public Consumer<Throwable> getOnFailure() {
-    return onFailure;
-  }
-
-  public void onFailure(Consumer<Throwable> onFailure) {
-    this.onFailure = onFailure;
-  }
-
-  public Runnable getOnBeforeWrite() {
-    return onBeforeWrite;
-  }
-
-  public void onBeforeWrite(Runnable onBeforeWrite) {
-    this.onBeforeWrite = onBeforeWrite;
-  }
-
-  public WriteBatch getBatchOfWriteEvents() {
-    WriteBatchImpl batch = new WriteBatchImpl()
-      .withBatcher(batcher)
-      .withClient(client)
-      .withJobBatchNumber(batchNumber)
-      .withJobWritesSoFar(itemsSoFar)
-      .withJobTicket(batcher.getJobTicket());
-    WriteEvent[] writeEvents = getWriteSet().stream()
-      .map(writeOperation ->
-        new WriteEventImpl()
-          .withTargetUri(writeOperation.getUri())
-          .withContent(writeOperation.getContent())
-          .withMetadata(writeOperation.getMetadata())
-      )
-      .toArray(WriteEventImpl[]::new);
-    batch.withItems(writeEvents);
-    return batch;
-  }
+class BatchWriteSet {
+
+    private final WriteBatcher batcher;
+    private final DocumentWriteSet writeSet;
+    private final long batchNumber;
+    private final DatabaseClient client;
+    private final ServerTransform transform;
+    private final String temporalCollection;
+
+    private long itemsSoFar;
+    private Runnable onSuccess;
+    private Consumer<Throwable> onFailure;
+
+    BatchWriteSet(WriteBatcher batcher, DatabaseClient hostClient, ServerTransform transform, String temporalCollection, long batchNumber) {
+        this.batcher = batcher;
+        this.writeSet = hostClient.newDocumentManager().newWriteSet();
+        this.client = hostClient;
+        this.transform = transform;
+        this.temporalCollection = temporalCollection;
+        this.batchNumber = batchNumber;
+    }
+
+    public DocumentWriteSet getWriteSet() {
+        return writeSet;
+    }
+
+    public long getBatchNumber() {
+        return batchNumber;
+    }
+
+    public void setItemsSoFar(long itemsSoFar) {
+        this.itemsSoFar = itemsSoFar;
+    }
+
+    public DatabaseClient getClient() {
+        return client;
+    }
+
+    public ServerTransform getTransform() {
+        return transform;
+    }
+
+    public String getTemporalCollection() {
+        return temporalCollection;
+    }
+
+    public Runnable getOnSuccess() {
+        return onSuccess;
+    }
+
+    public void onSuccess(Runnable onSuccess) {
+        this.onSuccess = onSuccess;
+    }
+
+    public Consumer<Throwable> getOnFailure() {
+        return onFailure;
+    }
+
+    public void onFailure(Consumer<Throwable> onFailure) {
+        this.onFailure = onFailure;
+    }
+
+    public WriteBatch getBatchOfWriteEvents() {
+        WriteBatchImpl batch = new WriteBatchImpl()
+            .withBatcher(batcher)
+            .withClient(client)
+            .withJobBatchNumber(batchNumber)
+            .withJobWritesSoFar(itemsSoFar)
+            .withJobTicket(batcher.getJobTicket());
+
+        WriteEvent[] writeEvents = getWriteSet().stream()
+            .map(writeOperation ->
+                new WriteEventImpl()
+                    .withTargetUri(writeOperation.getUri())
+                    .withContent(writeOperation.getContent())
+                    .withMetadata(writeOperation.getMetadata())
+            )
+            .toArray(WriteEventImpl[]::new);
+
+        batch.withItems(writeEvents);
+        return batch;
+    }
 }
diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriter.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriter.java
new file mode 100644
index 000000000..c3d1e6ffe
--- /dev/null
+++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriter.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
+ */
+package com.marklogic.client.datamovement.impl;
+
+import com.marklogic.client.document.DocumentWriteOperation;
+import com.marklogic.client.document.XMLDocumentManager;
+import com.marklogic.client.io.Format;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.function.Consumer;
+
+class BatchWriter implements Runnable {
+
+    private static Logger logger = LoggerFactory.getLogger(WriteBatcherImpl.class);
+
+    private final BatchWriteSet writeSet;
+
+    BatchWriter(BatchWriteSet writeSet) {
+        if (writeSet.getWriteSet().size() == 0) {
+            throw new IllegalStateException("Attempt to write an empty batch");
+        }
+        this.writeSet = writeSet;
+    }
+
+    @Override
+    public void run() {
+        try {
+            logger.trace("begin write batch {} to forest on host \"{}\"", writeSet.getBatchNumber(), writeSet.getClient().getHost());
+            if (writeSet.getTemporalCollection() == null) {
+                writeSet.getClient().newDocumentManager().write(
+                    writeSet.getWriteSet(), writeSet.getTransform(), null
+                );
+            } else {
+                // to get access to the TemporalDocumentManager write overload we need to instantiate
+                // a JSONDocumentManager or XMLDocumentManager, but we don't want to make assumptions about content
+                // format, so we'll set the default content format to unknown
+                XMLDocumentManager docMgr = writeSet.getClient().newXMLDocumentManager();
+                docMgr.setContentFormat(Format.UNKNOWN);
+                docMgr.write(
+                    writeSet.getWriteSet(), writeSet.getTransform(), null, writeSet.getTemporalCollection()
+                );
+            }
+            closeAllHandles();
+            Runnable onSuccess = writeSet.getOnSuccess();
+            if (onSuccess != null) {
+                onSuccess.run();
+            }
+        } catch (Throwable t) {
+            logger.trace("failed batch sent to forest on host \"{}\"", writeSet.getClient().getHost());
+            Consumer<Throwable> onFailure = writeSet.getOnFailure();
+            if (onFailure != null) {
+                onFailure.accept(t);
+            }
+        }
+    }
+
+    private void closeAllHandles() throws Throwable {
+        Throwable lastThrowable = null;
+        for (DocumentWriteOperation doc : writeSet.getWriteSet()) {
+            try {
+                if (doc.getContent() instanceof Closeable) {
+                    ((Closeable) doc.getContent()).close();
+                }
+                if (doc.getMetadata() instanceof Closeable) {
+                    ((Closeable) doc.getMetadata()).close();
+                }
+            } catch (Throwable t) {
+                logger.error("error calling close()", t);
+                lastThrowable = t;
+            }
+        }
+        if (lastThrowable != null) throw lastThrowable;
+    }
+
+    public BatchWriteSet getWriteSet() {
+        return writeSet;
+    }
+}
diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/WriteBatcherImpl.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/WriteBatcherImpl.java
index a87775285..424eaff13 100644
--- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/WriteBatcherImpl.java
+++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/WriteBatcherImpl.java
@@ -3,7 +3,6 @@
  */
 package com.marklogic.client.datamovement.impl;
 
-import java.io.Closeable;
 import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -12,7 +11,6 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Consumer;
 import java.util.stream.Stream;
 
 import org.slf4j.Logger;
@@ -22,10 +20,8 @@
 import com.marklogic.client.DatabaseClientFactory;
 import com.marklogic.client.document.DocumentWriteOperation;
 import com.marklogic.client.document.ServerTransform;
-import com.marklogic.client.document.XMLDocumentManager;
 import com.marklogic.client.document.DocumentWriteOperation.OperationType;
 import com.marklogic.client.io.DocumentMetadataHandle;
-import com.marklogic.client.io.Format;
 import com.marklogic.client.impl.DocumentWriteOperationImpl;
 import com.marklogic.client.impl.Utilities;
 import com.marklogic.client.io.marker.AbstractWriteHandle;
@@ -281,10 +277,7 @@ private BatchWriteSet newBatchWriteSet() {
   private BatchWriteSet newBatchWriteSet(long batchNum) {
     int hostToUse = (int) (batchNum % hostInfos.length);
     HostInfo host = hostInfos[hostToUse];
-    DatabaseClient hostClient = host.client;
-    BatchWriteSet batchWriteSet = new BatchWriteSet(this, hostClient.newDocumentManager().newWriteSet(),
-      hostClient, getTransform(), getTemporalCollection());
-    batchWriteSet.setBatchNumber(batchNum);
+    BatchWriteSet batchWriteSet = new BatchWriteSet(this, host.client, getTransform(), getTemporalCollection(), batchNum);
     batchWriteSet.onSuccess( () -> {
       sendSuccessToListeners(batchWriteSet);
     });
@@ -613,15 +606,15 @@ public synchronized WriteBatcher withForestConfig(ForestConfiguration forestConf
     for ( Runnable task : tasks ) {
       if ( task instanceof BatchWriter ) {
         BatchWriter writerTask = (BatchWriter) task;
-        if ( removedHostInfos.containsKey(writerTask.writeSet.getClient().getHost()) ) {
+        if ( removedHostInfos.containsKey(writerTask.getWriteSet().getClient().getHost()) ) {
           // this batch was targeting a host that's no longer on the list
           // if we re-add these docs they'll now be in batches that target acceptable hosts
-          BatchWriteSet writeSet = newBatchWriteSet(writerTask.writeSet.getBatchNumber());
+          BatchWriteSet writeSet = newBatchWriteSet(writerTask.getWriteSet().getBatchNumber());
           writeSet.onFailure(throwable -> {
             if ( throwable instanceof RuntimeException ) throw (RuntimeException) throwable;
             else throw new DataMovementException("Failed to retry batch after failover", throwable);
           });
-          for ( WriteEvent doc : writerTask.writeSet.getBatchOfWriteEvents().getItems() ) {
+          for ( WriteEvent doc : writerTask.getWriteSet().getBatchOfWriteEvents().getItems() ) {
             writeSet.getWriteSet().add(doc.getTargetUri(), doc.getMetadata(), doc.getContent());
           }
           BatchWriter retryWriterTask = new BatchWriter(writeSet);
@@ -649,71 +642,6 @@ public static class HostInfo {
     public DatabaseClient client;
   }
 
-  public static class BatchWriter implements Runnable {
-    private BatchWriteSet writeSet;
-
-    public BatchWriter(BatchWriteSet writeSet) {
-      if ( writeSet.getWriteSet().size() == 0 ) {
-        throw new IllegalStateException("Attempt to write an empty batch");
-      }
-      this.writeSet = writeSet;
-    }
-
-    @Override
-    public void run() {
-      try {
-        Runnable onBeforeWrite = writeSet.getOnBeforeWrite();
-        if ( onBeforeWrite != null ) {
-          onBeforeWrite.run();
-        }
-        logger.trace("begin write batch {} to forest on host \"{}\"", writeSet.getBatchNumber(), writeSet.getClient().getHost());
-        if ( writeSet.getTemporalCollection() == null ) {
-          writeSet.getClient().newDocumentManager().write(
-            writeSet.getWriteSet(), writeSet.getTransform(), null
-          );
-        } else {
-          // to get access to the TemporalDocumentManager write overload we need to instantiate
-          // a JSONDocumentManager or XMLDocumentManager, but we don't want to make assumptions about content
-          // format, so we'll set the default content format to unknown
-          XMLDocumentManager docMgr = writeSet.getClient().newXMLDocumentManager();
-          docMgr.setContentFormat(Format.UNKNOWN);
-          docMgr.write(
-            writeSet.getWriteSet(), writeSet.getTransform(), null, writeSet.getTemporalCollection()
-          );
-        }
-        closeAllHandles();
-        Runnable onSuccess = writeSet.getOnSuccess();
-        if ( onSuccess != null ) {
-          onSuccess.run();
-        }
-      } catch (Throwable t) {
-        logger.trace("failed batch sent to forest on host \"{}\"", writeSet.getClient().getHost());
-        Consumer<Throwable> onFailure = writeSet.getOnFailure();
-        if ( onFailure != null ) {
-          onFailure.accept(t);
-        }
-      }
-    }
-
-    private void closeAllHandles() throws Throwable {
-      Throwable lastThrowable = null;
-      for ( DocumentWriteOperation doc : writeSet.getWriteSet() ) {
-        try {
-          if ( doc.getContent() instanceof Closeable ) {
-            ((Closeable) doc.getContent()).close();
-          }
-          if ( doc.getMetadata() instanceof Closeable ) {
-            ((Closeable) doc.getMetadata()).close();
-          }
-        } catch (Throwable t) {
-          logger.error("error calling close()", t);
-          lastThrowable = t;
-        }
-      }
-      if ( lastThrowable != null ) throw lastThrowable;
-    }
-  }
-
   /**
    * The following classes and CompletableThreadPoolExecutor
    * CompletableRejectedExecutionHandler exist exclusively to enable the
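
Reviewer note, not part of the patch: the sketch below is one illustration of how the pieces introduced above fit together. A BatchWriteSet is built for a host, documents are added to its DocumentWriteSet, the onSuccess/onFailure callbacks are registered (this is where an incremental check listener could be prototyped), and the resulting BatchWriter is handed off as a plain Runnable. The class name BatchWriterSketch, the sample URI/content, and the batcher/hostClient/batchNumber parameters are assumptions for illustration only; the constructor, the callbacks, and the DocumentWriteSet.add() call are the ones shown in the diff. Because both classes are package-private, such code would have to live in com.marklogic.client.datamovement.impl.

package com.marklogic.client.datamovement.impl;

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.datamovement.WriteBatcher;
import com.marklogic.client.io.DocumentMetadataHandle;
import com.marklogic.client.io.Format;
import com.marklogic.client.io.StringHandle;

// Reviewer sketch only; not part of this patch. Name and arguments are hypothetical.
class BatchWriterSketch {

    // Builds a write task roughly the way WriteBatcherImpl.newBatchWriteSet() does after this patch.
    // The batcher, hostClient, and batchNumber arguments are assumed to be supplied by the caller.
    static Runnable newWriteTask(WriteBatcher batcher, DatabaseClient hostClient, long batchNumber) {
        // No transform or temporal collection here; BatchWriter.run() accepts both as null.
        BatchWriteSet writeSet = new BatchWriteSet(batcher, hostClient, null, null, batchNumber);

        // Queue a document; DocumentWriteSet.add(uri, metadata, content) is the same call
        // the failover retry path in withForestConfig() uses.
        writeSet.getWriteSet().add("/example/doc1.json",
            new DocumentMetadataHandle().withCollections("example"),
            new StringHandle("{\"hello\":\"world\"}").withFormat(Format.JSON));

        // These callbacks are the hook points where an incremental check listener could be wired in.
        writeSet.onSuccess(() -> System.out.println("batch " + writeSet.getBatchNumber() + " written"));
        writeSet.onFailure(throwable -> System.err.println("batch failed: " + throwable.getMessage()));

        // BatchWriter is a plain Runnable, so it can be run inline or submitted to the batcher's thread pool.
        return new BatchWriter(writeSet);
    }
}

Keeping BatchWriter as a standalone Runnable is what lets a listener prototype be exercised against it in isolation, without involving WriteBatcherImpl's thread pool plumbing.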