diff --git a/pom.xml b/pom.xml index aaefbdd..cf3a87c 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ com.pythian.opentsdb asyncbigtable - 0.4.3 + 0.4.4 jar Async Bigtable library @@ -33,6 +33,25 @@ https://github.com/OpenTSDB/asyncbigtable/issues + + + + io.opentelemetry + opentelemetry-bom + 1.34.1 + pom + import + + + com.google.cloud + libraries-bom + 26.27.0 + pom + import + + + + tsuna @@ -230,6 +249,11 @@ + + + io.opentelemetry + opentelemetry-api + com.stumbleupon async @@ -242,28 +266,42 @@ 1.7.7 - com.google.guava guava - 18.0 + 27.0-jre - com.google.cloud.bigtable - bigtable-hbase-2.x-hadoop - 1.23.0 - - - + com.google.cloud.bigtable + bigtable-hbase-2.x-hadoop + 2.15.5 + + + com.google.cloud.bigtable + bigtable-client-core + + + - org.slf4j - log4j-over-slf4j - 1.7.7 - runtime + org.apache.zookeeper + zookeeper + 3.4.14 + + + org.slf4j + slf4j-log4j12 + + + log4j + log4j + + - + + + commons-lang commons-lang diff --git a/src/main/java/org/hbase/async/AppendRequest.java b/src/main/java/org/hbase/async/AppendRequest.java index 7efbd90..0a19c74 100644 --- a/src/main/java/org/hbase/async/AppendRequest.java +++ b/src/main/java/org/hbase/async/AppendRequest.java @@ -174,6 +174,7 @@ public AppendRequest(final byte[] table, * @param table The table to edit. * @param kv The {@link KeyValue} to store. */ + @SuppressWarnings("deprecation") public AppendRequest(final byte[] table, final KeyValue kv) { super(table, kv.key(), kv.family(), kv.timestamp(), RowLock.NO_LOCK); @@ -182,6 +183,7 @@ public AppendRequest(final byte[] table, } /** Private constructor. */ + @SuppressWarnings("deprecation") private AppendRequest(final byte[] table, final byte[] key, final byte[] family, @@ -271,4 +273,4 @@ public boolean returnResult() { return return_result; } -} \ No newline at end of file +} diff --git a/src/main/java/org/hbase/async/CompareFilter.java b/src/main/java/org/hbase/async/CompareFilter.java index 9684f4d..313517f 100644 --- a/src/main/java/org/hbase/async/CompareFilter.java +++ b/src/main/java/org/hbase/async/CompareFilter.java @@ -31,6 +31,7 @@ * operator (equal, greater, not equal, etc) and a filter comparator. * @since 1.6 */ +@SuppressWarnings("deprecation") public abstract class CompareFilter extends ScanFilter { /** Comparison operators. */ diff --git a/src/main/java/org/hbase/async/Counter.java b/src/main/java/org/hbase/async/Counter.java index d29875c..2d24a61 100644 --- a/src/main/java/org/hbase/async/Counter.java +++ b/src/main/java/org/hbase/async/Counter.java @@ -26,7 +26,7 @@ */ package org.hbase.async; -import org.hbase.async.jsr166e.LongAdder; +import java.util.concurrent.atomic.LongAdder; /** * An atomic counter to replace {@link java.util.concurrent.atomic.AtomicLong}. diff --git a/src/main/java/org/hbase/async/DeleteRequest.java b/src/main/java/org/hbase/async/DeleteRequest.java index 2a9567b..8d6cff4 100644 --- a/src/main/java/org/hbase/async/DeleteRequest.java +++ b/src/main/java/org/hbase/async/DeleteRequest.java @@ -66,6 +66,7 @@ public final class DeleteRequest extends BatchableRpc * @param key The key of the row to edit in that table. * @throws IllegalArgumentException if any argument is malformed. */ + @SuppressWarnings("deprecation") public DeleteRequest(final byte[] table, final byte[] key) { this(table, key, null, null, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } @@ -78,6 +79,7 @@ public DeleteRequest(final byte[] table, final byte[] key) { * @param timestamp The timestamp to set on this edit. * @throws IllegalArgumentException if any argument is malformed. */ + @SuppressWarnings("deprecation") public DeleteRequest(final byte[] table, final byte[] key, final long timestamp) { this(table, key, null, null, timestamp, RowLock.NO_LOCK); @@ -91,6 +93,7 @@ public DeleteRequest(final byte[] table, final byte[] key, * @param family The column family to edit in that table. * @throws IllegalArgumentException if any argument is malformed. */ + @SuppressWarnings("deprecation") public DeleteRequest(final byte[] table, final byte[] key, final byte[] family) { @@ -106,6 +109,7 @@ public DeleteRequest(final byte[] table, * @param timestamp The timestamp to set on this edit. * @throws IllegalArgumentException if any argument is malformed. */ + @SuppressWarnings("deprecation") public DeleteRequest(final byte[] table, final byte[] key, final byte[] family, @@ -123,6 +127,7 @@ public DeleteRequest(final byte[] table, * Can be {@code null} since version 1.1. * @throws IllegalArgumentException if any argument is malformed. */ + @SuppressWarnings("deprecation") public DeleteRequest(final byte[] table, final byte[] key, final byte[] family, @@ -143,6 +148,7 @@ public DeleteRequest(final byte[] table, * @param timestamp The timestamp to set on this edit. * @throws IllegalArgumentException if any argument is malformed. */ + @SuppressWarnings("deprecation") public DeleteRequest(final byte[] table, final byte[] key, final byte[] family, @@ -162,6 +168,7 @@ public DeleteRequest(final byte[] table, * @param qualifiers The column qualifiers to delete in that family. * @throws IllegalArgumentException if any argument is malformed. */ + @SuppressWarnings("deprecation") public DeleteRequest(final byte[] table, final byte[] key, final byte[] family, @@ -180,6 +187,7 @@ public DeleteRequest(final byte[] table, * @param timestamp The timestamp to set on this edit. * @throws IllegalArgumentException if any argument is malformed. */ + @SuppressWarnings("deprecation") public DeleteRequest(final byte[] table, final byte[] key, final byte[] family, @@ -199,6 +207,7 @@ public DeleteRequest(final byte[] table, * @deprecated * @throws IllegalArgumentException if any argument is malformed. */ + @SuppressWarnings("deprecation") public DeleteRequest(final byte[] table, final byte[] key, final byte[] family, @@ -221,6 +230,7 @@ public DeleteRequest(final byte[] table, * @deprecated * @throws IllegalArgumentException if any argument is malformed. */ + @SuppressWarnings("deprecation") public DeleteRequest(final byte[] table, final byte[] key, final byte[] family, @@ -244,6 +254,7 @@ public DeleteRequest(final byte[] table, * @deprecated * @throws IllegalArgumentException if any argument is malformed. */ + @SuppressWarnings("deprecation") public DeleteRequest(final byte[] table, final byte[] key, final byte[] family, @@ -265,6 +276,7 @@ public DeleteRequest(final byte[] table, * @deprecated * @throws IllegalArgumentException if any argument is malformed. */ + @SuppressWarnings("deprecation") public DeleteRequest(final byte[] table, final byte[] key, final byte[] family, @@ -280,6 +292,7 @@ public DeleteRequest(final byte[] table, * @param key The key of the row to edit in that table. * @throws IllegalArgumentException if any argument is malformed. */ + @SuppressWarnings("deprecation") public DeleteRequest(final String table, final String key) { this(table.getBytes(), key.getBytes(), null, null, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); @@ -292,6 +305,7 @@ public DeleteRequest(final String table, final String key) { * @param family The column family to edit in that table. * @throws IllegalArgumentException if any argument is malformed. */ + @SuppressWarnings("deprecation") public DeleteRequest(final String table, final String key, final String family) { @@ -308,6 +322,7 @@ public DeleteRequest(final String table, * Can be {@code null} since version 1.1. * @throws IllegalArgumentException if any argument is malformed. */ + @SuppressWarnings("deprecation") public DeleteRequest(final String table, final String key, final String family, @@ -328,6 +343,7 @@ public DeleteRequest(final String table, * @deprecated * @throws IllegalArgumentException if any argument is malformed. */ + @SuppressWarnings("deprecation") public DeleteRequest(final String table, final String key, final String family, @@ -345,6 +361,7 @@ public DeleteRequest(final String table, * {@link KeyValue} specifies a timestamp, then this specific timestamp only * will be deleted. */ + @SuppressWarnings("deprecation") public DeleteRequest(final byte[] table, final KeyValue kv) { this(table, kv.key(), kv.family(), new byte[][] { kv.qualifier() }, kv.timestamp(), RowLock.NO_LOCK); @@ -359,6 +376,7 @@ public DeleteRequest(final byte[] table, final KeyValue kv) { * @param lock Ignored * @deprecated */ + @SuppressWarnings("deprecation") public DeleteRequest(final byte[] table, final KeyValue kv, final RowLock lock) { diff --git a/src/main/java/org/hbase/async/HBaseClient.java b/src/main/java/org/hbase/async/HBaseClient.java index fd25d97..24c6f72 100644 --- a/src/main/java/org/hbase/async/HBaseClient.java +++ b/src/main/java/org/hbase/async/HBaseClient.java @@ -71,6 +71,7 @@ import org.slf4j.LoggerFactory; import com.google.cloud.bigtable.hbase.BigtableConfiguration; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.cache.LoadingCache; import com.google.common.collect.Lists; import com.stumbleupon.async.Callback; @@ -99,8 +100,7 @@ public final class HBaseClient { * TODO(tsuna): Make the tick duration configurable? */ private final HashedWheelTimer - timer = new HashedWheelTimer(Threads.newDaemonThreadFactory("Flush-Timer"), 20, MILLISECONDS); - + timer = new HashedWheelTimer(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HBaseClient-Timer-%d").build(),20, MILLISECONDS); /** Up to how many milliseconds can we buffer an edit on the client side. */ private volatile short flush_interval = 1000; // ms @@ -287,7 +287,21 @@ public HBaseClient(final Config config, final ExecutorService executor) { public HBaseClient(final Configuration configuration, final ExecutorService executor) { this.executor = executor; num_connections_created.increment(); - hbase_config = BigtableConfiguration.asyncConfigure(configuration); + + // --- FIX: Extract project/instance IDs and call the correct 'configure' method --- + final String projectId = configuration.get("google.bigtable.project.id"); + final String instanceId = configuration.get("google.bigtable.instance.id"); + + if (projectId == null || projectId.isEmpty()) { + throw new IllegalArgumentException("Missing required configuration: google.bigtable.project.id"); + } + if (instanceId == null || instanceId.isEmpty()) { + throw new IllegalArgumentException("Missing required configuration: google.bigtable.instance.id"); + } + + hbase_config = BigtableConfiguration.configure(configuration, projectId, instanceId); + // --- END FIX --- + LOG.info("BigTable API: Connecting with config: {}", hbase_config); try { @@ -609,6 +623,7 @@ public Deferred ensureTableFamilyExists(final String table, * @throws TableNotFoundException (deferred) if the table doesn't exist. * @throws NoSuchColumnFamilyException (deferred) if the family doesn't exist. */ + @SuppressWarnings("unchecked") public Deferred ensureTableFamilyExists(final byte[] table, final byte[] family) { if (LOG.isDebugEnabled()) { LOG.debug("BigTable API: Checking if table [{}] and family [{}] exist", Bytes.pretty(table), @@ -1177,6 +1192,7 @@ public Deferred put(final PutRequest request) { * (think of it as {@code Deferred}). But you probably want to attach * at least an errback to this {@code Deferred} to handle failures. */ + @SuppressWarnings("deprecation") public Deferred append(final AppendRequest request) { num_appends.increment(); @@ -1217,6 +1233,7 @@ public Deferred append(final AppendRequest request) { * the CAS failed because the value in BigTable didn't match the expected value * of the CAS request. */ + @SuppressWarnings("deprecation") public Deferred compareAndSet(final PutRequest edit, final byte[] expected) { @@ -1271,6 +1288,7 @@ public Deferred compareAndSet(final PutRequest edit, * inserted in BigTable, {@code false} if there was already a value in the * given cell. */ + @SuppressWarnings("deprecation") public Deferred atomicCreate(final PutRequest edit) { return compareAndSet(edit, EMPTY_ARRAY); } diff --git a/src/main/java/org/hbase/async/KeyRegexpFilter.java b/src/main/java/org/hbase/async/KeyRegexpFilter.java index 68847c2..4f7757c 100644 --- a/src/main/java/org/hbase/async/KeyRegexpFilter.java +++ b/src/main/java/org/hbase/async/KeyRegexpFilter.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.RegexStringComparator; import org.apache.hadoop.hbase.filter.RowFilter; @@ -144,33 +145,76 @@ public byte[] getRegexp() { public Charset getCharset() { return Charset.forName(new String(charset)); } - + + /** + * Checks if a byte corresponds to a Regex Metacharacter that requires escaping + * when found inside a Literal (\Q...\E) block. + */ + private boolean isMetaCharacter(int b) { + // Metacharacters: \ * + ? | { [ ( ) ^ $ . # + return b == 92 || b == 42 || b == 43 || b == 63 || b == 124 || + b == 123 || b == 91 || b == 40 || b == 41 || b == 94 || + b == 36 || b == 46 || b == 35; + } + @Override Filter getBigtableFilter() { - RegexStringComparator comparator = new RegexStringComparator(regexp_string); - comparator.setCharset(charset_object); - try { - // WARNING: This is some ugly ass code. It WILL break at some point. - // Bigtable uses RE2 and runs in raw byte mode. TSDB writes regex with - // byte values but when passing it through the HTable APIs it's converted - // to UTF and serializes differently than the old AsyncHBase client. The - // native BigTable client will pass the regex along properly BUT we need - // to bypass the RegexStringComparator methods and inject our ASCII regex - // directly into the underlying comparator object. Hopefully this is - // temporary (famous last words) until we get to a native Bigtable wrapper. - final Field field = ByteArrayComparable.class.getDeclaredField("value"); - field.setAccessible(true); - field.set(comparator, regexp_string.getBytes(HBaseClient.ASCII)); - field.setAccessible(false); - return new RowFilter(CompareFilter.CompareOp.EQUAL, comparator); - } catch (NoSuchFieldException e) { - throw new RuntimeException("ByteArrayComparator must have changed, " - + "can't find the field", e); - } catch (IllegalAccessException e) { - throw new RuntimeException("Access denied when hacking the " - + "regex comparator field", e); + // SANITIZER V4: "Strip, Escape Meta, and ASCII-Safe" + // 1. Strip \Q and \E. + // 2. Inside quotes: Escape metacharacters (e.g. '.') with backslash. + // 3. Inside quotes: Hex-escape non-printables. + // 4. Outside quotes: Pass structure through. + + StringBuilder sanitizer = new StringBuilder(this.regexp.length * 2); + boolean inQuote = false; + + for (int i = 0; i < this.regexp.length; i++) { + byte b = this.regexp[i]; + int unsigned = b & 0xFF; + + // CHECK FOR \Q (Start Quote) -> byte 92 followed by 81 + if (unsigned == 92 && i + 1 < this.regexp.length && (this.regexp[i+1] & 0xFF) == 81) { + inQuote = true; + i++; // Skip the 'Q' + continue; // Skip the '\' + } + + // CHECK FOR \E (End Quote) -> byte 92 followed by 69 + if (unsigned == 92 && i + 1 < this.regexp.length && (this.regexp[i+1] & 0xFF) == 69) { + inQuote = false; + i++; // Skip the 'E' + continue; // Skip the '\' + } + + if (inQuote) { + // Inside "Literal" block. + // CRITICAL FIX: If we Hex-Escape a metacharacter (like '.' -> \x2e), + // RE2 treats it as the wildcard '.'. We must escape it with '\'. + if (isMetaCharacter(unsigned)) { + sanitizer.append('\\').append((char) unsigned); + } else { + // For non-meta chars, use Hex Escape to be safe against UTF-8 clients + sanitizer.append(String.format("\\x%02x", unsigned)); + } + } else { + // Structural Regex (e.g. ^, ., *, {8}) + // Pass standard printable ASCII through as-is to preserve regex logic. + if (unsigned >= 32 && unsigned < 127) { + sanitizer.append((char) unsigned); + } else { + sanitizer.append(String.format("\\x%02x", unsigned)); + } + } } + + RegexStringComparator comparator = new RegexStringComparator(sanitizer.toString()); + + // CRITICAL FIX 2: Use ISO-8859-1. + // This tells Bigtable/HBase to treat the Row Key bytes as raw 1:1 characters + // rather than trying to decode them as UTF-8 (which fails on binary IDs). + comparator.setCharset(CharsetUtil.ISO_8859_1); + + return new RowFilter(CompareOperator.EQUAL, comparator); } } - diff --git a/src/main/java/org/hbase/async/PutRequest.java b/src/main/java/org/hbase/async/PutRequest.java index fbcf072..34446e6 100644 --- a/src/main/java/org/hbase/async/PutRequest.java +++ b/src/main/java/org/hbase/async/PutRequest.java @@ -95,6 +95,7 @@ public final class PutRequest extends BatchableRpc * @param qualifier The column qualifier to edit in that family. * @param value The value to store. */ + @SuppressWarnings("deprecation") public PutRequest(final byte[] table, final byte[] key, final byte[] family, @@ -121,6 +122,7 @@ public PutRequest(final byte[] table, * @throws IllegalArgumentException if {@code qualifiers.length == 0} * or if {@code qualifiers.length != values.length} */ + @SuppressWarnings("deprecation") public PutRequest(final byte[] table, final byte[] key, final byte[] family, @@ -140,6 +142,7 @@ public PutRequest(final byte[] table, * @param value The value to store. * @param timestamp The timestamp to set on this edit. */ + @SuppressWarnings("deprecation") public PutRequest(final byte[] table, final byte[] key, final byte[] family, @@ -161,7 +164,8 @@ public PutRequest(final byte[] table, * @throws IllegalArgumentException if {@code qualifiers.length == 0} * or if {@code qualifiers.length != values.length} */ - public PutRequest(final byte[] table, + @SuppressWarnings("deprecation") + public PutRequest(final byte[] table, final byte[] key, final byte[] family, final byte[][] qualifiers, @@ -187,6 +191,7 @@ public PutRequest(final byte[] table, * @param lock Ignored * @deprecated */ + @SuppressWarnings("deprecation") public PutRequest(final byte[] table, final byte[] key, final byte[] family, @@ -233,6 +238,7 @@ public PutRequest(final byte[] table, * @throws IllegalArgumentException if {@code qualifiers.length == 0} * or if {@code qualifiers.length != values.length} */ + @SuppressWarnings("deprecation") public PutRequest(final byte[] table, final byte[] key, final byte[] family, @@ -257,6 +263,7 @@ public PutRequest(final byte[] table, * @param qualifier The column qualifier to edit in that family. * @param value The value to store. */ + @SuppressWarnings("deprecation") public PutRequest(final String table, final String key, final String family, @@ -283,6 +290,7 @@ public PutRequest(final String table, * @param lock Ignored * @deprecated */ + @SuppressWarnings("deprecation") public PutRequest(final String table, final String key, final String family, @@ -299,6 +307,7 @@ public PutRequest(final String table, * @param table The table to edit. * @param kv The {@link KeyValue} to store. */ + @SuppressWarnings("deprecation") public PutRequest(final byte[] table, final KeyValue kv) { this(table, kv, RowLock.NO_LOCK); @@ -311,6 +320,7 @@ public PutRequest(final byte[] table, * @param lock Ignored * @deprecated */ + @SuppressWarnings("deprecation") public PutRequest(final byte[] table, final KeyValue kv, final RowLock lock) { diff --git a/src/main/java/org/hbase/async/RegexStringComparator.java b/src/main/java/org/hbase/async/RegexStringComparator.java index b80368e..e7b85bc 100644 --- a/src/main/java/org/hbase/async/RegexStringComparator.java +++ b/src/main/java/org/hbase/async/RegexStringComparator.java @@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable; -import com.google.common.base.Charsets; +import java.nio.charset.StandardCharsets; /** * A regular expression comparator used in comparison filters such as RowFilter, @@ -68,7 +68,7 @@ public final class RegexStringComparator extends FilterComparator { * @param expr The regular expression with which to filter. */ public RegexStringComparator(String expr) { - this(expr, Charsets.UTF_8); + this(expr, StandardCharsets.UTF_8); } /** diff --git a/src/main/java/org/hbase/async/Scanner.java b/src/main/java/org/hbase/async/Scanner.java index 479593e..1c8883f 100644 --- a/src/main/java/org/hbase/async/Scanner.java +++ b/src/main/java/org/hbase/async/Scanner.java @@ -33,9 +33,9 @@ import java.util.ArrayList; import java.util.NavigableMap; +import com.google.cloud.bigtable.hbase.BigtableExtendedScan; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,9 +81,6 @@ public final class Scanner { private static final Logger LOG = LoggerFactory.getLogger(Scanner.class); - /** HBase API scan instance */ - private final Scan hbase_scan; - /** * HBase API ResultScanner. After the scan is submitted is must not be null. */ @@ -187,8 +184,6 @@ public Scanner(final HBaseClient client, final byte[] table) { KeyValue.checkTable(table); this.client = client; this.table = table; - - hbase_scan = new Scan(); } /** @@ -210,8 +205,6 @@ public void setStartKey(final byte[] start_key) { KeyValue.checkKey(start_key); checkScanningNotStarted(); this.start_key = start_key; - - hbase_scan.setStartRow(start_key); } /** @@ -235,8 +228,6 @@ public void setStopKey(final byte[] stop_key) { KeyValue.checkKey(stop_key); checkScanningNotStarted(); this.stop_key = stop_key; - - hbase_scan.setStopRow(stop_key); } /** @@ -258,8 +249,6 @@ public void setFamily(final byte[] family) { KeyValue.checkFamily(family); checkScanningNotStarted(); families = new byte[][] { family }; - - hbase_scan.addFamily(family); } /** Specifies a particular column family to scan. */ @@ -292,14 +281,6 @@ public void setFamilies(byte[][] families, byte[][][] qualifiers) { this.families = families; this.qualifiers = qualifiers; - for (int i = 0; i < families.length; i++) { - KeyValue.checkFamily(families[i]); - if (qualifiers != null && qualifiers[i] != null) { - for (byte[] qualifier : qualifiers[i]) { - hbase_scan.addColumn(families[i], qualifier); - } - } - } } /** @@ -313,7 +294,6 @@ public void setFamilies(final String... families) { KeyValue.checkFamily(this.families[i]); qualifiers[i] = null; - hbase_scan.addFamily(this.families[i]); } } @@ -331,8 +311,6 @@ public void setQualifier(final byte[] qualifier) { checkScanningNotStarted(); this.qualifiers = new byte[][][] { { qualifier } }; - if (families.length > 0) - hbase_scan.addColumn(families[0], qualifier); } /** Specifies a particular column qualifier to scan. */ @@ -379,7 +357,6 @@ public ScanFilter getFilter() { */ public void clearFilter() { filter = null; - hbase_scan.setFilter(null); } /** @@ -427,7 +404,6 @@ public void setKeyRegexp(final String regexp, final Charset charset) { public void setServerBlockCache(final boolean populate_blockcache) { checkScanningNotStarted(); this.populate_blockcache = populate_blockcache; - hbase_scan.setCacheBlocks(populate_blockcache); } /** @@ -493,7 +469,6 @@ public void setMaxNumKeyValues(final int max_num_kvs) { } checkScanningNotStarted(); this.max_num_kvs = max_num_kvs; - hbase_scan.setBatch(max_num_kvs); } /** @@ -522,7 +497,6 @@ public void setMaxVersions(final int versions) { checkScanningNotStarted(); this.versions = versions; - hbase_scan.setMaxVersions(versions); } /** @@ -553,7 +527,6 @@ public void setMaxNumBytes(final long max_num_bytes) { checkScanningNotStarted(); this.max_num_bytes = max_num_bytes; - hbase_scan.setMaxResultSize(max_num_bytes); } /** @@ -586,11 +559,6 @@ public void setMinTimestamp(final long timestamp) { checkScanningNotStarted(); min_timestamp = timestamp; - try { - hbase_scan.setTimeRange(getMinTimestamp(), getMaxTimestamp()); - } catch (IOException e) { - throw new IllegalArgumentException("Invalid timestamp: " + timestamp, e); - } } /** @@ -623,11 +591,6 @@ public void setMaxTimestamp(final long timestamp) { checkScanningNotStarted(); max_timestamp = timestamp; - try { - hbase_scan.setTimeRange(getMinTimestamp(), getMaxTimestamp()); - } catch (IOException e) { - throw new IllegalArgumentException("Invalid timestamp: " + timestamp, e); - } } /** @@ -665,36 +628,50 @@ public void setTimeRange(final long min_timestamp, final long max_timestamp) { this.min_timestamp = min_timestamp; this.max_timestamp = max_timestamp; + } + + + BigtableExtendedScan getHbaseScan() { + // Instantiate a NEW object to guarantee a clean state + BigtableExtendedScan scan = new BigtableExtendedScan(); + + // Apply Ranges (BigtableExtendedScan treats empty byte[] as unbounded) + scan.addRange(this.start_key, this.stop_key); + + // Apply Configuration from local Scanner fields try { - hbase_scan.setTimeRange(getMinTimestamp(), getMaxTimestamp()); + scan.setTimeRange(this.min_timestamp, this.max_timestamp); } catch (IOException e) { - throw new IllegalArgumentException("Invalid time range", e); + // This technically shouldn't happen as setters validate, + // but we must handle the checked exception. + throw new RuntimeException("Invalid time range configuration", e); } - } - - /** @return the HTable scan object */ - Scan getHbaseScan() { - // setup the filters - if (filter == null) { - return hbase_scan; + + scan.readVersions(this.versions); + scan.setBatch(this.max_num_kvs); + scan.setMaxResultSize(this.max_num_bytes); + scan.setCacheBlocks(this.populate_blockcache); + + // Apply Families and Qualifiers + if (this.families != null) { + for (int i = 0; i < this.families.length; i++) { + byte[] family = this.families[i]; + if (this.qualifiers != null && i < this.qualifiers.length && this.qualifiers[i] != null) { + for (byte[] qualifier : this.qualifiers[i]) { + scan.addColumn(family, qualifier); + } + } else { + scan.addFamily(family); + } + } } - - // TODO - right now we ONLY push the regex filter to Bigtable. The fuzzy - // filter isn't setup properly yet and we're not using column filters at this - // time. -// if (filter instanceof FilterList) { -// for (final ScanFilter sf : ((FilterList)filter).getFilters()) { -// if (sf instanceof KeyRegexpFilter) { -// hbase_scan.setFilter(((KeyRegexpFilter)sf).getRegexFilterForBigtable()); -// return hbase_scan; -// } -// } -// } else if (filter instanceof KeyRegexpFilter) { -// hbase_scan.setFilter(((KeyRegexpFilter)filter).getRegexFilterForBigtable()); -// return hbase_scan; -// } - hbase_scan.setFilter(filter.getBigtableFilter()); - return hbase_scan; + + // Apply Filter - This invokes your new KeyRegexpFilter logic + if (this.filter != null) { + scan.setFilter(this.filter.getBigtableFilter()); + } + + return scan; } /** @return the scanner results to work with */ diff --git a/src/main/java/org/hbase/async/jsr166e/LongAdder.java b/src/main/java/org/hbase/async/jsr166e/LongAdder.java deleted file mode 100644 index 6ce53b6..0000000 --- a/src/main/java/org/hbase/async/jsr166e/LongAdder.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/publicdomain/zero/1.0/ - */ - -package org.hbase.async.jsr166e; -import java.util.concurrent.atomic.AtomicLong; -import java.io.IOException; -import java.io.Serializable; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; - -/** - * One or more variables that together maintain an initially zero - * {@code long} sum. When updates (method {@link #add}) are contended - * across threads, the set of variables may grow dynamically to reduce - * contention. Method {@link #sum} (or, equivalently, {@link - * #longValue}) returns the current total combined across the - * variables maintaining the sum. - * - *

This class is usually preferable to {@link AtomicLong} when - * multiple threads update a common sum that is used for purposes such - * as collecting statistics, not for fine-grained synchronization - * control. Under low update contention, the two classes have similar - * characteristics. But under high contention, expected throughput of - * this class is significantly higher, at the expense of higher space - * consumption. - * - *

This class extends {@link Number}, but does not define - * methods such as {@code hashCode} and {@code compareTo} because - * instances are expected to be mutated, and so are not useful as - * collection keys. - * - *

jsr166e note: This class is targeted to be placed in - * java.util.concurrent.atomic - * - * @since 1.8 - * @author Doug Lea - */ -public class LongAdder extends Striped64 implements Serializable { - private static final long serialVersionUID = 7249069246863182397L; - - /** - * Version of plus for use in retryUpdate - */ - final long fn(long v, long x) { return v + x; } - - /** - * Creates a new adder with initial sum of zero. - */ - public LongAdder() { - } - - /** - * Adds the given value. - * - * @param x the value to add - */ - public void add(long x) { - Cell[] as; long b, v; HashCode hc; Cell a; int n; - if ((as = cells) != null || !casBase(b = base, b + x)) { - boolean uncontended = true; - int h = (hc = threadHashCode.get()).code; - if (as == null || (n = as.length) < 1 || - (a = as[(n - 1) & h]) == null || - !(uncontended = a.cas(v = a.value, v + x))) - retryUpdate(x, hc, uncontended); - } - } - - /** - * Equivalent to {@code add(1)}. - */ - public void increment() { - add(1L); - } - - /** - * Equivalent to {@code add(-1)}. - */ - public void decrement() { - add(-1L); - } - - /** - * Returns the current sum. The returned value is NOT an - * atomic snapshot: Invocation in the absence of concurrent - * updates returns an accurate result, but concurrent updates that - * occur while the sum is being calculated might not be - * incorporated. - * - * @return the sum - */ - public long sum() { - long sum = base; - Cell[] as = cells; - if (as != null) { - int n = as.length; - for (int i = 0; i < n; ++i) { - Cell a = as[i]; - if (a != null) - sum += a.value; - } - } - return sum; - } - - /** - * Resets variables maintaining the sum to zero. This method may - * be a useful alternative to creating a new adder, but is only - * effective if there are no concurrent updates. Because this - * method is intrinsically racy, it should only be used when it is - * known that no threads are concurrently updating. - */ - public void reset() { - internalReset(0L); - } - - /** - * Equivalent in effect to {@link #sum} followed by {@link - * #reset}. This method may apply for example during quiescent - * points between multithreaded computations. If there are - * updates concurrent with this method, the returned value is - * not guaranteed to be the final value occurring before - * the reset. - * - * @return the sum - */ - public long sumThenReset() { - long sum = base; - Cell[] as = cells; - base = 0L; - if (as != null) { - int n = as.length; - for (int i = 0; i < n; ++i) { - Cell a = as[i]; - if (a != null) { - sum += a.value; - a.value = 0L; - } - } - } - return sum; - } - - /** - * Returns the String representation of the {@link #sum}. - * @return the String representation of the {@link #sum} - */ - public String toString() { - return Long.toString(sum()); - } - - /** - * Equivalent to {@link #sum}. - * - * @return the sum - */ - public long longValue() { - return sum(); - } - - /** - * Returns the {@link #sum} as an {@code int} after a narrowing - * primitive conversion. - */ - public int intValue() { - return (int)sum(); - } - - /** - * Returns the {@link #sum} as a {@code float} - * after a widening primitive conversion. - */ - public float floatValue() { - return (float)sum(); - } - - /** - * Returns the {@link #sum} as a {@code double} after a widening - * primitive conversion. - */ - public double doubleValue() { - return (double)sum(); - } - - private void writeObject(java.io.ObjectOutputStream s) - throws java.io.IOException { - s.defaultWriteObject(); - s.writeLong(sum()); - } - - private void readObject(ObjectInputStream s) - throws IOException, ClassNotFoundException { - s.defaultReadObject(); - busy = 0; - cells = null; - base = s.readLong(); - } - -} diff --git a/src/main/java/org/hbase/async/jsr166e/README b/src/main/java/org/hbase/async/jsr166e/README deleted file mode 100644 index c1449fb..0000000 --- a/src/main/java/org/hbase/async/jsr166e/README +++ /dev/null @@ -1,14 +0,0 @@ -The contents of this directory contains code from JSR 166e. -** THIS IS NOT PART OF THE PUBLIC INTERFACE OF ASYNCHBASE ** - -Code was downloaded from: - http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/ - -The code was released to the public domain, as explained at - http://creativecommons.org/publicdomain/zero/1.0/ - -The code is bundled in asynchbase as it is not available until JDK8 -becomes a reality, which is expected to take years (at time of writing). -Ideally another library, such as Google Guava, would provide this -code until JDK8 becomes the norm. But in the mean time it's here -for asynchbase's internal use. diff --git a/src/main/java/org/hbase/async/jsr166e/Striped64.java b/src/main/java/org/hbase/async/jsr166e/Striped64.java deleted file mode 100644 index e5a6dee..0000000 --- a/src/main/java/org/hbase/async/jsr166e/Striped64.java +++ /dev/null @@ -1,340 +0,0 @@ -/* - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/publicdomain/zero/1.0/ - */ - -package org.hbase.async.jsr166e; -import java.util.Random; - -/** - * A package-local class holding common representation and mechanics - * for classes supporting dynamic striping on 64bit values. The class - * extends Number so that concrete subclasses must publicly do so. - */ -abstract class Striped64 extends Number { - /* - * This class maintains a lazily-initialized table of atomically - * updated variables, plus an extra "base" field. The table size - * is a power of two. Indexing uses masked per-thread hash codes. - * Nearly all declarations in this class are package-private, - * accessed directly by subclasses. - * - * Table entries are of class Cell; a variant of AtomicLong padded - * to reduce cache contention on most processors. Padding is - * overkill for most Atomics because they are usually irregularly - * scattered in memory and thus don't interfere much with each - * other. But Atomic objects residing in arrays will tend to be - * placed adjacent to each other, and so will most often share - * cache lines (with a huge negative performance impact) without - * this precaution. - * - * In part because Cells are relatively large, we avoid creating - * them until they are needed. When there is no contention, all - * updates are made to the base field. Upon first contention (a - * failed CAS on base update), the table is initialized to size 2. - * The table size is doubled upon further contention until - * reaching the nearest power of two greater than or equal to the - * number of CPUS. Table slots remain empty (null) until they are - * needed. - * - * A single spinlock ("busy") is used for initializing and - * resizing the table, as well as populating slots with new Cells. - * There is no need for a blocking lock: When the lock is not - * available, threads try other slots (or the base). During these - * retries, there is increased contention and reduced locality, - * which is still better than alternatives. - * - * Per-thread hash codes are initialized to random values. - * Contention and/or table collisions are indicated by failed - * CASes when performing an update operation (see method - * retryUpdate). Upon a collision, if the table size is less than - * the capacity, it is doubled in size unless some other thread - * holds the lock. If a hashed slot is empty, and lock is - * available, a new Cell is created. Otherwise, if the slot - * exists, a CAS is tried. Retries proceed by "double hashing", - * using a secondary hash (Marsaglia XorShift) to try to find a - * free slot. - * - * The table size is capped because, when there are more threads - * than CPUs, supposing that each thread were bound to a CPU, - * there would exist a perfect hash function mapping threads to - * slots that eliminates collisions. When we reach capacity, we - * search for this mapping by randomly varying the hash codes of - * colliding threads. Because search is random, and collisions - * only become known via CAS failures, convergence can be slow, - * and because threads are typically not bound to CPUS forever, - * may not occur at all. However, despite these limitations, - * observed contention rates are typically low in these cases. - * - * It is possible for a Cell to become unused when threads that - * once hashed to it terminate, as well as in the case where - * doubling the table causes no thread to hash to it under - * expanded mask. We do not try to detect or remove such cells, - * under the assumption that for long-running instances, observed - * contention levels will recur, so the cells will eventually be - * needed again; and for short-lived ones, it does not matter. - */ - - /** - * Padded variant of AtomicLong supporting only raw accesses plus CAS. - * The value field is placed between pads, hoping that the JVM doesn't - * reorder them. - * - * JVM intrinsics note: It would be possible to use a release-only - * form of CAS here, if it were provided. - */ - static final class Cell { - volatile long p0, p1, p2, p3, p4, p5, p6; - volatile long value; - volatile long q0, q1, q2, q3, q4, q5, q6; - Cell(long x) { value = x; } - - final boolean cas(long cmp, long val) { - return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); - } - - // Unsafe mechanics - private static final sun.misc.Unsafe UNSAFE; - private static final long valueOffset; - static { - try { - UNSAFE = getUnsafe(); - Class ak = Cell.class; - valueOffset = UNSAFE.objectFieldOffset - (ak.getDeclaredField("value")); - } catch (Exception e) { - throw new Error(e); - } - } - - } - - /** - * Holder for the thread-local hash code. The code is initially - * random, but may be set to a different value upon collisions. - */ - static final class HashCode { - static final Random rng = new Random(); - int code; - HashCode() { - int h = rng.nextInt(); // Avoid zero to allow xorShift rehash - code = (h == 0) ? 1 : h; - } - } - - /** - * The corresponding ThreadLocal class - */ - static final class ThreadHashCode extends ThreadLocal { - public HashCode initialValue() { return new HashCode(); } - } - - /** - * Static per-thread hash codes. Shared across all instances to - * reduce ThreadLocal pollution and because adjustments due to - * collisions in one table are likely to be appropriate for - * others. - */ - static final ThreadHashCode threadHashCode = new ThreadHashCode(); - - /** Number of CPUS, to place bound on table size */ - static final int NCPU = Runtime.getRuntime().availableProcessors(); - - /** - * Table of cells. When non-null, size is a power of 2. - */ - transient volatile Cell[] cells; - - /** - * Base value, used mainly when there is no contention, but also as - * a fallback during table initialization races. Updated via CAS. - */ - transient volatile long base; - - /** - * Spinlock (locked via CAS) used when resizing and/or creating Cells. - */ - transient volatile int busy; - - /** - * Package-private default constructor - */ - Striped64() { - } - - /** - * CASes the base field. - */ - final boolean casBase(long cmp, long val) { - return UNSAFE.compareAndSwapLong(this, baseOffset, cmp, val); - } - - /** - * CASes the busy field from 0 to 1 to acquire lock. - */ - final boolean casBusy() { - return UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1); - } - - /** - * Computes the function of current and new value. Subclasses - * should open-code this update function for most uses, but the - * virtualized form is needed within retryUpdate. - * - * @param currentValue the current value (of either base or a cell) - * @param newValue the argument from a user update call - * @return result of the update function - */ - abstract long fn(long currentValue, long newValue); - - /** - * Handles cases of updates involving initialization, resizing, - * creating new Cells, and/or contention. See above for - * explanation. This method suffers the usual non-modularity - * problems of optimistic retry code, relying on rechecked sets of - * reads. - * - * @param x the value - * @param hc the hash code holder - * @param wasUncontended false if CAS failed before call - */ - final void retryUpdate(long x, HashCode hc, boolean wasUncontended) { - int h = hc.code; - boolean collide = false; // True if last slot nonempty - for (;;) { - Cell[] as; Cell a; int n; long v; - if ((as = cells) != null && (n = as.length) > 0) { - if ((a = as[(n - 1) & h]) == null) { - if (busy == 0) { // Try to attach new Cell - Cell r = new Cell(x); // Optimistically create - if (busy == 0 && casBusy()) { - boolean created = false; - try { // Recheck under lock - Cell[] rs; int m, j; - if ((rs = cells) != null && - (m = rs.length) > 0 && - rs[j = (m - 1) & h] == null) { - rs[j] = r; - created = true; - } - } finally { - busy = 0; - } - if (created) - break; - continue; // Slot is now non-empty - } - } - collide = false; - } - else if (!wasUncontended) // CAS already known to fail - wasUncontended = true; // Continue after rehash - else if (a.cas(v = a.value, fn(v, x))) - break; - else if (n >= NCPU || cells != as) - collide = false; // At max size or stale - else if (!collide) - collide = true; - else if (busy == 0 && casBusy()) { - try { - if (cells == as) { // Expand table unless stale - Cell[] rs = new Cell[n << 1]; - for (int i = 0; i < n; ++i) - rs[i] = as[i]; - cells = rs; - } - } finally { - busy = 0; - } - collide = false; - continue; // Retry with expanded table - } - h ^= h << 13; // Rehash - h ^= h >>> 17; - h ^= h << 5; - } - else if (busy == 0 && cells == as && casBusy()) { - boolean init = false; - try { // Initialize table - if (cells == as) { - Cell[] rs = new Cell[2]; - rs[h & 1] = new Cell(x); - cells = rs; - init = true; - } - } finally { - busy = 0; - } - if (init) - break; - } - else if (casBase(v = base, fn(v, x))) - break; // Fall back on using base - } - hc.code = h; // Record index for next time - } - - - /** - * Sets base and all cells to the given value. - */ - final void internalReset(long initialValue) { - Cell[] as = cells; - base = initialValue; - if (as != null) { - int n = as.length; - for (int i = 0; i < n; ++i) { - Cell a = as[i]; - if (a != null) - a.value = initialValue; - } - } - } - - // Unsafe mechanics - private static final sun.misc.Unsafe UNSAFE; - private static final long baseOffset; - private static final long busyOffset; - static { - try { - UNSAFE = getUnsafe(); - Class sk = Striped64.class; - baseOffset = UNSAFE.objectFieldOffset - (sk.getDeclaredField("base")); - busyOffset = UNSAFE.objectFieldOffset - (sk.getDeclaredField("busy")); - } catch (Exception e) { - throw new Error(e); - } - } - - /** - * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. - * Replace with a simple call to Unsafe.getUnsafe when integrating - * into a jdk. - * - * @return a sun.misc.Unsafe - */ - private static sun.misc.Unsafe getUnsafe() { - try { - return sun.misc.Unsafe.getUnsafe(); - } catch (SecurityException se) { - try { - return java.security.AccessController.doPrivileged - (new java.security - .PrivilegedExceptionAction() { - public sun.misc.Unsafe run() throws Exception { - java.lang.reflect.Field f = sun.misc - .Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (sun.misc.Unsafe) f.get(null); - }}); - } catch (java.security.PrivilegedActionException e) { - throw new RuntimeException("Could not initialize intrinsics", - e.getCause()); - } - } - } - -} diff --git a/src/main/java/org/hbase/async/jsr166e/package-info.java b/src/main/java/org/hbase/async/jsr166e/package-info.java deleted file mode 100644 index 4538865..0000000 --- a/src/main/java/org/hbase/async/jsr166e/package-info.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (C) 2010-2012 The Async HBase Authors. All rights reserved. - * This file is part of Async HBase. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * - Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - Neither the name of the StumbleUpon nor the names of its contributors - * may be used to endorse or promote products derived from this software - * without specific prior written permission. - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -/** - *

This package is not part of asynchbase's public interface.

- * Use {@link org.hbase.async.Counter} instead. - */ -package org.hbase.async.jsr166e; diff --git a/src/test/java/org/hbase/async/FiltersTest.java b/src/test/java/org/hbase/async/FiltersTest.java index 88f0d0e..b2a4212 100644 --- a/src/test/java/org/hbase/async/FiltersTest.java +++ b/src/test/java/org/hbase/async/FiltersTest.java @@ -4,14 +4,14 @@ * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: - * - Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * - Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - Neither the name of the StumbleUpon nor the names of its contributors - * may be used to endorse or promote products derived from this software - * without specific prior written permission. + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * - Neither the name of the StumbleUpon nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE @@ -26,6 +26,11 @@ */ package org.hbase.async; +import java.lang.reflect.Field; +import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.RegexStringComparator; +import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.shaded.protobuf.generated.FilterProtos; import org.junit.Assert; import org.junit.Test; @@ -35,35 +40,167 @@ @RunWith(JUnit4.class) public class FiltersTest { - @Test - public void ensureKeyOnlyFilterIsCorrectlyCreated() { - KeyOnlyFilter keyOnlyFilter = new KeyOnlyFilter(); - org.apache.hadoop.hbase.filter.KeyOnlyFilter filter = (org.apache.hadoop.hbase.filter.KeyOnlyFilter) keyOnlyFilter.getBigtableFilter(); - Assert.assertNotNull(filter); - Assert.assertArrayEquals(filter.toByteArray(), keyOnlyToByteArray(false)); - } - - @Test - public void ensureKeyOnlyFilterIsCorrectlyCreatedWithArgs() { - KeyOnlyFilter keyOnlyFilter = new KeyOnlyFilter(true); - org.apache.hadoop.hbase.filter.KeyOnlyFilter filter = (org.apache.hadoop.hbase.filter.KeyOnlyFilter) keyOnlyFilter.getBigtableFilter(); - Assert.assertNotNull(filter); - Assert.assertArrayEquals(filter.toByteArray(), keyOnlyToByteArray(true)); - } - - @Test - public void ensureColumnPrefixFilterIsCorrectlyCreated() { - final byte[] prefix = Bytes.UTF8("aoeu"); - ColumnPrefixFilter columnPrefixFilter = new ColumnPrefixFilter(prefix); - org.apache.hadoop.hbase.filter.ColumnPrefixFilter filter = (org.apache.hadoop.hbase.filter.ColumnPrefixFilter) columnPrefixFilter.getBigtableFilter(); - Assert.assertNotNull(filter); - Assert.assertArrayEquals(filter.getPrefix(), prefix); - } - - private byte[] keyOnlyToByteArray(boolean value) { - FilterProtos.KeyOnlyFilter.Builder builder = org.apache.hadoop.hbase.shaded.protobuf.generated.FilterProtos.KeyOnlyFilter.newBuilder(); - builder.setLenAsVal(value); - return builder.build().toByteArray(); - } + @Test + public void ensureKeyOnlyFilterIsCorrectlyCreated() { + KeyOnlyFilter keyOnlyFilter = new KeyOnlyFilter(); + org.apache.hadoop.hbase.filter.KeyOnlyFilter filter = (org.apache.hadoop.hbase.filter.KeyOnlyFilter) keyOnlyFilter.getBigtableFilter(); + Assert.assertNotNull(filter); + Assert.assertArrayEquals(filter.toByteArray(), keyOnlyToByteArray(false)); + } + @Test + public void ensureKeyOnlyFilterIsCorrectlyCreatedWithArgs() { + KeyOnlyFilter keyOnlyFilter = new KeyOnlyFilter(true); + org.apache.hadoop.hbase.filter.KeyOnlyFilter filter = (org.apache.hadoop.hbase.filter.KeyOnlyFilter) keyOnlyFilter.getBigtableFilter(); + Assert.assertNotNull(filter); + Assert.assertArrayEquals(filter.toByteArray(), keyOnlyToByteArray(true)); + } + + @Test + public void ensureColumnPrefixFilterIsCorrectlyCreated() { + final byte[] prefix = Bytes.UTF8("aoeu"); + ColumnPrefixFilter columnPrefixFilter = new ColumnPrefixFilter(prefix); + org.apache.hadoop.hbase.filter.ColumnPrefixFilter filter = (org.apache.hadoop.hbase.filter.ColumnPrefixFilter) columnPrefixFilter.getBigtableFilter(); + Assert.assertNotNull(filter); + Assert.assertArrayEquals(filter.getPrefix(), prefix); + } + + // ---------------------------------------------------------------------- + // RE2/Bigtable Sanitization Tests + // ---------------------------------------------------------------------- + + @Test + public void ensureKeyRegexpFilterSanitization() throws Exception { + // 1. Basic Structural Regex (No Quotes) + // Should pass ASCII through as-is. + byte[] regex = Bytes.UTF8("^sys\\.cpu"); + KeyRegexpFilter keyRegex = new KeyRegexpFilter(regex); + + RowFilter rowFilter = (RowFilter) keyRegex.getBigtableFilter(); + String pattern = extractRegexPattern(rowFilter); + + // Expect: ^sys\.cpu + Assert.assertEquals("^sys\\.cpu", pattern); + } + + @Test + public void ensureKeyRegexpFilterStripAndEscape() throws Exception { + // 2. The OpenTSDB "Quoting" Conflict Scenario + // Input: \Q\E + // Bytes: [92, 81, 0, 1, 92, 69] + // Logic should strip \Q(92,81) and \E(92,69) and hex-escape the contents. + + byte[] quotedBinary = new byte[] { + 92, 81, // \Q + 0, 1, // Binary 0x00, 0x01 + 92, 69 // \E + }; + + KeyRegexpFilter keyRegex = new KeyRegexpFilter(quotedBinary); + RowFilter rowFilter = (RowFilter) keyRegex.getBigtableFilter(); + String pattern = extractRegexPattern(rowFilter); + + // Expect: \x00\x01 + // Explanation: + // - \Q removed + // - 0 becomes \x00 + // - 1 becomes \x01 + // - \E removed + Assert.assertEquals("\\x00\\x01", pattern); + } + + @Test + public void ensureKeyRegexpFilterMixedContent() throws Exception { + // 3. Mixed Structural and Binary + // Input: ^metric\Q<0x00><0xFF>\E$ + // This simulates a real UID query: Start Anchor + Metric Name + Binary ID + End Anchor + + byte[] mixed = new byte[14]; + int p = 0; + mixed[p++] = '^'; + mixed[p++] = 'm'; + mixed[p++] = 'e'; + mixed[p++] = 't'; + mixed[p++] = 'r'; + mixed[p++] = 'i'; + mixed[p++] = 'c'; + mixed[p++] = 92; mixed[p++] = 81; // \Q + mixed[p++] = 0; // 0x00 + mixed[p++] = (byte) 0xFF; // 0xFF (High bit char) + mixed[p++] = 92; mixed[p++] = 69; // \E + mixed[p++] = '$'; + + KeyRegexpFilter keyRegex = new KeyRegexpFilter(mixed); + RowFilter rowFilter = (RowFilter) keyRegex.getBigtableFilter(); + String pattern = extractRegexPattern(rowFilter); + + // Expect: ^metric\x00\xff$ + // - ^metric passed as ASCII + // - \Q stripped + // - 0 -> \x00 + // - 0xFF -> \xff + // - \E stripped + // - $ passed as ASCII + Assert.assertEquals("^metric\\x00\\xff$", pattern); + } + + @Test + public void ensureDoubleEscapingLogic() throws Exception { + // 4. Edge Case: Escaped Quote literal inside a quote? + // Note: The current logic is simple state-stripping. + // If OpenTSDB sends \Q.\E, we want \. (literal dot), NOT \x2e. + // RE2 interprets \x2e as a regex wildcard "." (match any), but \. as a literal dot. + + byte[] input = new byte[] { + 92, 81, // \Q + 46, // '.' (dot) - normally a regex wildcard + 92, 69 // \E + }; + + KeyRegexpFilter keyRegex = new KeyRegexpFilter(input); + RowFilter rowFilter = (RowFilter) keyRegex.getBigtableFilter(); + String pattern = extractRegexPattern(rowFilter); + + // Expect: \. + // We stripped \Q and \E. + // We detected '.' is a metacharacter. + // We escaped it with a backslash. + Assert.assertEquals("\\.", pattern); + } + + // ---------------------------------------------------------------------- + // Helpers + // ---------------------------------------------------------------------- + + private byte[] keyOnlyToByteArray(boolean value) { + FilterProtos.KeyOnlyFilter.Builder builder = org.apache.hadoop.hbase.shaded.protobuf.generated.FilterProtos.KeyOnlyFilter.newBuilder(); + builder.setLenAsVal(value); + return builder.build().toByteArray(); + } + + /** + * Uses reflection to extract the internal Regex pattern string from the RowFilter. + * Corrected to use ByteArrayComparable.getValue() which works across HBase versions. + */ + private String extractRegexPattern(RowFilter rowFilter) throws Exception { + // 1. RowFilter contains a comparator. In some HBase versions this field is protected, + // so we use reflection to ensure access. + Field comparatorField = CompareFilter.class.getDeclaredField("comparator"); + comparatorField.setAccessible(true); + + // 2. The comparator is an instance of RegexStringComparator. + // RegexStringComparator extends ByteArrayComparable. + // ByteArrayComparable stores the raw bytes of the comparator value (the regex string) + // in a field accessible via the public getValue() method. + ByteArrayComparable comparator = (ByteArrayComparable) comparatorField.get(rowFilter); + + if (comparator == null) { + throw new RuntimeException("Filter comparator was null"); + } + + // 3. Convert the raw bytes back to a UTF-8 string for verification. + // We use UTF-8 here just to read the regex string back into Java for the assertion. + return new String(comparator.getValue(), "UTF-8"); + } } +