Skip to content

Commit e82d2ea

Browse files
authored
Replacing the use of Java object serialization with Jackson (#2080) (#2095)
1 parent e391e7c commit e82d2ea

File tree

15 files changed

+150
-116
lines changed

15 files changed

+150
-116
lines changed

mr/src/main/java/org/elasticsearch/hadoop/rest/PartitionDefinition.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public static class PartitionDefinitionBuilder {
4747

4848
private PartitionDefinitionBuilder(Settings settings, Mapping resolvedMapping) {
4949
this.serializedSettings = settings == null ? null : settings.save();
50-
this.serializedMapping = resolvedMapping == null ? null : IOUtils.serializeToBase64(resolvedMapping);
50+
this.serializedMapping = resolvedMapping == null ? null : IOUtils.serializeToJsonString(resolvedMapping);
5151
}
5252

5353
public PartitionDefinition build(String index, int shardId) {

mr/src/main/java/org/elasticsearch/hadoop/rest/RestService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ public static PartitionReader createReader(Settings settings, PartitionDefinitio
422422
RestRepository repository = new RestRepository(settings);
423423
Mapping fieldMapping = null;
424424
if (StringUtils.hasText(partition.getSerializedMapping())) {
425-
fieldMapping = IOUtils.deserializeFromBase64(partition.getSerializedMapping());
425+
fieldMapping = IOUtils.deserializeFromJsonString(partition.getSerializedMapping(), Mapping.class);
426426
}
427427
else {
428428
log.warn(String.format("No mapping found for [%s] - either no index exists or the partition configuration has been corrupted", partition));

mr/src/main/java/org/elasticsearch/hadoop/serialization/dto/mapping/Field.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@
2121
import java.io.Serializable;
2222
import java.util.Arrays;
2323
import java.util.Collection;
24+
import java.util.Objects;
2425

2526
import org.elasticsearch.hadoop.serialization.FieldType;
27+
import org.elasticsearch.hadoop.thirdparty.codehaus.jackson.annotate.JsonCreator;
28+
import org.elasticsearch.hadoop.thirdparty.codehaus.jackson.annotate.JsonProperty;
2629

27-
@SuppressWarnings("serial")
2830
public class Field implements Serializable {
2931

3032
static final Field[] NO_FIELDS = new Field[0];
@@ -41,20 +43,24 @@ public Field(String name, FieldType type, Collection<Field> properties) {
4143
this(name, type, (properties != null ? properties.toArray(new Field[properties.size()]) : NO_FIELDS));
4244
}
4345

44-
Field(String name, FieldType type, Field[] properties) {
46+
@JsonCreator
47+
Field(@JsonProperty("name") String name, @JsonProperty("type") FieldType type, @JsonProperty("properties") Field[] properties) {
4548
this.name = name;
4649
this.type = type;
4750
this.properties = properties;
4851
}
4952

53+
@JsonProperty("properties")
5054
public Field[] properties() {
5155
return properties;
5256
}
5357

58+
@JsonProperty("type")
5459
public FieldType type() {
5560
return type;
5661
}
5762

63+
@JsonProperty("name")
5864
public String name() {
5965
return name;
6066
}
@@ -63,4 +69,15 @@ public String name() {
6369
public String toString() {
6470
return String.format("%s=%s", name, ((type == FieldType.OBJECT || type == FieldType.NESTED) ? Arrays.toString(properties) : type));
6571
}
72+
73+
@Override
74+
public boolean equals(Object o) {
75+
if (o instanceof Field == false) {
76+
return false;
77+
}
78+
Field other = (Field) o;
79+
return Objects.equals(this.name, other.name) &&
80+
Objects.equals(this.type, other.type) &&
81+
Objects.deepEquals(this.properties, other.properties);
82+
}
6683
}

mr/src/main/java/org/elasticsearch/hadoop/serialization/dto/mapping/Mapping.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@
1919

2020
package org.elasticsearch.hadoop.serialization.dto.mapping;
2121

22+
import org.elasticsearch.hadoop.serialization.FieldType;
23+
import org.elasticsearch.hadoop.serialization.field.FieldFilter;
24+
import org.elasticsearch.hadoop.thirdparty.codehaus.jackson.annotate.JsonProperty;
25+
2226
import java.io.Serializable;
2327
import java.util.ArrayList;
2428
import java.util.Arrays;
@@ -27,9 +31,7 @@
2731
import java.util.LinkedHashMap;
2832
import java.util.List;
2933
import java.util.Map;
30-
31-
import org.elasticsearch.hadoop.serialization.FieldType;
32-
import org.elasticsearch.hadoop.serialization.field.FieldFilter;
34+
import java.util.Objects;
3335

3436
/**
3537
* A mapping has a name and a collection of fields.
@@ -53,7 +55,7 @@ public Mapping(String index, String name, Collection<Field> fields) {
5355
this(index, name, (fields != null ? fields.toArray(new Field[fields.size()]) : Field.NO_FIELDS));
5456
}
5557

56-
Mapping(String index, String type, Field[] fields) {
58+
Mapping(@JsonProperty("index") String index, @JsonProperty("type") String type, @JsonProperty("fields") Field[] fields) {
5759
this.index = index;
5860
this.type = type;
5961
this.fields = fields;
@@ -154,4 +156,15 @@ public String toString() {
154156
return String.format("%s=%s", index, Arrays.toString(fields));
155157
}
156158
}
159+
160+
@Override
161+
public boolean equals(Object o) {
162+
if (o instanceof Mapping == false) {
163+
return false;
164+
}
165+
Mapping other = (Mapping) o;
166+
return Objects.equals(this.index, other.index) &&
167+
Objects.equals(this.type, other.type) &&
168+
Objects.deepEquals(this.fields, other.fields);
169+
}
157170
}

mr/src/main/java/org/elasticsearch/hadoop/util/IOUtils.java

Lines changed: 24 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,16 @@
1818
*/
1919
package org.elasticsearch.hadoop.util;
2020

21+
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
22+
import org.elasticsearch.hadoop.serialization.EsHadoopSerializationException;
23+
import org.elasticsearch.hadoop.thirdparty.codehaus.jackson.map.ObjectMapper;
24+
import org.elasticsearch.hadoop.thirdparty.codehaus.jackson.map.SerializationConfig;
25+
2126
import java.io.ByteArrayInputStream;
2227
import java.io.Closeable;
2328
import java.io.File;
2429
import java.io.IOException;
2530
import java.io.InputStream;
26-
import java.io.ObjectInputStream;
27-
import java.io.ObjectOutputStream;
28-
import java.io.Serializable;
2931
import java.io.StringReader;
3032
import java.io.StringWriter;
3133
import java.lang.reflect.Field;
@@ -35,12 +37,6 @@
3537
import java.net.URL;
3638
import java.util.Properties;
3739

38-
import javax.xml.bind.DatatypeConverter;
39-
40-
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
41-
import org.elasticsearch.hadoop.EsHadoopIllegalStateException;
42-
import org.elasticsearch.hadoop.serialization.EsHadoopSerializationException;
43-
4440
/**
4541
* Utility class used internally for the Pig support.
4642
*/
@@ -53,42 +49,38 @@ public abstract class IOUtils {
5349
ReflectionUtils.makeAccessible(BYTE_ARRAY_BUFFER);
5450
}
5551

56-
public static String serializeToBase64(Serializable object) {
52+
private static final ObjectMapper mapper = new ObjectMapper().configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false);
53+
54+
/**
55+
* Serializes the given object into a JSON String using Jackson. A null object yields StringUtils.EMPTY. The object must support Jackson serialization.
56+
*/
57+
public static String serializeToJsonString(Object object) {
5758
if (object == null) {
5859
return StringUtils.EMPTY;
5960
}
60-
FastByteArrayOutputStream baos = new FastByteArrayOutputStream();
61-
ObjectOutputStream oos = null;
61+
final String json;
6262
try {
63-
oos = new ObjectOutputStream(baos);
64-
oos.writeObject(object);
63+
json = mapper.writeValueAsString(object);
6564
} catch (IOException ex) {
66-
throw new EsHadoopSerializationException("Cannot serialize object " + object, ex);
67-
} finally {
68-
close(oos);
65+
throw new EsHadoopSerializationException("Cannot serialize object: " + object, ex);
6966
}
70-
return DatatypeConverter.printBase64Binary(baos.bytes().bytes());
67+
return json;
7168
}
7269

73-
@SuppressWarnings("unchecked")
74-
public static <T extends Serializable> T deserializeFromBase64(String data) {
70+
/**
71+
* Deserializes a String that was created by serializeToJsonString into an instance of the given class. Returns null when the String has no length.
72+
*/
73+
public static <T> T deserializeFromJsonString(String data, Class<T> clazz) {
7574
if (!StringUtils.hasLength(data)) {
7675
return null;
7776
}
78-
79-
byte[] rawData = DatatypeConverter.parseBase64Binary(data);
80-
ObjectInputStream ois = null;
77+
final T object;
8178
try {
82-
ois = new ObjectInputStream(new FastByteArrayInputStream(rawData));
83-
Object o = ois.readObject();
84-
return (T) o;
85-
} catch (ClassNotFoundException ex) {
86-
throw new EsHadoopIllegalStateException("cannot deserialize object", ex);
87-
} catch (IOException ex) {
88-
throw new EsHadoopSerializationException("cannot deserialize object", ex);
89-
} finally {
90-
close(ois);
79+
object = mapper.readValue(data, clazz);
80+
} catch (IOException e) {
81+
throw new EsHadoopSerializationException("Cannot deserialize string: [" + data + "]", e);
9182
}
83+
return object;
9284
}
9385

9486
public static String propsToString(Properties props) {

mr/src/main/java/org/elasticsearch/hadoop/util/SettingsUtils.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929

3030
import java.net.InetAddress;
3131
import java.net.UnknownHostException;
32-
import java.util.ArrayList;
3332
import java.util.Collection;
3433
import java.util.LinkedHashMap;
3534
import java.util.LinkedHashSet;
@@ -177,11 +176,11 @@ public static void setFilters(Settings settings, String... filters) {
177176
return;
178177
}
179178

180-
settings.setProperty(InternalConfigurationOptions.INTERNAL_ES_QUERY_FILTERS, IOUtils.serializeToBase64(filters));
179+
settings.setProperty(InternalConfigurationOptions.INTERNAL_ES_QUERY_FILTERS, IOUtils.serializeToJsonString(filters));
181180
}
182181

183182
public static String[] getFilters(Settings settings) {
184-
return IOUtils.deserializeFromBase64(settings.getProperty(InternalConfigurationOptions.INTERNAL_ES_QUERY_FILTERS));
183+
return IOUtils.deserializeFromJsonString(settings.getProperty(InternalConfigurationOptions.INTERNAL_ES_QUERY_FILTERS), String[].class);
185184
}
186185

187186
public static String determineSourceFields(Settings settings) {

mr/src/test/java/org/elasticsearch/hadoop/util/IOUtilsTest.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,25 @@
1919

2020
package org.elasticsearch.hadoop.util;
2121

22+
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
23+
import org.elasticsearch.hadoop.serialization.EsHadoopSerializationException;
24+
import org.elasticsearch.hadoop.serialization.FieldType;
25+
import org.elasticsearch.hadoop.serialization.dto.mapping.Field;
26+
import org.elasticsearch.hadoop.serialization.dto.mapping.Mapping;
27+
import org.junit.Test;
28+
2229
import java.io.BufferedReader;
2330
import java.io.BufferedWriter;
2431
import java.io.File;
2532
import java.io.FileOutputStream;
2633
import java.io.InputStream;
2734
import java.io.InputStreamReader;
2835
import java.io.OutputStreamWriter;
36+
import java.util.ArrayList;
37+
import java.util.List;
2938

30-
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
31-
import org.junit.Test;
32-
39+
import static junit.framework.TestCase.assertNull;
40+
import static org.junit.Assert.assertArrayEquals;
3341
import static org.junit.Assert.assertEquals;
3442
import static org.junit.Assert.assertNotNull;
3543
import static org.junit.Assert.fail;
@@ -63,5 +71,20 @@ public void openNonExistingFile() throws Exception {
6371
InputStream inputStream = IOUtils.open("file:///This/Doesnt/Exist");
6472
fail("Shouldn't pass");
6573
}
66-
67-
}
74+
@Test
75+
public void testDeserializeFromJsonString() {
76+
assertNull(IOUtils.deserializeFromJsonString("", String.class));
77+
try {
78+
IOUtils.deserializeFromJsonString("junk", String.class);
79+
fail("Should have thrown an EsHadoopIllegalArgumentException");
80+
} catch (EsHadoopSerializationException expected) {}
81+
List<Field> fieldsList = new ArrayList<>();
82+
fieldsList.add(new Field("%s", FieldType.TEXT));
83+
Mapping mapping = new Mapping("*", "*", fieldsList);
84+
Mapping roundTripMapping = IOUtils.deserializeFromJsonString(IOUtils.serializeToJsonString(mapping), Mapping.class);
85+
assertEquals(mapping, roundTripMapping);
86+
String[] filters = new String[]{"{\"exists\":{\"field\":\"id\"}}", "{\"match\":{\"id\":1}}"};
87+
String[] roundTripFilters = IOUtils.deserializeFromJsonString(IOUtils.serializeToJsonString(filters), String[].class);
88+
assertArrayEquals(filters, roundTripFilters);
89+
}
90+
}

pig/src/main/java/org/elasticsearch/hadoop/pig/EsStorage.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ public void checkSchema(ResourceSchema s) throws IOException {
141141
// save schema to back-end for JSON translation
142142
if (!StringUtils.hasText(props.getProperty(ResourceSchema.class.getName()))) {
143143
// save the schema as a JSON String (Jackson serialization is used since toString() screws up the signature - see the testcase)
144-
props.setProperty(ResourceSchema.class.getName(), IOUtils.serializeToBase64(s));
144+
props.setProperty(ResourceSchema.class.getName(), IOUtils.serializeToJsonString(s));
145145
}
146146
}
147147

@@ -203,7 +203,7 @@ public void prepareToWrite(RecordWriter writer) throws IOException {
203203
this.schema = new ResourceSchema();
204204
}
205205
else {
206-
this.schema = IOUtils.deserializeFromBase64(s);
206+
this.schema = IOUtils.deserializeFromJsonString(s, ResourceSchema.class);
207207
}
208208
this.pigTuple = new PigTuple(schema);
209209
}

pig/src/test/java/org/elasticsearch/hadoop/pig/PigSchemaSaveTest.java

Lines changed: 0 additions & 56 deletions
This file was deleted.

spark/sql-13/src/main/scala/org/elasticsearch/spark/sql/DefaultSource.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @
209209
if (Utils.LOGGER.isTraceEnabled()) {
210210
Utils.LOGGER.trace(s"Transformed filters into DSL ${filterString.mkString("[", ",", "]")}")
211211
}
212-
paramWithScan += (InternalConfigurationOptions.INTERNAL_ES_QUERY_FILTERS -> IOUtils.serializeToBase64(filterString))
212+
paramWithScan += (InternalConfigurationOptions.INTERNAL_ES_QUERY_FILTERS -> IOUtils.serializeToJsonString(filterString))
213213
}
214214
else {
215215
if (Utils.LOGGER.isTraceEnabled()) {

0 commit comments

Comments
 (0)