Skip to content

Commit

Permalink
The following changes were made:
Browse files Browse the repository at this point in the history
 # Upgraded Jackson
 # When deserializing a 'Result', use first (latest) entry instead of last entry.
 # Moved codec initialization from clinit to init
 # Avoid multiple class validations
 # Minor improvements to test cases
  • Loading branch information
m-manu committed May 11, 2020
1 parent d3a57cf commit 21fbf4b
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 57 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ Add below entry within the `dependencies` section of your `pom.xml`:
<dependency>
<groupId>com.flipkart</groupId>
<artifactId>hbase-object-mapper</artifactId>
<version>1.14.1</version>
<version>1.15</version>
</dependency>
```

Expand All @@ -423,7 +423,7 @@ See artifact details: [com.flipkart:hbase-object-mapper on **Maven Central**](ht
To build this project, follow the simple steps below:

1. Do a `git clone` of this repository
2. Checkout latest stable version `git checkout v1.14.1`
2. Checkout latest stable version `git checkout v1.15`
3. Execute `mvn clean install` from shell

### Please note:
Expand Down
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.flipkart</groupId>
<artifactId>hbase-object-mapper</artifactId>
<version>1.14.1</version>
<version>1.15</version>
<url>https://github.com/flipkart-incubator/hbase-orm</url>
<scm>
<url>https://github.com/flipkart-incubator/hbase-orm</url>
Expand Down Expand Up @@ -64,7 +64,7 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.10.3</version>
<version>2.9.10.4</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
Expand All @@ -75,7 +75,7 @@
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.10</version>
<version>1.18.12</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
26 changes: 13 additions & 13 deletions src/main/java/com/flipkart/hbaseobjectmapper/AbstractHBDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ protected AbstractHBDAO(Configuration configuration) throws IOException {
public T get(R rowKey, int numVersionsToFetch) throws IOException {
try (Table table = getHBaseTable()) {
Result result = table.get(new Get(toBytes(rowKey)).readVersions(numVersionsToFetch));
return hbObjectMapper.readValue(rowKey, result, hbRecordClass);
return hbObjectMapper.readValueFromResult(result, hbRecordClass);
}
}

Expand Down Expand Up @@ -177,7 +177,7 @@ public Get getGet(R rowKey) {
public T getOnGet(Get get) throws IOException {
try (Table table = getHBaseTable()) {
Result result = table.get(get);
return hbObjectMapper.readValue(result, hbRecordClass);
return hbObjectMapper.readValueFromResult(result, hbRecordClass);
}
}

Expand All @@ -192,7 +192,7 @@ public List<T> getOnGets(List<Get> gets) throws IOException {
try (Table table = getHBaseTable()) {
Result[] results = table.get(gets);
for (Result result : results) {
records.add(hbObjectMapper.readValue(result, hbRecordClass));
records.add(hbObjectMapper.readValueFromResult(result, hbRecordClass));
}
}
return records;
Expand All @@ -216,7 +216,7 @@ public T[] get(R[] rowKeys, int numVersionsToFetch) throws IOException {
try (Table table = getHBaseTable()) {
Result[] results = table.get(gets);
for (int i = 0; i < records.length; i++) {
records[i] = hbObjectMapper.readValue(rowKeys[i], results[i], hbRecordClass);
records[i] = hbObjectMapper.readValueFromResult(results[i], hbRecordClass);
}
}
return records;
Expand Down Expand Up @@ -250,7 +250,7 @@ public List<T> get(List<R> rowKeys, int numVersionsToFetch) throws IOException {
try (Table table = getHBaseTable()) {
Result[] results = table.get(gets);
for (Result result : results) {
records.add(hbObjectMapper.readValue(result, hbRecordClass));
records.add(hbObjectMapper.readValueFromResult(result, hbRecordClass));
}
}
return records;
Expand Down Expand Up @@ -323,7 +323,7 @@ public List<T> get(Scan scan) throws IOException {
try (Table table = getHBaseTable();
ResultScanner scanner = table.getScanner(scan)) {
for (Result result : scanner) {
records.add(hbObjectMapper.readValue(result, hbRecordClass));
records.add(hbObjectMapper.readValueFromResult(result, hbRecordClass));
}
}
return records;
Expand Down Expand Up @@ -518,7 +518,7 @@ public Increment getIncrement(R rowKey) {
public T increment(Increment increment) throws IOException {
try (Table table = getHBaseTable()) {
Result result = table.increment(increment);
return hbObjectMapper.readValue(result, hbRecordClass);
return hbObjectMapper.readValueFromResult(result, hbRecordClass);
}
}

Expand Down Expand Up @@ -569,7 +569,7 @@ public T append(R rowKey, Map<String, Object> valuesToAppend) throws IOException
}
try (Table table = getHBaseTable()) {
Result result = table.append(append);
return hbObjectMapper.readValue(result, hbRecordClass);
return hbObjectMapper.readValueFromResult(result, hbRecordClass);
}
}

Expand Down Expand Up @@ -599,7 +599,7 @@ public Append getAppend(R rowKey) {
public T append(Append append) throws IOException {
try (Table table = getHBaseTable()) {
Result result = table.append(append);
return hbObjectMapper.readValue(result, hbRecordClass);
return hbObjectMapper.readValueFromResult(result, hbRecordClass);
}
}

Expand All @@ -623,7 +623,7 @@ public List<T> get(R startRowKey, R endRowKey) throws IOException {
* @throws IOException When HBase call fails
*/
public R persist(HBRecord<R> record) throws IOException {
Put put = hbObjectMapper.writeValueAsPut(record);
Put put = hbObjectMapper.writeValueAsPut0(record);
try (Table table = getHBaseTable()) {
table.put(put);
return record.composeRowKey();
Expand All @@ -640,9 +640,9 @@ public R persist(HBRecord<R> record) throws IOException {
public List<R> persist(List<T> records) throws IOException {
List<Put> puts = new ArrayList<>(records.size());
List<R> rowKeys = new ArrayList<>(records.size());
for (HBRecord<R> object : records) {
puts.add(hbObjectMapper.writeValueAsPut(object));
rowKeys.add(object.composeRowKey());
for (HBRecord<R> record : records) {
puts.add(hbObjectMapper.writeValueAsPut0(record));
rowKeys.add(record.composeRowKey());
}
try (Table table = getHBaseTable()) {
table.put(puts);
Expand Down
33 changes: 17 additions & 16 deletions src/main/java/com/flipkart/hbaseobjectmapper/HBObjectMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@
*/
public class HBObjectMapper {

private static final Codec DEFAULT_CODEC = new BestSuitCodec();

private final Codec codec;

/**
Expand All @@ -60,7 +58,7 @@ public HBObjectMapper(Codec codec) {
* @see #HBObjectMapper(Codec)
*/
public HBObjectMapper() {
this(DEFAULT_CODEC);
this(new BestSuitCodec());
}

/**
Expand Down Expand Up @@ -88,7 +86,10 @@ <R extends Serializable & Comparable<R>, T extends HBRecord<R>> R bytesToRowKey(
*
* @see #convertRecordToMap(HBRecord)
*/
private <R extends Serializable & Comparable<R>, T extends HBRecord<R>> T convertMapToRecord(byte[] rowKeyBytes, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map, Class<T> clazz) {
private <R extends Serializable & Comparable<R>, T extends HBRecord<R>> T convertMapToRecord(
byte[] rowKeyBytes,
NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map,
Class<T> clazz) {
Collection<Field> fields = getHBColumnFields0(clazz).values();
WrappedHBTable<R, T> hbTable = new WrappedHBTable<>(clazz);
R rowKey = bytesToRowKey(rowKeyBytes, hbTable.getCodecFlags(), clazz);
Expand All @@ -115,8 +116,8 @@ record = clazz.getDeclaredConstructor()
if (columnVersionsMap == null || columnVersionsMap.isEmpty()) {
continue;
}
Map.Entry<Long, byte[]> lastEntry = columnVersionsMap.lastEntry();
objectSetFieldValue(record, field, lastEntry.getValue(), hbColumn.codecFlags());
Map.Entry<Long, byte[]> firstEntry = columnVersionsMap.firstEntry();
objectSetFieldValue(record, field, firstEntry.getValue(), hbColumn.codecFlags());
} else {
objectSetFieldValue(record, field, columnVersionsMap, hbColumn.codecFlags());
}
Expand Down Expand Up @@ -253,7 +254,8 @@ private void validateHBColumnField(Field field) {
* @see #convertMapToRecord(byte[], NavigableMap, Class)
*/
@SuppressWarnings("unchecked")
private <R extends Serializable & Comparable<R>, T extends HBRecord<R>> NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> convertRecordToMap(HBRecord<R> record) {
private <R extends Serializable & Comparable<R>, T extends HBRecord<R>>
NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> convertRecordToMap(HBRecord<R> record) {
Class<T> clazz = (Class<T>) record.getClass();
Collection<Field> fields = getHBColumnFields0(clazz).values();
NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
Expand Down Expand Up @@ -341,6 +343,10 @@ private <R extends Serializable & Comparable<R>> NavigableMap<Long, byte[]> getF
@SuppressWarnings("unchecked")
public <R extends Serializable & Comparable<R>, T extends HBRecord<R>> Put writeValueAsPut(HBRecord<R> record) {
validateHBClass((Class<T>) record.getClass());
return writeValueAsPut0(record);
}

<R extends Serializable & Comparable<R>> Put writeValueAsPut0(HBRecord<R> record) {
Put put = new Put(composeRowKey(record));
for (Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> fe : convertRecordToMap(record).entrySet()) {
byte[] family = fe.getKey();
Expand Down Expand Up @@ -456,18 +462,13 @@ public <R extends Serializable & Comparable<R>, T extends HBRecord<R>> T readVal
return readValueFromResult(result, clazz);
}

<R extends Serializable & Comparable<R>, T extends HBRecord<R>> T readValue(R rowKey, Result result, Class<T> clazz) {
if (rowKey == null)
return readValueFromResult(result, clazz);
else
return readValueFromRowAndResult(rowKeyToBytes(rowKey, WrappedHBTable.getCodecFlags(clazz)), result, clazz);
}

private boolean isResultEmpty(Result result) {
return result == null || result.isEmpty() || result.getRow() == null || result.getRow().length == 0;
if (result == null || result.isEmpty()) return true;
byte[] rowBytes = result.getRow();
return rowBytes == null || rowBytes.length == 0;
}

private <R extends Serializable & Comparable<R>, T extends HBRecord<R>> T readValueFromResult(Result result, Class<T> clazz) {
<R extends Serializable & Comparable<R>, T extends HBRecord<R>> T readValueFromResult(Result result, Class<T> clazz) {
if (isResultEmpty(result)) return null;
return convertMapToRecord(result.getRow(), result.getMap(), clazz);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,16 @@ private HBObjectMapperFactory() {
/**
* Default instance of {@link HBObjectMapper}
*/
private static final HBObjectMapper hbObjectMapper = new HBObjectMapper();
private static HBObjectMapper hbObjectMapper;
private static final Object[] lock = new Object[0];

static HBObjectMapper construct(Codec codec) {
return codec == null ? hbObjectMapper : new HBObjectMapper(codec);
if (hbObjectMapper == null) {
synchronized (lock) {
hbObjectMapper = codec == null ? new HBObjectMapper() : new HBObjectMapper(codec);
}
}
return hbObjectMapper;
}
}

Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package com.flipkart.hbaseobjectmapper.codec;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.*;
import com.flipkart.hbaseobjectmapper.Flag;
import com.flipkart.hbaseobjectmapper.codec.exceptions.DeserializationException;
import com.flipkart.hbaseobjectmapper.codec.exceptions.SerializationException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,13 @@ public void testInvalidObjects() {
String errorMessage = "An object with " + p.getSecond() + " should've thrown an " + p.getThird().getName();
try {
hbMapper.writeValueAsResult(record);
fail(errorMessage + " while converting bean to Result\nFailing object = " + record);
fail(String.format("%s while converting bean to Result%nFailing object = %s", errorMessage, record));
} catch (IllegalArgumentException ex) {
assertEquals(p.getThird(), ex.getClass(), "Mismatch in type of exception thrown");
}
try {
hbMapper.writeValueAsPut(record);
fail(errorMessage + " while converting bean to Put\nFailing object = " + record);
fail(String.format("%s while converting bean to Put%nFailing object = %s", errorMessage, record));
} catch (IllegalArgumentException ex) {
assertEquals(p.getThird(), ex.getClass(), "Mismatch in type of exception thrown");
}
Expand Down Expand Up @@ -324,22 +324,24 @@ public void testUninstantiatableClass() {

@Test
public void testHBColumnMultiVersion() {
Double[] testNumbers = new Double[]{3.14159, 2.71828, 0.0};
for (Double n : testNumbers) {
Double[] numbers = new Double[]{0.0, 3.14159, 2.71828};
for (Double number : numbers) {
// Written as unversioned, read as versioned
Result result = hbMapper.writeValueAsResult(new CrawlNoVersion("key").setF1(n));
Result result = hbMapper.writeValueAsResult(new CrawlNoVersion("key").setF1(number));
Crawl versioned = hbMapper.readValue(result, Crawl.class);
NavigableMap<Long, Double> columnHistory = versioned.getF1();
assertEquals(1, columnHistory.size(), "Column history size mismatch");
assertEquals(n, columnHistory.lastEntry().getValue(), String.format("Inconsistency between %s and %s", HBColumn.class.getSimpleName(), HBColumnMultiVersion.class.getSimpleName()));
assertEquals(number, columnHistory.lastEntry().getValue(), String.format("Inconsistency between %s and %s",
HBColumn.class.getSimpleName(), HBColumnMultiVersion.class.getSimpleName()));
// Written as versioned, read as unversioned
Crawl key = new Crawl("key").addF1(Double.MAX_VALUE).addF1(Double.MAX_VALUE).addF1(Double.MAX_VALUE);
Crawl versionedCrawl = key.addF1(n);
Result result1 = hbMapper.writeValueAsResult(versionedCrawl);
CrawlNoVersion unversionedCrawl = hbMapper.readValue(result1, CrawlNoVersion.class);
Double f1 = unversionedCrawl.getF1();
System.out.println(unversionedCrawl);
assertEquals(n, f1, String.format("Inconsistency between %s and %s\nVersioned (persisted) object = %s\nUnversioned (retrieved) object = %s ", HBColumnMultiVersion.class.getSimpleName(), HBColumn.class.getSimpleName(), versionedCrawl, unversionedCrawl));
Crawl crawl = new Crawl("key").addF1(Double.MAX_VALUE).addF1(Double.MAX_VALUE).addF1(Double.MAX_VALUE);
Crawl versionedCrawl = crawl.addF1(number);
Result crawlAsResult = hbMapper.writeValueAsResult(versionedCrawl);
CrawlNoVersion unversionedCrawl = hbMapper.readValue(crawlAsResult, CrawlNoVersion.class);
assertEquals(number, unversionedCrawl.getF1(), String.format("Inconsistency between %s and %s%n" +
"Versioned (persisted) object = %s%nUnversioned (retrieved) object = %s",
HBColumnMultiVersion.class.getSimpleName(), HBColumn.class.getSimpleName(),
versionedCrawl, unversionedCrawl));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public static void setup() {
fail("The environmental variable " + InMemoryHBaseCluster.INMEMORY_CLUSTER_START_TIMEOUT + " is specified incorrectly (Must be numeric)");
} catch (Exception e) {
e.printStackTrace(System.err);
fail("Failed to connect to HBase. Aborted execution of DAO-related test cases. Reason:\n" + e.getMessage());
fail(String.format("Failed to connect to HBase. Aborted execution of DAO-related test cases." +
"Reason:%n%s", e.getMessage()));
}
}

Expand Down Expand Up @@ -107,9 +108,9 @@ public void testCRUD() throws IOException {
final List<Citizen> records = TestObjects.validCitizenObjects;
assertEquals("citizens", citizenDao.getTableName());
final Set<String> columnFamiliesCitizen = citizenDao.getColumnFamiliesAndVersions().keySet(), columnFamiliesCitizenSummary = citizenSummaryDAO.getColumnFamiliesAndVersions().keySet();
assertEquals(s("main", "optional"), columnFamiliesCitizen, "Issue with column families of 'citizens' table\n" + columnFamiliesCitizen);
assertEquals(s("main", "optional"), columnFamiliesCitizen, "Issue with column families of 'citizens' table%n" + columnFamiliesCitizen);
assertEquals("citizens_summary", citizenSummaryDAO.getTableName());
assertEquals(s("a"), columnFamiliesCitizenSummary, "Issue with column families of 'citizens_summary' table\n" + columnFamiliesCitizenSummary);
assertEquals(s("a"), columnFamiliesCitizenSummary, "Issue with column families of 'citizens_summary' table%n" + columnFamiliesCitizenSummary);
String[] allRowKeys = new String[records.size()];
Map<String, Map<String, Object>> expectedFieldValues = new HashMap<>();
for (int i = 0; i < records.size(); i++) { // for each test object,
Expand Down Expand Up @@ -358,7 +359,7 @@ public void testVersioning() throws IOException {
rowKeysList.add(key);
}
}
String[] rowKeys = rowKeysList.toArray(new String[rowKeysList.size()]);
String[] rowKeys = rowKeysList.toArray(new String[0]);

Set<Double> oldestValuesRangeScan = new HashSet<>(), oldestValuesBulkScan = new HashSet<>();
for (int k = 1; k <= NUM_VERSIONS; k++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;

@HBTable(name = "crawls", families = {@Family(name = "a", versions = 10)})
@ToString
Expand Down Expand Up @@ -34,6 +35,10 @@ public void parseRowKey(String rowKey) {
}

public Crawl addF1(Double f1) {
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException ignored) {
}
this.f1.put(System.currentTimeMillis(), f1);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.flipkart.hbaseobjectmapper.testcases.util.cluster;

import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.Connection;

import java.io.IOException;
Expand Down Expand Up @@ -44,6 +43,9 @@ public Connection start() throws IOException {
throw new IOException("Error starting an in-memory HBase cluster", e);
}
connection = utility.getConnection();
if (connection == null) {
throw new IllegalStateException("Connection could not be established with in-memory HBase cluster");
}
return connection;
}

Expand Down

0 comments on commit 21fbf4b

Please sign in to comment.