Skip to content

Commit ed87416

Browse files
committed
MLE-25782 Some refactoring of BatchWriter
Wanted to move this into a separate class so it's easier / cleaner to prototype an incremental check listener. Also made BatchWriteSet nicer by making a bunch of fields final. No change in functionality, just cleaning up code.
1 parent 57ad84e commit ed87416

File tree

3 files changed

+171
-184
lines changed

3 files changed

+171
-184
lines changed

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriteSet.java

Lines changed: 86 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -3,121 +3,95 @@
33
*/
44
package com.marklogic.client.datamovement.impl;
55

6-
import java.util.function.Consumer;
7-
86
import com.marklogic.client.DatabaseClient;
9-
import com.marklogic.client.document.DocumentWriteSet;
10-
import com.marklogic.client.document.ServerTransform;
117
import com.marklogic.client.datamovement.WriteBatch;
128
import com.marklogic.client.datamovement.WriteBatcher;
139
import com.marklogic.client.datamovement.WriteEvent;
10+
import com.marklogic.client.document.DocumentWriteSet;
11+
import com.marklogic.client.document.ServerTransform;
12+
13+
import java.util.function.Consumer;
1414

15-
public class BatchWriteSet {
16-
private WriteBatcher batcher;
17-
private DocumentWriteSet writeSet;
18-
private long batchNumber;
19-
private long itemsSoFar;
20-
private DatabaseClient client;
21-
private ServerTransform transform;
22-
private String temporalCollection;
23-
private Runnable onSuccess;
24-
private Consumer<Throwable> onFailure;
25-
private Runnable onBeforeWrite;
26-
27-
public BatchWriteSet(WriteBatcher batcher, DocumentWriteSet writeSet, DatabaseClient client,
28-
ServerTransform transform, String temporalCollection)
29-
{
30-
this.batcher = batcher;
31-
this.writeSet = writeSet;
32-
this.client = client;
33-
this.transform = transform;
34-
this.temporalCollection = temporalCollection;
35-
}
36-
37-
public DocumentWriteSet getWriteSet() {
38-
return writeSet;
39-
}
40-
41-
public void setWriteSet(DocumentWriteSet writeSet) {
42-
this.writeSet = writeSet;
43-
}
44-
45-
public long getBatchNumber() {
46-
return batchNumber;
47-
}
48-
49-
public void setBatchNumber(long batchNumber) {
50-
this.batchNumber = batchNumber;
51-
}
52-
53-
public void setItemsSoFar(long itemsSoFar) {
54-
this.itemsSoFar = itemsSoFar;
55-
}
56-
57-
public DatabaseClient getClient() {
58-
return client;
59-
}
60-
61-
public void setClient(DatabaseClient client) {
62-
this.client = client;
63-
}
64-
65-
public ServerTransform getTransform() {
66-
return transform;
67-
}
68-
69-
public void setTransform(ServerTransform transform) {
70-
this.transform = transform;
71-
}
72-
73-
public String getTemporalCollection() {
74-
return temporalCollection;
75-
}
76-
77-
public void setTemporalCollection(String temporalCollection) {
78-
this.temporalCollection = temporalCollection;
79-
}
80-
81-
public Runnable getOnSuccess() {
82-
return onSuccess;
83-
}
84-
85-
public void onSuccess(Runnable onSuccess) {
86-
this.onSuccess = onSuccess;
87-
}
88-
89-
public Consumer<Throwable> getOnFailure() {
90-
return onFailure;
91-
}
92-
93-
public void onFailure(Consumer<Throwable> onFailure) {
94-
this.onFailure = onFailure;
95-
}
96-
97-
public Runnable getOnBeforeWrite() {
98-
return onBeforeWrite;
99-
}
100-
101-
public void onBeforeWrite(Runnable onBeforeWrite) {
102-
this.onBeforeWrite = onBeforeWrite;
103-
}
104-
105-
public WriteBatch getBatchOfWriteEvents() {
106-
WriteBatchImpl batch = new WriteBatchImpl()
107-
.withBatcher(batcher)
108-
.withClient(client)
109-
.withJobBatchNumber(batchNumber)
110-
.withJobWritesSoFar(itemsSoFar)
111-
.withJobTicket(batcher.getJobTicket());
112-
WriteEvent[] writeEvents = getWriteSet().stream()
113-
.map(writeOperation ->
114-
new WriteEventImpl()
115-
.withTargetUri(writeOperation.getUri())
116-
.withContent(writeOperation.getContent())
117-
.withMetadata(writeOperation.getMetadata())
118-
)
119-
.toArray(WriteEventImpl[]::new);
120-
batch.withItems(writeEvents);
121-
return batch;
122-
}
15+
class BatchWriteSet {
16+
17+
private final WriteBatcher batcher;
18+
private final DocumentWriteSet writeSet;
19+
private final long batchNumber;
20+
private final DatabaseClient client;
21+
private final ServerTransform transform;
22+
private final String temporalCollection;
23+
24+
private long itemsSoFar;
25+
private Runnable onSuccess;
26+
private Consumer<Throwable> onFailure;
27+
28+
BatchWriteSet(WriteBatcher batcher, DatabaseClient hostClient, ServerTransform transform, String temporalCollection, long batchNumber) {
29+
this.batcher = batcher;
30+
this.writeSet = hostClient.newDocumentManager().newWriteSet();
31+
this.client = hostClient;
32+
this.transform = transform;
33+
this.temporalCollection = temporalCollection;
34+
this.batchNumber = batchNumber;
35+
}
36+
37+
public DocumentWriteSet getWriteSet() {
38+
return writeSet;
39+
}
40+
41+
public long getBatchNumber() {
42+
return batchNumber;
43+
}
44+
45+
public void setItemsSoFar(long itemsSoFar) {
46+
this.itemsSoFar = itemsSoFar;
47+
}
48+
49+
public DatabaseClient getClient() {
50+
return client;
51+
}
52+
53+
public ServerTransform getTransform() {
54+
return transform;
55+
}
56+
57+
public String getTemporalCollection() {
58+
return temporalCollection;
59+
}
60+
61+
public Runnable getOnSuccess() {
62+
return onSuccess;
63+
}
64+
65+
public void onSuccess(Runnable onSuccess) {
66+
this.onSuccess = onSuccess;
67+
}
68+
69+
public Consumer<Throwable> getOnFailure() {
70+
return onFailure;
71+
}
72+
73+
public void onFailure(Consumer<Throwable> onFailure) {
74+
this.onFailure = onFailure;
75+
}
76+
77+
public WriteBatch getBatchOfWriteEvents() {
78+
WriteBatchImpl batch = new WriteBatchImpl()
79+
.withBatcher(batcher)
80+
.withClient(client)
81+
.withJobBatchNumber(batchNumber)
82+
.withJobWritesSoFar(itemsSoFar)
83+
.withJobTicket(batcher.getJobTicket());
84+
85+
WriteEvent[] writeEvents = getWriteSet().stream()
86+
.map(writeOperation ->
87+
new WriteEventImpl()
88+
.withTargetUri(writeOperation.getUri())
89+
.withContent(writeOperation.getContent())
90+
.withMetadata(writeOperation.getMetadata())
91+
)
92+
.toArray(WriteEventImpl[]::new);
93+
94+
batch.withItems(writeEvents);
95+
return batch;
96+
}
12397
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
3+
*/
4+
package com.marklogic.client.datamovement.impl;
5+
6+
import com.marklogic.client.document.DocumentWriteOperation;
7+
import com.marklogic.client.document.XMLDocumentManager;
8+
import com.marklogic.client.io.Format;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
import java.io.Closeable;
13+
import java.util.function.Consumer;
14+
15+
class BatchWriter implements Runnable {
16+
17+
private static Logger logger = LoggerFactory.getLogger(WriteBatcherImpl.class);
18+
19+
private final BatchWriteSet writeSet;
20+
21+
BatchWriter(BatchWriteSet writeSet) {
22+
if (writeSet.getWriteSet().size() == 0) {
23+
throw new IllegalStateException("Attempt to write an empty batch");
24+
}
25+
this.writeSet = writeSet;
26+
}
27+
28+
@Override
29+
public void run() {
30+
try {
31+
logger.trace("begin write batch {} to forest on host \"{}\"", writeSet.getBatchNumber(), writeSet.getClient().getHost());
32+
if (writeSet.getTemporalCollection() == null) {
33+
writeSet.getClient().newDocumentManager().write(
34+
writeSet.getWriteSet(), writeSet.getTransform(), null
35+
);
36+
} else {
37+
// to get access to the TemporalDocumentManager write overload we need to instantiate
38+
// a JSONDocumentManager or XMLDocumentManager, but we don't want to make assumptions about content
39+
// format, so we'll set the default content format to unknown
40+
XMLDocumentManager docMgr = writeSet.getClient().newXMLDocumentManager();
41+
docMgr.setContentFormat(Format.UNKNOWN);
42+
docMgr.write(
43+
writeSet.getWriteSet(), writeSet.getTransform(), null, writeSet.getTemporalCollection()
44+
);
45+
}
46+
closeAllHandles();
47+
Runnable onSuccess = writeSet.getOnSuccess();
48+
if (onSuccess != null) {
49+
onSuccess.run();
50+
}
51+
} catch (Throwable t) {
52+
logger.trace("failed batch sent to forest on host \"{}\"", writeSet.getClient().getHost());
53+
Consumer<Throwable> onFailure = writeSet.getOnFailure();
54+
if (onFailure != null) {
55+
onFailure.accept(t);
56+
}
57+
}
58+
}
59+
60+
private void closeAllHandles() throws Throwable {
61+
Throwable lastThrowable = null;
62+
for (DocumentWriteOperation doc : writeSet.getWriteSet()) {
63+
try {
64+
if (doc.getContent() instanceof Closeable) {
65+
((Closeable) doc.getContent()).close();
66+
}
67+
if (doc.getMetadata() instanceof Closeable) {
68+
((Closeable) doc.getMetadata()).close();
69+
}
70+
} catch (Throwable t) {
71+
logger.error("error calling close()", t);
72+
lastThrowable = t;
73+
}
74+
}
75+
if (lastThrowable != null) throw lastThrowable;
76+
}
77+
78+
public BatchWriteSet getWriteSet() {
79+
return writeSet;
80+
}
81+
}

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/WriteBatcherImpl.java

Lines changed: 4 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -281,10 +281,7 @@ private BatchWriteSet newBatchWriteSet() {
281281
private BatchWriteSet newBatchWriteSet(long batchNum) {
282282
int hostToUse = (int) (batchNum % hostInfos.length);
283283
HostInfo host = hostInfos[hostToUse];
284-
DatabaseClient hostClient = host.client;
285-
BatchWriteSet batchWriteSet = new BatchWriteSet(this, hostClient.newDocumentManager().newWriteSet(),
286-
hostClient, getTransform(), getTemporalCollection());
287-
batchWriteSet.setBatchNumber(batchNum);
284+
BatchWriteSet batchWriteSet = new BatchWriteSet(this, host.client, getTransform(), getTemporalCollection(), batchNum);
288285
batchWriteSet.onSuccess( () -> {
289286
sendSuccessToListeners(batchWriteSet);
290287
});
@@ -613,15 +610,15 @@ public synchronized WriteBatcher withForestConfig(ForestConfiguration forestConf
613610
for ( Runnable task : tasks ) {
614611
if ( task instanceof BatchWriter ) {
615612
BatchWriter writerTask = (BatchWriter) task;
616-
if ( removedHostInfos.containsKey(writerTask.writeSet.getClient().getHost()) ) {
613+
if ( removedHostInfos.containsKey(writerTask.getWriteSet().getClient().getHost()) ) {
617614
// this batch was targeting a host that's no longer on the list
618615
// if we re-add these docs they'll now be in batches that target acceptable hosts
619-
BatchWriteSet writeSet = newBatchWriteSet(writerTask.writeSet.getBatchNumber());
616+
BatchWriteSet writeSet = newBatchWriteSet(writerTask.getWriteSet().getBatchNumber());
620617
writeSet.onFailure(throwable -> {
621618
if ( throwable instanceof RuntimeException ) throw (RuntimeException) throwable;
622619
else throw new DataMovementException("Failed to retry batch after failover", throwable);
623620
});
624-
for ( WriteEvent doc : writerTask.writeSet.getBatchOfWriteEvents().getItems() ) {
621+
for ( WriteEvent doc : writerTask.getWriteSet().getBatchOfWriteEvents().getItems() ) {
625622
writeSet.getWriteSet().add(doc.getTargetUri(), doc.getMetadata(), doc.getContent());
626623
}
627624
BatchWriter retryWriterTask = new BatchWriter(writeSet);
@@ -649,71 +646,6 @@ public static class HostInfo {
649646
public DatabaseClient client;
650647
}
651648

652-
public static class BatchWriter implements Runnable {
653-
private BatchWriteSet writeSet;
654-
655-
public BatchWriter(BatchWriteSet writeSet) {
656-
if ( writeSet.getWriteSet().size() == 0 ) {
657-
throw new IllegalStateException("Attempt to write an empty batch");
658-
}
659-
this.writeSet = writeSet;
660-
}
661-
662-
@Override
663-
public void run() {
664-
try {
665-
Runnable onBeforeWrite = writeSet.getOnBeforeWrite();
666-
if ( onBeforeWrite != null ) {
667-
onBeforeWrite.run();
668-
}
669-
logger.trace("begin write batch {} to forest on host \"{}\"", writeSet.getBatchNumber(), writeSet.getClient().getHost());
670-
if ( writeSet.getTemporalCollection() == null ) {
671-
writeSet.getClient().newDocumentManager().write(
672-
writeSet.getWriteSet(), writeSet.getTransform(), null
673-
);
674-
} else {
675-
// to get access to the TemporalDocumentManager write overload we need to instantiate
676-
// a JSONDocumentManager or XMLDocumentManager, but we don't want to make assumptions about content
677-
// format, so we'll set the default content format to unknown
678-
XMLDocumentManager docMgr = writeSet.getClient().newXMLDocumentManager();
679-
docMgr.setContentFormat(Format.UNKNOWN);
680-
docMgr.write(
681-
writeSet.getWriteSet(), writeSet.getTransform(), null, writeSet.getTemporalCollection()
682-
);
683-
}
684-
closeAllHandles();
685-
Runnable onSuccess = writeSet.getOnSuccess();
686-
if ( onSuccess != null ) {
687-
onSuccess.run();
688-
}
689-
} catch (Throwable t) {
690-
logger.trace("failed batch sent to forest on host \"{}\"", writeSet.getClient().getHost());
691-
Consumer<Throwable> onFailure = writeSet.getOnFailure();
692-
if ( onFailure != null ) {
693-
onFailure.accept(t);
694-
}
695-
}
696-
}
697-
698-
private void closeAllHandles() throws Throwable {
699-
Throwable lastThrowable = null;
700-
for ( DocumentWriteOperation doc : writeSet.getWriteSet() ) {
701-
try {
702-
if ( doc.getContent() instanceof Closeable ) {
703-
((Closeable) doc.getContent()).close();
704-
}
705-
if ( doc.getMetadata() instanceof Closeable ) {
706-
((Closeable) doc.getMetadata()).close();
707-
}
708-
} catch (Throwable t) {
709-
logger.error("error calling close()", t);
710-
lastThrowable = t;
711-
}
712-
}
713-
if ( lastThrowable != null ) throw lastThrowable;
714-
}
715-
}
716-
717649
/**
718650
* The following classes and CompletableThreadPoolExecutor
719651
* CompletableRejectedExecutionHandler exist exclusively to enable the

0 commit comments

Comments
 (0)