Skip to content

Commit 6678ff7

Browse files
committed
MLE-25782 Some refactoring of BatchWriter
Wanted to move this into a separate class so it's easier / cleaner to prototype an incremental check listener. Also made BatchWriteSet nicer by making a bunch of fields final. No change in functionality, just cleaning up code.
1 parent 57ad84e commit 6678ff7

File tree

3 files changed

+108
-119
lines changed

3 files changed

+108
-119
lines changed

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriteSet.java

Lines changed: 23 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -12,44 +12,37 @@
1212
import com.marklogic.client.datamovement.WriteBatcher;
1313
import com.marklogic.client.datamovement.WriteEvent;
1414

15-
public class BatchWriteSet {
16-
private WriteBatcher batcher;
17-
private DocumentWriteSet writeSet;
18-
private long batchNumber;
19-
private long itemsSoFar;
20-
private DatabaseClient client;
21-
private ServerTransform transform;
22-
private String temporalCollection;
23-
private Runnable onSuccess;
24-
private Consumer<Throwable> onFailure;
25-
private Runnable onBeforeWrite;
26-
27-
public BatchWriteSet(WriteBatcher batcher, DocumentWriteSet writeSet, DatabaseClient client,
28-
ServerTransform transform, String temporalCollection)
29-
{
30-
this.batcher = batcher;
31-
this.writeSet = writeSet;
32-
this.client = client;
33-
this.transform = transform;
34-
this.temporalCollection = temporalCollection;
35-
}
15+
class BatchWriteSet {
16+
17+
private final WriteBatcher batcher;
18+
private final DocumentWriteSet writeSet;
19+
private final long batchNumber;
20+
private final DatabaseClient client;
21+
private final ServerTransform transform;
22+
private final String temporalCollection;
23+
24+
private long itemsSoFar;
25+
private Runnable onSuccess;
26+
private Consumer<Throwable> onFailure;
27+
28+
BatchWriteSet(WriteBatcher batcher, DocumentWriteSet writeSet, DatabaseClient client,
29+
ServerTransform transform, String temporalCollection, long batchNumber) {
30+
this.batcher = batcher;
31+
this.writeSet = writeSet;
32+
this.client = client;
33+
this.transform = transform;
34+
this.temporalCollection = temporalCollection;
35+
this.batchNumber = batchNumber;
36+
}
3637

3738
public DocumentWriteSet getWriteSet() {
3839
return writeSet;
3940
}
4041

41-
public void setWriteSet(DocumentWriteSet writeSet) {
42-
this.writeSet = writeSet;
43-
}
44-
4542
public long getBatchNumber() {
4643
return batchNumber;
4744
}
4845

49-
public void setBatchNumber(long batchNumber) {
50-
this.batchNumber = batchNumber;
51-
}
52-
5346
public void setItemsSoFar(long itemsSoFar) {
5447
this.itemsSoFar = itemsSoFar;
5548
}
@@ -58,26 +51,14 @@ public DatabaseClient getClient() {
5851
return client;
5952
}
6053

61-
public void setClient(DatabaseClient client) {
62-
this.client = client;
63-
}
64-
6554
public ServerTransform getTransform() {
6655
return transform;
6756
}
6857

69-
public void setTransform(ServerTransform transform) {
70-
this.transform = transform;
71-
}
72-
7358
public String getTemporalCollection() {
7459
return temporalCollection;
7560
}
7661

77-
public void setTemporalCollection(String temporalCollection) {
78-
this.temporalCollection = temporalCollection;
79-
}
80-
8162
public Runnable getOnSuccess() {
8263
return onSuccess;
8364
}
@@ -94,21 +75,14 @@ public void onFailure(Consumer<Throwable> onFailure) {
9475
this.onFailure = onFailure;
9576
}
9677

97-
public Runnable getOnBeforeWrite() {
98-
return onBeforeWrite;
99-
}
100-
101-
public void onBeforeWrite(Runnable onBeforeWrite) {
102-
this.onBeforeWrite = onBeforeWrite;
103-
}
104-
10578
public WriteBatch getBatchOfWriteEvents() {
10679
WriteBatchImpl batch = new WriteBatchImpl()
10780
.withBatcher(batcher)
10881
.withClient(client)
10982
.withJobBatchNumber(batchNumber)
11083
.withJobWritesSoFar(itemsSoFar)
11184
.withJobTicket(batcher.getJobTicket());
85+
11286
WriteEvent[] writeEvents = getWriteSet().stream()
11387
.map(writeOperation ->
11488
new WriteEventImpl()
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
3+
*/
4+
package com.marklogic.client.datamovement.impl;
5+
6+
import com.marklogic.client.document.DocumentWriteOperation;
7+
import com.marklogic.client.document.XMLDocumentManager;
8+
import com.marklogic.client.io.Format;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
import java.io.Closeable;
13+
import java.util.function.Consumer;
14+
15+
class BatchWriter implements Runnable {
16+
17+
private static Logger logger = LoggerFactory.getLogger(WriteBatcherImpl.class);
18+
19+
private final BatchWriteSet writeSet;
20+
21+
BatchWriter(BatchWriteSet writeSet) {
22+
if (writeSet.getWriteSet().size() == 0) {
23+
throw new IllegalStateException("Attempt to write an empty batch");
24+
}
25+
this.writeSet = writeSet;
26+
}
27+
28+
@Override
29+
public void run() {
30+
try {
31+
logger.trace("begin write batch {} to forest on host \"{}\"", writeSet.getBatchNumber(), writeSet.getClient().getHost());
32+
if (writeSet.getTemporalCollection() == null) {
33+
writeSet.getClient().newDocumentManager().write(
34+
writeSet.getWriteSet(), writeSet.getTransform(), null
35+
);
36+
} else {
37+
// to get access to the TemporalDocumentManager write overload we need to instantiate
38+
// a JSONDocumentManager or XMLDocumentManager, but we don't want to make assumptions about content
39+
// format, so we'll set the default content format to unknown
40+
XMLDocumentManager docMgr = writeSet.getClient().newXMLDocumentManager();
41+
docMgr.setContentFormat(Format.UNKNOWN);
42+
docMgr.write(
43+
writeSet.getWriteSet(), writeSet.getTransform(), null, writeSet.getTemporalCollection()
44+
);
45+
}
46+
closeAllHandles();
47+
Runnable onSuccess = writeSet.getOnSuccess();
48+
if (onSuccess != null) {
49+
onSuccess.run();
50+
}
51+
} catch (Throwable t) {
52+
logger.trace("failed batch sent to forest on host \"{}\"", writeSet.getClient().getHost());
53+
Consumer<Throwable> onFailure = writeSet.getOnFailure();
54+
if (onFailure != null) {
55+
onFailure.accept(t);
56+
}
57+
}
58+
}
59+
60+
private void closeAllHandles() throws Throwable {
61+
Throwable lastThrowable = null;
62+
for (DocumentWriteOperation doc : writeSet.getWriteSet()) {
63+
try {
64+
if (doc.getContent() instanceof Closeable) {
65+
((Closeable) doc.getContent()).close();
66+
}
67+
if (doc.getMetadata() instanceof Closeable) {
68+
((Closeable) doc.getMetadata()).close();
69+
}
70+
} catch (Throwable t) {
71+
logger.error("error calling close()", t);
72+
lastThrowable = t;
73+
}
74+
}
75+
if (lastThrowable != null) throw lastThrowable;
76+
}
77+
78+
public BatchWriteSet getWriteSet() {
79+
return writeSet;
80+
}
81+
}

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/WriteBatcherImpl.java

Lines changed: 4 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -283,8 +283,7 @@ private BatchWriteSet newBatchWriteSet(long batchNum) {
283283
HostInfo host = hostInfos[hostToUse];
284284
DatabaseClient hostClient = host.client;
285285
BatchWriteSet batchWriteSet = new BatchWriteSet(this, hostClient.newDocumentManager().newWriteSet(),
286-
hostClient, getTransform(), getTemporalCollection());
287-
batchWriteSet.setBatchNumber(batchNum);
286+
hostClient, getTransform(), getTemporalCollection(), batchNum);
288287
batchWriteSet.onSuccess( () -> {
289288
sendSuccessToListeners(batchWriteSet);
290289
});
@@ -613,15 +612,15 @@ public synchronized WriteBatcher withForestConfig(ForestConfiguration forestConf
613612
for ( Runnable task : tasks ) {
614613
if ( task instanceof BatchWriter ) {
615614
BatchWriter writerTask = (BatchWriter) task;
616-
if ( removedHostInfos.containsKey(writerTask.writeSet.getClient().getHost()) ) {
615+
if ( removedHostInfos.containsKey(writerTask.getWriteSet().getClient().getHost()) ) {
617616
// this batch was targeting a host that's no longer on the list
618617
// if we re-add these docs they'll now be in batches that target acceptable hosts
619-
BatchWriteSet writeSet = newBatchWriteSet(writerTask.writeSet.getBatchNumber());
618+
BatchWriteSet writeSet = newBatchWriteSet(writerTask.getWriteSet().getBatchNumber());
620619
writeSet.onFailure(throwable -> {
621620
if ( throwable instanceof RuntimeException ) throw (RuntimeException) throwable;
622621
else throw new DataMovementException("Failed to retry batch after failover", throwable);
623622
});
624-
for ( WriteEvent doc : writerTask.writeSet.getBatchOfWriteEvents().getItems() ) {
623+
for ( WriteEvent doc : writerTask.getWriteSet().getBatchOfWriteEvents().getItems() ) {
625624
writeSet.getWriteSet().add(doc.getTargetUri(), doc.getMetadata(), doc.getContent());
626625
}
627626
BatchWriter retryWriterTask = new BatchWriter(writeSet);
@@ -649,71 +648,6 @@ public static class HostInfo {
649648
public DatabaseClient client;
650649
}
651650

652-
public static class BatchWriter implements Runnable {
653-
private BatchWriteSet writeSet;
654-
655-
public BatchWriter(BatchWriteSet writeSet) {
656-
if ( writeSet.getWriteSet().size() == 0 ) {
657-
throw new IllegalStateException("Attempt to write an empty batch");
658-
}
659-
this.writeSet = writeSet;
660-
}
661-
662-
@Override
663-
public void run() {
664-
try {
665-
Runnable onBeforeWrite = writeSet.getOnBeforeWrite();
666-
if ( onBeforeWrite != null ) {
667-
onBeforeWrite.run();
668-
}
669-
logger.trace("begin write batch {} to forest on host \"{}\"", writeSet.getBatchNumber(), writeSet.getClient().getHost());
670-
if ( writeSet.getTemporalCollection() == null ) {
671-
writeSet.getClient().newDocumentManager().write(
672-
writeSet.getWriteSet(), writeSet.getTransform(), null
673-
);
674-
} else {
675-
// to get access to the TemporalDocumentManager write overload we need to instantiate
676-
// a JSONDocumentManager or XMLDocumentManager, but we don't want to make assumptions about content
677-
// format, so we'll set the default content format to unknown
678-
XMLDocumentManager docMgr = writeSet.getClient().newXMLDocumentManager();
679-
docMgr.setContentFormat(Format.UNKNOWN);
680-
docMgr.write(
681-
writeSet.getWriteSet(), writeSet.getTransform(), null, writeSet.getTemporalCollection()
682-
);
683-
}
684-
closeAllHandles();
685-
Runnable onSuccess = writeSet.getOnSuccess();
686-
if ( onSuccess != null ) {
687-
onSuccess.run();
688-
}
689-
} catch (Throwable t) {
690-
logger.trace("failed batch sent to forest on host \"{}\"", writeSet.getClient().getHost());
691-
Consumer<Throwable> onFailure = writeSet.getOnFailure();
692-
if ( onFailure != null ) {
693-
onFailure.accept(t);
694-
}
695-
}
696-
}
697-
698-
private void closeAllHandles() throws Throwable {
699-
Throwable lastThrowable = null;
700-
for ( DocumentWriteOperation doc : writeSet.getWriteSet() ) {
701-
try {
702-
if ( doc.getContent() instanceof Closeable ) {
703-
((Closeable) doc.getContent()).close();
704-
}
705-
if ( doc.getMetadata() instanceof Closeable ) {
706-
((Closeable) doc.getMetadata()).close();
707-
}
708-
} catch (Throwable t) {
709-
logger.error("error calling close()", t);
710-
lastThrowable = t;
711-
}
712-
}
713-
if ( lastThrowable != null ) throw lastThrowable;
714-
}
715-
}
716-
717651
/**
718652
* The following classes and CompletableThreadPoolExecutor
719653
* CompletableRejectedExecutionHandler exist exclusively to enable the

0 commit comments

Comments
 (0)