Skip to content

Commit a035b8e

Browse files
committed
MLE-25782 Some refactoring of BatchWriter
Wanted to move this into a separate class so it's easier / cleaner to prototype an incremental check listener. Also made BatchWriteSet nicer by making a bunch of fields final. No change in functionality, just cleaning up code.
1 parent 57ad84e commit a035b8e

File tree

3 files changed

+171
-188
lines changed

3 files changed

+171
-188
lines changed

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriteSet.java

Lines changed: 86 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -3,121 +3,95 @@
33
*/
44
package com.marklogic.client.datamovement.impl;
55

6-
import java.util.function.Consumer;
7-
86
import com.marklogic.client.DatabaseClient;
9-
import com.marklogic.client.document.DocumentWriteSet;
10-
import com.marklogic.client.document.ServerTransform;
117
import com.marklogic.client.datamovement.WriteBatch;
128
import com.marklogic.client.datamovement.WriteBatcher;
139
import com.marklogic.client.datamovement.WriteEvent;
10+
import com.marklogic.client.document.DocumentWriteSet;
11+
import com.marklogic.client.document.ServerTransform;
12+
13+
import java.util.function.Consumer;
1414

15-
public class BatchWriteSet {
16-
private WriteBatcher batcher;
17-
private DocumentWriteSet writeSet;
18-
private long batchNumber;
19-
private long itemsSoFar;
20-
private DatabaseClient client;
21-
private ServerTransform transform;
22-
private String temporalCollection;
23-
private Runnable onSuccess;
24-
private Consumer<Throwable> onFailure;
25-
private Runnable onBeforeWrite;
26-
27-
public BatchWriteSet(WriteBatcher batcher, DocumentWriteSet writeSet, DatabaseClient client,
28-
ServerTransform transform, String temporalCollection)
29-
{
30-
this.batcher = batcher;
31-
this.writeSet = writeSet;
32-
this.client = client;
33-
this.transform = transform;
34-
this.temporalCollection = temporalCollection;
35-
}
36-
37-
public DocumentWriteSet getWriteSet() {
38-
return writeSet;
39-
}
40-
41-
public void setWriteSet(DocumentWriteSet writeSet) {
42-
this.writeSet = writeSet;
43-
}
44-
45-
public long getBatchNumber() {
46-
return batchNumber;
47-
}
48-
49-
public void setBatchNumber(long batchNumber) {
50-
this.batchNumber = batchNumber;
51-
}
52-
53-
public void setItemsSoFar(long itemsSoFar) {
54-
this.itemsSoFar = itemsSoFar;
55-
}
56-
57-
public DatabaseClient getClient() {
58-
return client;
59-
}
60-
61-
public void setClient(DatabaseClient client) {
62-
this.client = client;
63-
}
64-
65-
public ServerTransform getTransform() {
66-
return transform;
67-
}
68-
69-
public void setTransform(ServerTransform transform) {
70-
this.transform = transform;
71-
}
72-
73-
public String getTemporalCollection() {
74-
return temporalCollection;
75-
}
76-
77-
public void setTemporalCollection(String temporalCollection) {
78-
this.temporalCollection = temporalCollection;
79-
}
80-
81-
public Runnable getOnSuccess() {
82-
return onSuccess;
83-
}
84-
85-
public void onSuccess(Runnable onSuccess) {
86-
this.onSuccess = onSuccess;
87-
}
88-
89-
public Consumer<Throwable> getOnFailure() {
90-
return onFailure;
91-
}
92-
93-
public void onFailure(Consumer<Throwable> onFailure) {
94-
this.onFailure = onFailure;
95-
}
96-
97-
public Runnable getOnBeforeWrite() {
98-
return onBeforeWrite;
99-
}
100-
101-
public void onBeforeWrite(Runnable onBeforeWrite) {
102-
this.onBeforeWrite = onBeforeWrite;
103-
}
104-
105-
public WriteBatch getBatchOfWriteEvents() {
106-
WriteBatchImpl batch = new WriteBatchImpl()
107-
.withBatcher(batcher)
108-
.withClient(client)
109-
.withJobBatchNumber(batchNumber)
110-
.withJobWritesSoFar(itemsSoFar)
111-
.withJobTicket(batcher.getJobTicket());
112-
WriteEvent[] writeEvents = getWriteSet().stream()
113-
.map(writeOperation ->
114-
new WriteEventImpl()
115-
.withTargetUri(writeOperation.getUri())
116-
.withContent(writeOperation.getContent())
117-
.withMetadata(writeOperation.getMetadata())
118-
)
119-
.toArray(WriteEventImpl[]::new);
120-
batch.withItems(writeEvents);
121-
return batch;
122-
}
15+
class BatchWriteSet {
16+
17+
private final WriteBatcher batcher;
18+
private final DocumentWriteSet writeSet;
19+
private final long batchNumber;
20+
private final DatabaseClient client;
21+
private final ServerTransform transform;
22+
private final String temporalCollection;
23+
24+
private long itemsSoFar;
25+
private Runnable onSuccess;
26+
private Consumer<Throwable> onFailure;
27+
28+
BatchWriteSet(WriteBatcher batcher, DatabaseClient hostClient, ServerTransform transform, String temporalCollection, long batchNumber) {
29+
this.batcher = batcher;
30+
this.writeSet = hostClient.newDocumentManager().newWriteSet();
31+
this.client = hostClient;
32+
this.transform = transform;
33+
this.temporalCollection = temporalCollection;
34+
this.batchNumber = batchNumber;
35+
}
36+
37+
public DocumentWriteSet getWriteSet() {
38+
return writeSet;
39+
}
40+
41+
public long getBatchNumber() {
42+
return batchNumber;
43+
}
44+
45+
public void setItemsSoFar(long itemsSoFar) {
46+
this.itemsSoFar = itemsSoFar;
47+
}
48+
49+
public DatabaseClient getClient() {
50+
return client;
51+
}
52+
53+
public ServerTransform getTransform() {
54+
return transform;
55+
}
56+
57+
public String getTemporalCollection() {
58+
return temporalCollection;
59+
}
60+
61+
public Runnable getOnSuccess() {
62+
return onSuccess;
63+
}
64+
65+
public void onSuccess(Runnable onSuccess) {
66+
this.onSuccess = onSuccess;
67+
}
68+
69+
public Consumer<Throwable> getOnFailure() {
70+
return onFailure;
71+
}
72+
73+
public void onFailure(Consumer<Throwable> onFailure) {
74+
this.onFailure = onFailure;
75+
}
76+
77+
public WriteBatch getBatchOfWriteEvents() {
78+
WriteBatchImpl batch = new WriteBatchImpl()
79+
.withBatcher(batcher)
80+
.withClient(client)
81+
.withJobBatchNumber(batchNumber)
82+
.withJobWritesSoFar(itemsSoFar)
83+
.withJobTicket(batcher.getJobTicket());
84+
85+
WriteEvent[] writeEvents = getWriteSet().stream()
86+
.map(writeOperation ->
87+
new WriteEventImpl()
88+
.withTargetUri(writeOperation.getUri())
89+
.withContent(writeOperation.getContent())
90+
.withMetadata(writeOperation.getMetadata())
91+
)
92+
.toArray(WriteEventImpl[]::new);
93+
94+
batch.withItems(writeEvents);
95+
return batch;
96+
}
12397
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
3+
*/
4+
package com.marklogic.client.datamovement.impl;
5+
6+
import com.marklogic.client.document.DocumentWriteOperation;
7+
import com.marklogic.client.document.XMLDocumentManager;
8+
import com.marklogic.client.io.Format;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
import java.io.Closeable;
13+
import java.util.function.Consumer;
14+
15+
class BatchWriter implements Runnable {
16+
17+
private static Logger logger = LoggerFactory.getLogger(WriteBatcherImpl.class);
18+
19+
private final BatchWriteSet writeSet;
20+
21+
BatchWriter(BatchWriteSet writeSet) {
22+
if (writeSet.getWriteSet().size() == 0) {
23+
throw new IllegalStateException("Attempt to write an empty batch");
24+
}
25+
this.writeSet = writeSet;
26+
}
27+
28+
@Override
29+
public void run() {
30+
try {
31+
logger.trace("begin write batch {} to forest on host \"{}\"", writeSet.getBatchNumber(), writeSet.getClient().getHost());
32+
if (writeSet.getTemporalCollection() == null) {
33+
writeSet.getClient().newDocumentManager().write(
34+
writeSet.getWriteSet(), writeSet.getTransform(), null
35+
);
36+
} else {
37+
// to get access to the TemporalDocumentManager write overload we need to instantiate
38+
// a JSONDocumentManager or XMLDocumentManager, but we don't want to make assumptions about content
39+
// format, so we'll set the default content format to unknown
40+
XMLDocumentManager docMgr = writeSet.getClient().newXMLDocumentManager();
41+
docMgr.setContentFormat(Format.UNKNOWN);
42+
docMgr.write(
43+
writeSet.getWriteSet(), writeSet.getTransform(), null, writeSet.getTemporalCollection()
44+
);
45+
}
46+
closeAllHandles();
47+
Runnable onSuccess = writeSet.getOnSuccess();
48+
if (onSuccess != null) {
49+
onSuccess.run();
50+
}
51+
} catch (Throwable t) {
52+
logger.trace("failed batch sent to forest on host \"{}\"", writeSet.getClient().getHost());
53+
Consumer<Throwable> onFailure = writeSet.getOnFailure();
54+
if (onFailure != null) {
55+
onFailure.accept(t);
56+
}
57+
}
58+
}
59+
60+
private void closeAllHandles() throws Throwable {
61+
Throwable lastThrowable = null;
62+
for (DocumentWriteOperation doc : writeSet.getWriteSet()) {
63+
try {
64+
if (doc.getContent() instanceof Closeable) {
65+
((Closeable) doc.getContent()).close();
66+
}
67+
if (doc.getMetadata() instanceof Closeable) {
68+
((Closeable) doc.getMetadata()).close();
69+
}
70+
} catch (Throwable t) {
71+
logger.error("error calling close()", t);
72+
lastThrowable = t;
73+
}
74+
}
75+
if (lastThrowable != null) throw lastThrowable;
76+
}
77+
78+
public BatchWriteSet getWriteSet() {
79+
return writeSet;
80+
}
81+
}

0 commit comments

Comments
 (0)