Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,121 +3,95 @@
*/
package com.marklogic.client.datamovement.impl;

import java.util.function.Consumer;

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.document.DocumentWriteSet;
import com.marklogic.client.document.ServerTransform;
import com.marklogic.client.datamovement.WriteBatch;
import com.marklogic.client.datamovement.WriteBatcher;
import com.marklogic.client.datamovement.WriteEvent;
import com.marklogic.client.document.DocumentWriteSet;
import com.marklogic.client.document.ServerTransform;

import java.util.function.Consumer;

public class BatchWriteSet {
private WriteBatcher batcher;
private DocumentWriteSet writeSet;
private long batchNumber;
private long itemsSoFar;
private DatabaseClient client;
private ServerTransform transform;
private String temporalCollection;
private Runnable onSuccess;
private Consumer<Throwable> onFailure;
private Runnable onBeforeWrite;

public BatchWriteSet(WriteBatcher batcher, DocumentWriteSet writeSet, DatabaseClient client,
ServerTransform transform, String temporalCollection)
{
this.batcher = batcher;
this.writeSet = writeSet;
this.client = client;
this.transform = transform;
this.temporalCollection = temporalCollection;
}

public DocumentWriteSet getWriteSet() {
return writeSet;
}

public void setWriteSet(DocumentWriteSet writeSet) {
this.writeSet = writeSet;
}

public long getBatchNumber() {
return batchNumber;
}

public void setBatchNumber(long batchNumber) {
this.batchNumber = batchNumber;
}

public void setItemsSoFar(long itemsSoFar) {
this.itemsSoFar = itemsSoFar;
}

public DatabaseClient getClient() {
return client;
}

public void setClient(DatabaseClient client) {
this.client = client;
}

public ServerTransform getTransform() {
return transform;
}

public void setTransform(ServerTransform transform) {
this.transform = transform;
}

public String getTemporalCollection() {
return temporalCollection;
}

public void setTemporalCollection(String temporalCollection) {
this.temporalCollection = temporalCollection;
}

public Runnable getOnSuccess() {
return onSuccess;
}

public void onSuccess(Runnable onSuccess) {
this.onSuccess = onSuccess;
}

public Consumer<Throwable> getOnFailure() {
return onFailure;
}

public void onFailure(Consumer<Throwable> onFailure) {
this.onFailure = onFailure;
}

public Runnable getOnBeforeWrite() {
return onBeforeWrite;
}

public void onBeforeWrite(Runnable onBeforeWrite) {
this.onBeforeWrite = onBeforeWrite;
}

public WriteBatch getBatchOfWriteEvents() {
WriteBatchImpl batch = new WriteBatchImpl()
.withBatcher(batcher)
.withClient(client)
.withJobBatchNumber(batchNumber)
.withJobWritesSoFar(itemsSoFar)
.withJobTicket(batcher.getJobTicket());
WriteEvent[] writeEvents = getWriteSet().stream()
.map(writeOperation ->
new WriteEventImpl()
.withTargetUri(writeOperation.getUri())
.withContent(writeOperation.getContent())
.withMetadata(writeOperation.getMetadata())
)
.toArray(WriteEventImpl[]::new);
batch.withItems(writeEvents);
return batch;
}
class BatchWriteSet {

private final WriteBatcher batcher;
private final DocumentWriteSet writeSet;
private final long batchNumber;
private final DatabaseClient client;
private final ServerTransform transform;
private final String temporalCollection;

private long itemsSoFar;
private Runnable onSuccess;
private Consumer<Throwable> onFailure;

BatchWriteSet(WriteBatcher batcher, DatabaseClient hostClient, ServerTransform transform, String temporalCollection, long batchNumber) {
this.batcher = batcher;
this.writeSet = hostClient.newDocumentManager().newWriteSet();
this.client = hostClient;
this.transform = transform;
this.temporalCollection = temporalCollection;
this.batchNumber = batchNumber;
}

public DocumentWriteSet getWriteSet() {
return writeSet;
}

public long getBatchNumber() {
return batchNumber;
}

public void setItemsSoFar(long itemsSoFar) {
this.itemsSoFar = itemsSoFar;
}

public DatabaseClient getClient() {
return client;
}

public ServerTransform getTransform() {
return transform;
}

public String getTemporalCollection() {
return temporalCollection;
}

public Runnable getOnSuccess() {
return onSuccess;
}

public void onSuccess(Runnable onSuccess) {
this.onSuccess = onSuccess;
}

public Consumer<Throwable> getOnFailure() {
return onFailure;
}

public void onFailure(Consumer<Throwable> onFailure) {
this.onFailure = onFailure;
}

public WriteBatch getBatchOfWriteEvents() {
WriteBatchImpl batch = new WriteBatchImpl()
.withBatcher(batcher)
.withClient(client)
.withJobBatchNumber(batchNumber)
.withJobWritesSoFar(itemsSoFar)
.withJobTicket(batcher.getJobTicket());

WriteEvent[] writeEvents = getWriteSet().stream()
.map(writeOperation ->
new WriteEventImpl()
.withTargetUri(writeOperation.getUri())
.withContent(writeOperation.getContent())
.withMetadata(writeOperation.getMetadata())
)
.toArray(WriteEventImpl[]::new);

batch.withItems(writeEvents);
return batch;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
*/
package com.marklogic.client.datamovement.impl;

import com.marklogic.client.document.DocumentWriteOperation;
import com.marklogic.client.document.XMLDocumentManager;
import com.marklogic.client.io.Format;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.util.function.Consumer;

class BatchWriter implements Runnable {

private static Logger logger = LoggerFactory.getLogger(WriteBatcherImpl.class);

private final BatchWriteSet writeSet;

BatchWriter(BatchWriteSet writeSet) {
if (writeSet.getWriteSet().size() == 0) {
throw new IllegalStateException("Attempt to write an empty batch");
}
this.writeSet = writeSet;
}

@Override
public void run() {
try {
logger.trace("begin write batch {} to forest on host \"{}\"", writeSet.getBatchNumber(), writeSet.getClient().getHost());
Copy link

Copilot AI Dec 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The onBeforeWrite callback that was present in the original implementation has been removed. If this callback was being used elsewhere in the codebase, its removal could break existing functionality. Verify that onBeforeWrite was truly unused before removing this feature.

Suggested change
logger.trace("begin write batch {} to forest on host \"{}\"", writeSet.getBatchNumber(), writeSet.getClient().getHost());
logger.trace("begin write batch {} to forest on host \"{}\"", writeSet.getBatchNumber(), writeSet.getClient().getHost());
Runnable onBeforeWrite = writeSet.getOnBeforeWrite();
if (onBeforeWrite != null) {
onBeforeWrite.run();
}

Copilot uses AI. Check for mistakes.
if (writeSet.getTemporalCollection() == null) {
writeSet.getClient().newDocumentManager().write(
writeSet.getWriteSet(), writeSet.getTransform(), null
);
} else {
// to get access to the TemporalDocumentManager write overload we need to instantiate
// a JSONDocumentManager or XMLDocumentManager, but we don't want to make assumptions about content
// format, so we'll set the default content format to unknown
XMLDocumentManager docMgr = writeSet.getClient().newXMLDocumentManager();
docMgr.setContentFormat(Format.UNKNOWN);
docMgr.write(
writeSet.getWriteSet(), writeSet.getTransform(), null, writeSet.getTemporalCollection()
);
}
closeAllHandles();
Runnable onSuccess = writeSet.getOnSuccess();
if (onSuccess != null) {
onSuccess.run();
}
} catch (Throwable t) {
logger.trace("failed batch sent to forest on host \"{}\"", writeSet.getClient().getHost());
Consumer<Throwable> onFailure = writeSet.getOnFailure();
if (onFailure != null) {
onFailure.accept(t);
}
}
}

private void closeAllHandles() throws Throwable {
Throwable lastThrowable = null;
for (DocumentWriteOperation doc : writeSet.getWriteSet()) {
try {
if (doc.getContent() instanceof Closeable) {
((Closeable) doc.getContent()).close();
}
if (doc.getMetadata() instanceof Closeable) {
((Closeable) doc.getMetadata()).close();
}
} catch (Throwable t) {
logger.error("error calling close()", t);
lastThrowable = t;
}
}
if (lastThrowable != null) throw lastThrowable;
}

public BatchWriteSet getWriteSet() {
return writeSet;
}
}
Loading