Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@

import java.util.function.Consumer;

/**
* Mutable class that captures the documents to be written. Documents are added via calls to "getDocumentWriteSet()", where the
* DocumentWriteSet is empty when this class is constructed.
*/
class BatchWriteSet {

private final WriteBatcher batcher;
private final DocumentWriteSet writeSet;
private final DocumentWriteSet documentWriteSet;
private final long batchNumber;
private final DatabaseClient client;
private final ServerTransform transform;
Expand All @@ -27,15 +31,15 @@ class BatchWriteSet {

BatchWriteSet(WriteBatcher batcher, DatabaseClient hostClient, ServerTransform transform, String temporalCollection, long batchNumber) {
this.batcher = batcher;
this.writeSet = hostClient.newDocumentManager().newWriteSet();
this.documentWriteSet = hostClient.newDocumentManager().newWriteSet();
this.client = hostClient;
this.transform = transform;
this.temporalCollection = temporalCollection;
this.batchNumber = batchNumber;
}

public DocumentWriteSet getWriteSet() {
return writeSet;
public DocumentWriteSet getDocumentWriteSet() {
return documentWriteSet;
}

public long getBatchNumber() {
Expand Down Expand Up @@ -82,7 +86,7 @@ public WriteBatch getBatchOfWriteEvents() {
.withJobWritesSoFar(itemsSoFar)
.withJobTicket(batcher.getJobTicket());

WriteEvent[] writeEvents = getWriteSet().stream()
WriteEvent[] writeEvents = getDocumentWriteSet().stream()
.map(writeOperation ->
new WriteEventImpl()
.withTargetUri(writeOperation.getUri())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,41 @@ class BatchWriter implements Runnable {

private static Logger logger = LoggerFactory.getLogger(WriteBatcherImpl.class);

private final BatchWriteSet writeSet;
private final BatchWriteSet batchWriteSet;

BatchWriter(BatchWriteSet writeSet) {
if (writeSet.getWriteSet().size() == 0) {
BatchWriter(BatchWriteSet batchWriteSet) {
if (batchWriteSet.getDocumentWriteSet().size() == 0) {
throw new IllegalStateException("Attempt to write an empty batch");
}
this.writeSet = writeSet;
this.batchWriteSet = batchWriteSet;
}

@Override
public void run() {
try {
logger.trace("begin write batch {} to forest on host \"{}\"", writeSet.getBatchNumber(), writeSet.getClient().getHost());
if (writeSet.getTemporalCollection() == null) {
writeSet.getClient().newDocumentManager().write(
writeSet.getWriteSet(), writeSet.getTransform(), null
logger.trace("begin write batch {} to forest on host \"{}\"", batchWriteSet.getBatchNumber(), batchWriteSet.getClient().getHost());
if (batchWriteSet.getTemporalCollection() == null) {
batchWriteSet.getClient().newDocumentManager().write(
batchWriteSet.getDocumentWriteSet(), batchWriteSet.getTransform(), null
);
} else {
// to get access to the TemporalDocumentManager write overload we need to instantiate
// a JSONDocumentManager or XMLDocumentManager, but we don't want to make assumptions about content
// format, so we'll set the default content format to unknown
XMLDocumentManager docMgr = writeSet.getClient().newXMLDocumentManager();
XMLDocumentManager docMgr = batchWriteSet.getClient().newXMLDocumentManager();
docMgr.setContentFormat(Format.UNKNOWN);
docMgr.write(
writeSet.getWriteSet(), writeSet.getTransform(), null, writeSet.getTemporalCollection()
batchWriteSet.getDocumentWriteSet(), batchWriteSet.getTransform(), null, batchWriteSet.getTemporalCollection()
);
}
closeAllHandles();
Runnable onSuccess = writeSet.getOnSuccess();
Runnable onSuccess = batchWriteSet.getOnSuccess();
if (onSuccess != null) {
onSuccess.run();
}
} catch (Throwable t) {
logger.trace("failed batch sent to forest on host \"{}\"", writeSet.getClient().getHost());
Consumer<Throwable> onFailure = writeSet.getOnFailure();
logger.trace("failed batch sent to forest on host \"{}\"", batchWriteSet.getClient().getHost());
Consumer<Throwable> onFailure = batchWriteSet.getOnFailure();
if (onFailure != null) {
onFailure.accept(t);
}
Expand All @@ -59,7 +59,7 @@ public void run() {

private void closeAllHandles() throws Throwable {
Throwable lastThrowable = null;
for (DocumentWriteOperation doc : writeSet.getWriteSet()) {
for (DocumentWriteOperation doc : batchWriteSet.getDocumentWriteSet()) {
try {
if (doc.getContent() instanceof Closeable) {
((Closeable) doc.getContent()).close();
Expand All @@ -75,7 +75,7 @@ private void closeAllHandles() throws Throwable {
if (lastThrowable != null) throw lastThrowable;
}

public BatchWriteSet getWriteSet() {
return writeSet;
public BatchWriteSet getBatchWriteSet() {
return batchWriteSet;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public WriteBatcher add(DocumentWriteOperation writeOperation) {
BatchWriteSet writeSet = newBatchWriteSet();
int minBatchSize = 0;
if(defaultMetadata != null) {
writeSet.getWriteSet().add(new DocumentWriteOperationImpl(OperationType.METADATA_DEFAULT, null, defaultMetadata, null));
writeSet.getDocumentWriteSet().add(new DocumentWriteOperationImpl(OperationType.METADATA_DEFAULT, null, defaultMetadata, null));
minBatchSize = 1;
}
for (int i=0; i < getBatchSize(); i++ ) {
Expand All @@ -213,9 +213,9 @@ public WriteBatcher add(DocumentWriteOperation writeOperation) {
// strange, there should have been a full batch of docs in the queue...
break;
}
writeSet.getWriteSet().add(doc);
writeSet.getDocumentWriteSet().add(doc);
}
if ( writeSet.getWriteSet().size() > minBatchSize ) {
if ( writeSet.getDocumentWriteSet().size() > minBatchSize ) {
threadPool.submit( new BatchWriter(writeSet) );
}
}
Expand Down Expand Up @@ -326,7 +326,7 @@ private void retry(WriteBatch batch, boolean callFailListeners) {
});
}
for (WriteEvent doc : batch.getItems()) {
writeSet.getWriteSet().add(doc.getTargetUri(), doc.getMetadata(), doc.getContent());
writeSet.getDocumentWriteSet().add(doc.getTargetUri(), doc.getMetadata(), doc.getContent());
}
BatchWriter runnable = new BatchWriter(writeSet);
runnable.run();
Expand Down Expand Up @@ -392,12 +392,12 @@ private void flush(boolean waitForCompletion) {
}
BatchWriteSet writeSet = newBatchWriteSet();
if(defaultMetadata != null) {
writeSet.getWriteSet().add(new DocumentWriteOperationImpl(OperationType.METADATA_DEFAULT, null, defaultMetadata, null));
writeSet.getDocumentWriteSet().add(new DocumentWriteOperationImpl(OperationType.METADATA_DEFAULT, null, defaultMetadata, null));
}
int j=0;
for ( ; j < getBatchSize() && iter.hasNext(); j++ ) {
DocumentWriteOperation doc = iter.next();
writeSet.getWriteSet().add(doc);
writeSet.getDocumentWriteSet().add(doc);
}
threadPool.submit( new BatchWriter(writeSet) );
}
Expand All @@ -406,7 +406,7 @@ private void flush(boolean waitForCompletion) {
}

private void sendSuccessToListeners(BatchWriteSet batchWriteSet) {
batchWriteSet.setItemsSoFar(itemsSoFar.addAndGet(batchWriteSet.getWriteSet().size()));
batchWriteSet.setItemsSoFar(itemsSoFar.addAndGet(batchWriteSet.getDocumentWriteSet().size()));
WriteBatch batch = batchWriteSet.getBatchOfWriteEvents();
for ( WriteBatchListener successListener : successListeners ) {
try {
Expand Down Expand Up @@ -606,16 +606,16 @@ public synchronized WriteBatcher withForestConfig(ForestConfiguration forestConf
for ( Runnable task : tasks ) {
if ( task instanceof BatchWriter ) {
BatchWriter writerTask = (BatchWriter) task;
if ( removedHostInfos.containsKey(writerTask.getWriteSet().getClient().getHost()) ) {
if ( removedHostInfos.containsKey(writerTask.getBatchWriteSet().getClient().getHost()) ) {
// this batch was targeting a host that's no longer on the list
// if we re-add these docs they'll now be in batches that target acceptable hosts
BatchWriteSet writeSet = newBatchWriteSet(writerTask.getWriteSet().getBatchNumber());
BatchWriteSet writeSet = newBatchWriteSet(writerTask.getBatchWriteSet().getBatchNumber());
writeSet.onFailure(throwable -> {
if ( throwable instanceof RuntimeException ) throw (RuntimeException) throwable;
else throw new DataMovementException("Failed to retry batch after failover", throwable);
});
for ( WriteEvent doc : writerTask.getWriteSet().getBatchOfWriteEvents().getItems() ) {
writeSet.getWriteSet().add(doc.getTargetUri(), doc.getMetadata(), doc.getContent());
for ( WriteEvent doc : writerTask.getBatchWriteSet().getBatchOfWriteEvents().getItems() ) {
writeSet.getDocumentWriteSet().add(doc.getTargetUri(), doc.getMetadata(), doc.getContent());
}
BatchWriter retryWriterTask = new BatchWriter(writeSet);
Runnable fretryWriterTask = (Runnable) threadPool.submit(retryWriterTask);
Expand Down