Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -46,6 +45,18 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {

private static final Logger LOG = LoggerFactory.getLogger(AsyncBufferedMutatorImpl.class);

private static final int INITIAL_CAPACITY = 100;

/**
 * Holder that pairs a list of mutations with the list of futures used to report their results.
 * <p>
 * Both list references are stored exactly as passed to the constructor; no copy is made.
 */
protected static class Batch {
  /** The mutations to hand over for sending. */
  final ArrayList<Mutation> toSend;

  /** The futures to complete, supplied together with {@link #toSend}. */
  final ArrayList<CompletableFuture<Void>> toComplete;

  /**
   * @param mutationsToSend   the mutations carried by this batch
   * @param futuresToComplete the futures carried by this batch
   */
  Batch(ArrayList<Mutation> mutationsToSend,
    ArrayList<CompletableFuture<Void>> futuresToComplete) {
    toSend = mutationsToSend;
    toComplete = futuresToComplete;
  }
}

private final HashedWheelTimer periodicalFlushTimer;

private final AsyncTable<?> table;
Expand All @@ -58,16 +69,20 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {

private final int maxMutations;

private List<Mutation> mutations = new ArrayList<>();
private ArrayList<Mutation> mutations = new ArrayList<>(INITIAL_CAPACITY);

private List<CompletableFuture<Void>> futures = new ArrayList<>();
private ArrayList<CompletableFuture<Void>> futures = new ArrayList<>(INITIAL_CAPACITY);

private long bufferedSize;

private boolean closed;
private volatile boolean closed;

// Accessed by tests
Timeout periodicFlushTask;

// Accessed by tests
final ReentrantLock lock = new ReentrantLock();

AsyncBufferedMutatorImpl(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize, int maxMutations) {
this.periodicalFlushTimer = periodicalFlushTimer;
Expand All @@ -88,23 +103,53 @@ public Configuration getConfiguration() {
return table.getConfiguration();
}

// will be overridden in test
protected void internalFlush() {
/**
* Atomically drains the current buffered mutations and futures under {@link #lock} and prepares
* this mutator to accept a new batch.
* <p>
* The {@link #lock} must be acquired before calling this method. Cancels any pending
* {@link #periodicFlushTask} to avoid a redundant flush for the data we are about to send. Swaps
* the shared {@link #mutations} and {@link #futures} lists into a returned {@link Batch},
* replaces them with fresh lists, and resets {@link #bufferedSize} to zero.
* <p>
* If there is nothing buffered, returns {@code null} so callers can skip sending work.
* <p>
* Protected for being overridden in tests.
* @return a {@link Batch} containing drained mutations and futures, or {@code null} if empty
*/
protected Batch drainBatch() {
ArrayList<Mutation> toSend;
ArrayList<CompletableFuture<Void>> toComplete;
// Cancel the flush task if it is pending.
if (periodicFlushTask != null) {
periodicFlushTask.cancel();
periodicFlushTask = null;
}
List<Mutation> toSend = this.mutations;
toSend = this.mutations;
if (toSend.isEmpty()) {
return;
return null;
}
List<CompletableFuture<Void>> toComplete = this.futures;
toComplete = this.futures;
assert toSend.size() == toComplete.size();
this.mutations = new ArrayList<>();
this.futures = new ArrayList<>();
this.mutations = new ArrayList<>(INITIAL_CAPACITY);
this.futures = new ArrayList<>(INITIAL_CAPACITY);
bufferedSize = 0L;
Iterator<CompletableFuture<Void>> toCompleteIter = toComplete.iterator();
for (CompletableFuture<?> future : table.batch(toSend)) {
return new Batch(toSend, toComplete);
}

/**
* Sends a previously drained {@link Batch} and wires the user-visible completion futures to the
* underlying results returned by {@link AsyncTable#batch(List)}.
* <p>
* Preserves the one-to-one, in-order mapping between mutations and their corresponding futures.
* @param batch the drained batch to send; may be {@code null}
*/
private void sendBatch(Batch batch) {
if (batch == null) {
return;
}
Iterator<CompletableFuture<Void>> toCompleteIter = batch.toComplete.iterator();
for (CompletableFuture<?> future : table.batch(batch.toSend)) {
CompletableFuture<Void> toCompleteFuture = toCompleteIter.next();
addListener(future, (r, e) -> {
if (e != null) {
Expand All @@ -118,33 +163,43 @@ protected void internalFlush() {

@Override
public List<CompletableFuture<Void>> mutate(List<? extends Mutation> mutations) {
List<CompletableFuture<Void>> futures =
Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutations.size())
.collect(Collectors.toList());
List<CompletableFuture<Void>> futures = new ArrayList<>(mutations.size());
for (int i = 0, n = mutations.size(); i < n; i++) {
futures.add(new CompletableFuture<>());
}
if (closed) {
IOException ioe = new IOException("Already closed");
futures.forEach(f -> f.completeExceptionally(ioe));
return futures;
}
long heapSize = 0;
for (Mutation mutation : mutations) {
heapSize += mutation.heapSize();
if (mutation instanceof Put) {
validatePut((Put) mutation, maxKeyValueSize);
}
}
synchronized (this) {
if (closed) {
IOException ioe = new IOException("Already closed");
futures.forEach(f -> f.completeExceptionally(ioe));
return futures;
}
Batch batch = null;
lock.lock();
try {
if (this.mutations.isEmpty() && periodicFlushTimeoutNs > 0) {
periodicFlushTask = periodicalFlushTimer.newTimeout(timeout -> {
synchronized (AsyncBufferedMutatorImpl.this) {
// confirm that we are still valid, if there is already an internalFlush call before us,
// then we should not execute anymore. And in internalFlush we will set periodicFlush
Batch flushBatch = null;
lock.lock();
try {
// Confirm that this timeout is still the current one: if a drainBatch call already ran
// before us, we must not execute anymore. drainBatch sets periodicFlushTask to null, and
// a new task may have been scheduled since then, so here we check whether the references
// are equal.
if (timeout == periodicFlushTask) {
periodicFlushTask = null;
internalFlush();
flushBatch = drainBatch(); // Drains under lock
}
} finally {
lock.unlock();
}
if (flushBatch != null) {
sendBatch(flushBatch); // Sends outside of lock
}
}, periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
}
Expand All @@ -153,24 +208,55 @@ Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutation
bufferedSize += heapSize;
if (bufferedSize >= writeBufferSize) {
LOG.trace("Flushing because write buffer size {} reached", writeBufferSize);
internalFlush();
// drain now and send after releasing the lock
batch = drainBatch();
} else if (maxMutations > 0 && this.mutations.size() >= maxMutations) {
LOG.trace("Flushing because max mutations {} reached", maxMutations);
internalFlush();
batch = drainBatch();
}
} finally {
lock.unlock();
}
// Send outside of lock
if (batch != null) {
sendBatch(batch);
}
return futures;
}

/**
 * Shared implementation of {@link #flush()} and {@link #close()}.
 * <p>
 * Does nothing when {@link #closed} is already set. Otherwise drains the buffered batch while
 * holding {@link #lock}, optionally marks this mutator as closed under the same lock, and sends
 * the drained batch after the lock has been released.
 * <p>
 * The only difference between flush and close is that close sets {@link #closed} to
 * {@code true} before the batch is sent out, to prevent further flush or close.
 * @param close {@code true} to also mark this mutator as closed
 */
private void flushOrClose(boolean close) {
  // Cheap check without the lock; it is repeated under the lock below.
  if (closed) {
    return;
  }
  Batch drained;
  lock.lock();
  try {
    if (closed) {
      // Closed between the check above and acquiring the lock; nothing left to do.
      return;
    }
    // Drains under lock
    drained = drainBatch();
    if (close) {
      closed = true;
    }
  } finally {
    lock.unlock();
  }
  // Sends outside of lock
  if (drained != null) {
    sendBatch(drained);
  }
}

@Override
public synchronized void flush() {
internalFlush();
public void flush() {
flushOrClose(false);
}

@Override
public synchronized void close() {
internalFlush();
closed = true;
public void close() {
flushOrClose(true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ public void testCancelPeriodicFlushByClose() throws InterruptedException, Execut

private static final class AsyncBufferMutatorForTest extends AsyncBufferedMutatorImpl {

private int flushCount;
private int drainCount;

AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize, int maxMutation) {
Expand All @@ -267,9 +267,9 @@ private static final class AsyncBufferMutatorForTest extends AsyncBufferedMutato
}

@Override
protected void internalFlush() {
flushCount++;
super.internalFlush();
protected Batch drainBatch() {
drainCount++;
return super.drainBatch();
}
}

Expand All @@ -284,16 +284,19 @@ public void testRaceBetweenNormalFlushAndPeriodicFlush()
Timeout task = mutator.periodicFlushTask;
// we should have scheduled a periodic flush task
assertNotNull(task);
synchronized (mutator) {
// synchronized on mutator to prevent periodic flush to be executed
// hold the lock to prevent the periodic flush from being executed
mutator.lock.lock();
try {
Thread.sleep(500);
// the timeout should be issued
assertTrue(task.isExpired());
// but no flush is issued as we hold the lock
assertEquals(0, mutator.flushCount);
// but no drain is issued as we hold the lock
assertEquals(0, mutator.drainCount);
assertFalse(future.isDone());
// manually flush, then release the lock
// manually flush and drain, then release the lock
mutator.flush();
} finally {
mutator.lock.unlock();
}
// this is a bit deep into the implementation in netty but anyway let's add a check here to
// confirm that an issued timeout can not be canceled by netty framework.
Expand All @@ -303,7 +306,7 @@ public void testRaceBetweenNormalFlushAndPeriodicFlush()
AsyncTable<?> table = CONN.getTable(TABLE_NAME);
assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
// only the manual flush, the periodic flush should have been canceled by us
assertEquals(1, mutator.flushCount);
assertEquals(1, mutator.drainCount);
}
}
}