Skip to content

Commit d1a723b

Browse files
authored
Handling a race condition that can happen during index creation (#2216)
1 parent e223b4f commit d1a723b

File tree

1 file changed

+29
-20
lines changed

1 file changed

+29
-20
lines changed

mr/src/main/java/org/elasticsearch/hadoop/rest/RestService.java

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -685,34 +685,43 @@ private static RestRepository initSingleIndex(Settings settings, long currentIns
685685
// no routing necessary; select the relevant target shard/node
686686
Map<ShardInfo, NodeInfo> targetShards = repository.getWriteTargetPrimaryShards(settings.getNodesClientOnly());
687687
repository.close();
688+
String nodeAddress;
689+
if (targetShards.isEmpty()) {
690+
List<String> nodes = SettingsUtils.discoveredOrDeclaredNodes(settings);
691+
nodeAddress = nodes.get(new Random().nextInt(nodes.size()));
692+
if (log.isDebugEnabled()) {
693+
log.debug(String.format("Shards not found for partition writer instance [%s], so node [%s] has been randomly selected",
694+
currentInstance, nodeAddress));
695+
}
696+
} else {
697+
List<ShardInfo> orderedShards = new ArrayList<ShardInfo>(targetShards.keySet());
698+
// make sure the order is strict
699+
Collections.sort(orderedShards);
700+
if (log.isTraceEnabled()) {
701+
log.trace(String.format("Partition writer instance [%s] discovered [%s] primary shards %s", currentInstance, orderedShards.size(), orderedShards));
702+
}
688703

689-
Assert.isTrue(!targetShards.isEmpty(),
690-
String.format("Cannot determine write shards for [%s]; likely its format is incorrect (maybe it contains illegal characters? or all shards failed?)", resource));
691-
692-
693-
List<ShardInfo> orderedShards = new ArrayList<ShardInfo>(targetShards.keySet());
694-
// make sure the order is strict
695-
Collections.sort(orderedShards);
696-
if (log.isTraceEnabled()) {
697-
log.trace(String.format("Partition writer instance [%s] discovered [%s] primary shards %s", currentInstance, orderedShards.size(), orderedShards));
698-
}
699-
700-
// if there's no task info, just pick a random bucket
701-
if (currentInstance <= 0) {
702-
currentInstance = new Random().nextInt(targetShards.size()) + 1;
704+
// if there's no task info, just pick a random bucket
705+
if (currentInstance <= 0) {
706+
currentInstance = new Random().nextInt(targetShards.size()) + 1;
707+
}
708+
int bucket = (int) (currentInstance % targetShards.size());
709+
ShardInfo chosenShard = orderedShards.get(bucket);
710+
NodeInfo targetNode = targetShards.get(chosenShard);
711+
nodeAddress = targetNode.getPublishAddress();
712+
if (log.isDebugEnabled()) {
713+
log.debug(String.format("Partition writer instance [%s] assigned using primary shard [%s] to choose node [%s]",
714+
currentInstance, chosenShard.getName(), nodeAddress));
715+
}
703716
}
704-
int bucket = (int)(currentInstance % targetShards.size());
705-
ShardInfo chosenShard = orderedShards.get(bucket);
706-
NodeInfo targetNode = targetShards.get(chosenShard);
707717

708718
// pin settings
709-
SettingsUtils.pinNode(settings, targetNode.getPublishAddress());
719+
SettingsUtils.pinNode(settings, nodeAddress);
710720
String node = SettingsUtils.getPinnedNode(settings);
711721
repository = new RestRepository(settings);
712722

713723
if (log.isDebugEnabled()) {
714-
log.debug(String.format("Partition writer instance [%s] assigned to primary shard [%s] at address [%s]",
715-
currentInstance, chosenShard.getName(), node));
724+
log.debug(String.format("Partition writer instance [%s] assigned to address [%s]", currentInstance, node));
716725
}
717726

718727
return repository;

0 commit comments

Comments
 (0)