Skip to content

Commit 94eafed

Browse files
lucebert and masseyke authored
add scripted upsert feature (#1454)
* add script upsert feature
* Optimize imports
* small refactor in JsonScriptTemplateBulk preProcess
* Adding an integration test and avoiding use of Collections.emptyMap()
* formatting

Co-authored-by: Keith Massey <keith.massey@elastic.co>
1 parent 755aa1c commit 94eafed

File tree

9 files changed

+251
-2
lines changed

9 files changed

+251
-2
lines changed

mr/src/main/java/org/elasticsearch/hadoop/cfg/ConfigurationOptions.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,8 @@ public interface ConfigurationOptions {
251251

252252
String ES_UPDATE_SCRIPT_FILE = "es.update.script.file";
253253
String ES_UPDATE_SCRIPT_INLINE = "es.update.script.inline";
254+
String ES_UPDATE_SCRIPT_UPSERT = "es.update.script.upsert";
255+
String ES_UPDATE_SCRIPT_UPSERT_DEFAULT = "false";
254256
String ES_UPDATE_SCRIPT_STORED = "es.update.script.stored";
255257
String ES_UPDATE_SCRIPT_LEGACY = "es.update.script";
256258
String ES_UPDATE_SCRIPT_LANG = "es.update.script.lang";

mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,10 @@ public String getUpdateScriptInline() {
370370
return getLegacyProperty(ES_UPDATE_SCRIPT_LEGACY, ES_UPDATE_SCRIPT_INLINE, null);
371371
}
372372

373+
public Boolean getUpdateScriptUpsert() {
374+
return Booleans.parseBoolean(getProperty(ES_UPDATE_SCRIPT_UPSERT, ES_UPDATE_SCRIPT_UPSERT_DEFAULT));
375+
}
376+
373377
public String getUpdateScriptFile() {
374378
return getProperty(ES_UPDATE_SCRIPT_FILE);
375379
}
@@ -409,6 +413,11 @@ public boolean hasUpdateScriptParamsJson() {
409413
return hasUpdateScript() && StringUtils.hasText(getUpdateScriptParamsJson());
410414
}
411415

416+
public boolean hasScriptUpsert() {
417+
String op = getOperation();
418+
return ConfigurationOptions.ES_OPERATION_UPSERT.equals(op) && getUpdateScriptUpsert();
419+
}
420+
412421
private String getLegacyProperty(String legacyProperty, String newProperty, String defaultValue) {
413422
String legacy = getProperty(legacyProperty);
414423
if (StringUtils.hasText(legacy)) {

mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/JsonScriptTemplateBulk.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ protected Object preProcess(Object object, BytesArray storage) {
4646
BytesArray ba = null;
4747
if (ConfigurationOptions.ES_OPERATION_UPSERT.equals(settings.getOperation())) {
4848
ba = storage;
49+
if (settings.hasScriptUpsert()) {
50+
jsonWriter.convert("{}", ba);
51+
scratchPad.reset();
52+
ba = scratchPad;
53+
}
4954
}
5055
else {
5156
scratchPad.reset();

mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/ScriptTemplateBulk.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
2424
import org.elasticsearch.hadoop.cfg.Settings;
2525
import org.elasticsearch.hadoop.serialization.builder.ValueWriter;
26+
import org.elasticsearch.hadoop.serialization.json.JacksonJsonGenerator;
2627
import org.elasticsearch.hadoop.util.BytesArray;
28+
import org.elasticsearch.hadoop.util.FastByteArrayOutputStream;
2729

2830
class ScriptTemplateBulk extends TemplatedBulk {
2931

@@ -38,7 +40,15 @@ class ScriptTemplateBulk extends TemplatedBulk {
3840
@Override
3941
protected void doWriteObject(Object object, BytesArray storage, ValueWriter<?> writer) {
4042
if (ConfigurationOptions.ES_OPERATION_UPSERT.equals(settings.getOperation())) {
41-
super.doWriteObject(object, storage, writer);
43+
if (settings.hasScriptUpsert()) {
44+
FastByteArrayOutputStream bos = new FastByteArrayOutputStream(storage);
45+
JacksonJsonGenerator generator = new JacksonJsonGenerator(bos);
46+
generator.writeBeginObject();
47+
generator.writeEndObject();
48+
generator.close();
49+
} else {
50+
super.doWriteObject(object, storage, writer);
51+
}
4252
}
4353
}
4454
}

mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/UpdateBulkFactory.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class UpdateBulkFactory extends AbstractBulkFactory {
4141
private final String SCRIPT_1X;
4242
private final String SCRIPT_LANG_1X;
4343

44-
private final boolean HAS_SCRIPT, HAS_LANG;
44+
private final boolean HAS_SCRIPT, HAS_LANG, HAS_SCRIPT_UPSERT;
4545
private final boolean UPSERT;
4646

4747
public UpdateBulkFactory(Settings settings, MetadataExtractor metaExtractor, EsMajorVersion esMajorVersion) {
@@ -56,6 +56,7 @@ public UpdateBulkFactory(Settings settings, boolean upsert, MetadataExtractor me
5656
RETRY_HEADER = getRequestParameterNames().retryOnConflict + RETRY_ON_FAILURE + "";
5757

5858
HAS_SCRIPT = settings.hasUpdateScript();
59+
HAS_SCRIPT_UPSERT = settings.hasScriptUpsert();
5960
HAS_LANG = StringUtils.hasText(settings.getUpdateScriptLang());
6061

6162
SCRIPT_LANG_5X = ",\"lang\":\"" + settings.getUpdateScriptLang() + "\"";
@@ -142,13 +143,17 @@ private void writeLegacyFormatting(List<Object> list, Object paramExtractor) {
142143
* "params": ...,
143144
* "lang": "...",
144145
* "script": "...",
146+
* "scripted_upsert":true,
145147
* "upsert": {...}
146148
* }
147149
*/
148150
if (HAS_LANG) {
149151
list.add(SCRIPT_LANG_1X);
150152
}
151153
list.add(SCRIPT_1X);
154+
if (HAS_SCRIPT_UPSERT) {
155+
list.add(",\"scripted_upsert\": true");
156+
}
152157
if (UPSERT) {
153158
list.add(",\"upsert\":");
154159
}
@@ -181,6 +186,7 @@ private void writeStrictFormatting(List<Object> list, Object paramExtractor, Str
181186
* "lang": "...",
182187
* "params": ...,
183188
* },
189+
* "scripted_upsert":true,
184190
* "upsert": {...}
185191
* }
186192
*/
@@ -193,6 +199,9 @@ private void writeStrictFormatting(List<Object> list, Object paramExtractor, Str
193199
list.add(paramExtractor);
194200
}
195201
list.add("}");
202+
if (HAS_SCRIPT_UPSERT) {
203+
list.add(",\"scripted_upsert\": true");
204+
}
196205
if (UPSERT) {
197206
list.add(",\"upsert\":");
198207
}

mr/src/test/java/org/elasticsearch/hadoop/serialization/CommandTest.java

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public static Collection<Object[]> data() {
6868
String[] operations = new String[]{ConfigurationOptions.ES_OPERATION_INDEX,
6969
ConfigurationOptions.ES_OPERATION_CREATE,
7070
ConfigurationOptions.ES_OPERATION_UPDATE,
71+
ConfigurationOptions.ES_OPERATION_UPSERT,
7172
ConfigurationOptions.ES_OPERATION_DELETE};
7273
boolean[] asJsons = new boolean[]{false, true};
7374
EsMajorVersion[] versions = new EsMajorVersion[]{EsMajorVersion.V_1_X,
@@ -110,6 +111,7 @@ public void prepare() {
110111
@Test
111112
public void testNoHeader() throws Exception {
112113
assumeFalse(ConfigurationOptions.ES_OPERATION_UPDATE.equals(operation));
114+
assumeFalse(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
113115
assumeFalse(ConfigurationOptions.ES_OPERATION_DELETE.equals(operation));
114116
create(settings()).write(data).copyTo(ba);
115117
String result = prefix() + "}}" + map();
@@ -120,6 +122,7 @@ public void testNoHeader() throws Exception {
120122
// check user friendliness and escape the string if needed
121123
public void testConstantId() throws Exception {
122124
assumeFalse(isDeleteOP() && jsonInput);
125+
assumeFalse(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
123126
Settings settings = settings();
124127
noId = true;
125128
settings.setProperty(ConfigurationOptions.ES_MAPPING_ID, "<foobar>");
@@ -133,6 +136,7 @@ public void testConstantId() throws Exception {
133136
@Test
134137
public void testParent() throws Exception {
135138
assumeTrue(version.onOrBefore(EsMajorVersion.V_6_X));
139+
assumeFalse(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
136140
assumeFalse(isDeleteOP() && jsonInput);
137141
Settings settings = settings();
138142
settings.setProperty(ConfigurationOptions.ES_MAPPING_PARENT, "<5>");
@@ -145,6 +149,7 @@ public void testParent() throws Exception {
145149
@Test
146150
public void testParent7X() throws Exception {
147151
assumeTrue(version.onOrAfter(EsMajorVersion.V_7_X));
152+
assumeFalse(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
148153
assumeFalse(isDeleteOP() && jsonInput);
149154
Settings settings = settings();
150155
settings.setProperty(ConfigurationOptions.ES_MAPPING_PARENT, "<5>");
@@ -157,6 +162,7 @@ public void testParent7X() throws Exception {
157162
@Test
158163
public void testVersion() throws Exception {
159164
assumeTrue(version.onOrBefore(EsMajorVersion.V_6_X));
165+
assumeFalse(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
160166
assumeFalse(isDeleteOP() && jsonInput);
161167
Settings settings = settings();
162168
settings.setProperty(ConfigurationOptions.ES_MAPPING_VERSION, "<3>");
@@ -169,6 +175,7 @@ public void testVersion() throws Exception {
169175
@Test
170176
public void testVersion7X() throws Exception {
171177
assumeTrue(version.onOrAfter(EsMajorVersion.V_7_X));
178+
assumeFalse(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
172179
assumeFalse(isDeleteOP() && jsonInput);
173180
Settings settings = settings();
174181
settings.setProperty(ConfigurationOptions.ES_MAPPING_VERSION, "<3>");
@@ -181,6 +188,7 @@ public void testVersion7X() throws Exception {
181188
@Test
182189
public void testTtl() throws Exception {
183190
assumeFalse(isDeleteOP() && jsonInput);
191+
assumeFalse(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
184192
Settings settings = settings();
185193
settings.setProperty(ConfigurationOptions.ES_MAPPING_TTL, "<2>");
186194

@@ -192,6 +200,7 @@ public void testTtl() throws Exception {
192200
@Test
193201
public void testTimestamp() throws Exception {
194202
assumeFalse(isDeleteOP() && jsonInput);
203+
assumeFalse(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
195204
Settings settings = settings();
196205
settings.setProperty(ConfigurationOptions.ES_MAPPING_TIMESTAMP, "<3>");
197206
create(settings).write(data).copyTo(ba);
@@ -202,6 +211,7 @@ public void testTimestamp() throws Exception {
202211
@Test
203212
public void testRouting() throws Exception {
204213
assumeTrue(version.onOrBefore(EsMajorVersion.V_6_X));
214+
assumeFalse(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
205215
assumeFalse(isDeleteOP() && jsonInput);
206216
Settings settings = settings();
207217
settings.setProperty(ConfigurationOptions.ES_MAPPING_ROUTING, "<4>");
@@ -214,6 +224,7 @@ public void testRouting() throws Exception {
214224
@Test
215225
public void testRouting7X() throws Exception {
216226
assumeTrue(version.onOrAfter(EsMajorVersion.V_7_X));
227+
assumeFalse(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
217228
assumeFalse(isDeleteOP() && jsonInput);
218229
Settings settings = settings();
219230
settings.setProperty(ConfigurationOptions.ES_MAPPING_ROUTING, "<4>");
@@ -226,6 +237,7 @@ public void testRouting7X() throws Exception {
226237
@Test
227238
public void testAll() throws Exception {
228239
assumeTrue(version.onOrBefore(EsMajorVersion.V_6_X));
240+
assumeFalse(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
229241
assumeFalse(isDeleteOP() && jsonInput);
230242
Settings settings = settings();
231243
settings.setProperty(ConfigurationOptions.ES_MAPPING_ID, "n");
@@ -240,6 +252,7 @@ public void testAll() throws Exception {
240252
@Test
241253
public void testAll7X() throws Exception {
242254
assumeTrue(version.onOrAfter(EsMajorVersion.V_7_X));
255+
assumeFalse(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
243256
assumeFalse(isDeleteOP() && jsonInput);
244257
Settings settings = settings();
245258
settings.setProperty(ConfigurationOptions.ES_MAPPING_ID, "n");
@@ -254,6 +267,7 @@ public void testAll7X() throws Exception {
254267
@Test
255268
public void testIdPattern() throws Exception {
256269
assumeFalse(isDeleteOP() && jsonInput);
270+
assumeFalse(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
257271
Settings settings = settings();
258272
if (version.onOrAfter(EsMajorVersion.V_8_X)) {
259273
settings.setResourceWrite("{n}");
@@ -468,6 +482,127 @@ public void testUpdateOnlyParamInlineScript6X() throws Exception {
468482
assertEquals(result, ba.toString());
469483
}
470484

485+
@Test
486+
public void testUpsertParamInlineScript1X() throws Exception {
487+
assumeTrue(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
488+
assumeTrue(version.onOrBefore(EsMajorVersion.V_1_X));
489+
Settings set = settings();
490+
491+
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_INLINE, "counter = param1; anothercounter = param2");
492+
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "groovy");
493+
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, " param1:<1>, param2:n ");
494+
495+
create(set).write(data).copyTo(ba);
496+
497+
String result =
498+
"{\"update\":{\"_id\":3}}\n" +
499+
"{\"params\":{\"param1\":1,\"param2\":1},\"lang\":\"groovy\",\"script\":\"counter = param1; anothercounter = param2\",\"upsert\":{\"n\":1,\"s\":\"v\"}}\n";
500+
501+
assertEquals(result, ba.toString());
502+
}
503+
504+
@Test
505+
public void testUpsertParamInlineScript5X() throws Exception {
506+
assumeTrue(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
507+
assumeTrue(version.after(EsMajorVersion.V_1_X));
508+
assumeTrue(version.before(EsMajorVersion.V_6_X));
509+
Settings set = settings();
510+
511+
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_INLINE, "counter = param1; anothercounter = param2");
512+
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "groovy");
513+
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, " param1:<1>, param2:n ");
514+
515+
create(set).write(data).copyTo(ba);
516+
517+
String result =
518+
"{\"update\":{\"_id\":3}}\n" +
519+
"{\"script\":{\"inline\":\"counter = param1; anothercounter = param2\",\"lang\":\"groovy\",\"params\":{\"param1\":1,\"param2\":1}},\"upsert\":{\"n\":1,\"s\":\"v\"}}\n";
520+
521+
assertEquals(result, ba.toString());
522+
}
523+
524+
@Test
525+
public void testUpsertParamInlineScript6X() throws Exception {
526+
assumeTrue(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
527+
assumeTrue(version.onOrAfter(EsMajorVersion.V_6_X));
528+
Settings set = settings();
529+
530+
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_INLINE, "counter = param1; anothercounter = param2");
531+
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "groovy");
532+
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, " param1:<1>, param2:n ");
533+
534+
create(set).write(data).copyTo(ba);
535+
536+
String result =
537+
"{\"update\":{\"_id\":3}}\n" +
538+
"{\"script\":{\"source\":\"counter = param1; anothercounter = param2\",\"lang\":\"groovy\",\"params\":{\"param1\":1,\"param2\":1}},\"upsert\":{\"n\":1,\"s\":\"v\"}}\n";
539+
540+
assertEquals(result, ba.toString());
541+
}
542+
543+
@Test
544+
public void testScriptedUpsertParamInlineScript1X() throws Exception {
545+
assumeTrue(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
546+
assumeTrue(version.onOrBefore(EsMajorVersion.V_1_X));
547+
Settings set = settings();
548+
549+
set.setProperty(ConfigurationOptions.ES_MAPPING_ID, "n");
550+
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_UPSERT, "true");
551+
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_INLINE, "counter = param1; anothercounter = param2");
552+
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "groovy");
553+
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, " param1:<1>, param2:n ");
554+
555+
create(set).write(data).copyTo(ba);
556+
557+
String result =
558+
"{\"update\":{\"_id\":1}}\n" +
559+
"{\"params\":{\"param1\":1,\"param2\":1},\"lang\":\"groovy\",\"script\":\"counter = param1; anothercounter = param2\",\"scripted_upsert\": true,\"upsert\":{}}\n";
560+
561+
assertEquals(result, ba.toString());
562+
}
563+
564+
@Test
565+
public void testScriptedUpsertParamInlineScript5X() throws Exception {
566+
assumeTrue(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
567+
assumeTrue(version.after(EsMajorVersion.V_1_X));
568+
assumeTrue(version.before(EsMajorVersion.V_6_X));
569+
Settings set = settings();
570+
571+
set.setProperty(ConfigurationOptions.ES_MAPPING_ID, "n");
572+
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_UPSERT, "true");
573+
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_INLINE, "counter = param1; anothercounter = param2");
574+
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "groovy");
575+
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, " param1:<1>, param2:n ");
576+
577+
create(set).write(data).copyTo(ba);
578+
579+
String result =
580+
"{\"update\":{\"_id\":1}}\n" +
581+
"{\"script\":{\"inline\":\"counter = param1; anothercounter = param2\",\"lang\":\"groovy\",\"params\":{\"param1\":1,\"param2\":1}},\"scripted_upsert\": true,\"upsert\":{}}\n";
582+
583+
assertEquals(result, ba.toString());
584+
}
585+
@Test
586+
public void testScriptedUpsertParamInlineScript6X() throws Exception {
587+
assumeTrue(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
588+
assumeTrue(version.onOrAfter(EsMajorVersion.V_6_X));
589+
Settings set = settings();
590+
591+
set.setProperty(ConfigurationOptions.ES_MAPPING_ID, "n");
592+
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_UPSERT, "true");
593+
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_INLINE, "counter = param1; anothercounter = param2");
594+
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "groovy");
595+
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, " param1:<1>, param2:n ");
596+
597+
create(set).write(data).copyTo(ba);
598+
599+
String result =
600+
"{\"update\":{\"_id\":1}}\n" +
601+
"{\"script\":{\"source\":\"counter = param1; anothercounter = param2\",\"lang\":\"groovy\",\"params\":{\"param1\":1,\"param2\":1}},\"scripted_upsert\": true,\"upsert\":{}}\n";
602+
603+
assertEquals(result, ba.toString());
604+
}
605+
471606
@Test
472607
public void testUpdateOnlyParamFileScript1X() throws Exception {
473608
assumeTrue(ConfigurationOptions.ES_OPERATION_UPDATE.equals(operation));
@@ -533,6 +668,9 @@ private Settings settings() {
533668
if (isUpdateOp()) {
534669
set.setProperty(ConfigurationOptions.ES_MAPPING_ID, "<2>");
535670
}
671+
if (isUpsertOp()) {
672+
set.setProperty(ConfigurationOptions.ES_MAPPING_ID, "<3>");
673+
}
536674
return set;
537675
}
538676

@@ -565,6 +703,10 @@ private boolean isUpdateOp() {
565703
return ConfigurationOptions.ES_OPERATION_UPDATE.equals(operation);
566704
}
567705

706+
private boolean isUpsertOp() {
707+
return ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation);
708+
}
709+
568710
private boolean isDeleteOP() {
569711
return ConfigurationOptions.ES_OPERATION_DELETE.equals(operation);
570712
}

0 commit comments

Comments
 (0)