Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/139781.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 139781
summary: Basic CPS Support for ML Datafeeds
area: Machine Learning
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,23 @@ private PutDatafeedAction() {
public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject {

public static Request parseRequest(String datafeedId, IndicesOptions indicesOptions, XContentParser parser) {
return parseRequest(datafeedId, indicesOptions, false, parser);
}

public static Request parseRequest(
String datafeedId,
IndicesOptions indicesOptions,
boolean enableCrossProjectSearch,
XContentParser parser
) {
DatafeedConfig.Builder datafeed = DatafeedConfig.STRICT_PARSER.apply(parser, null);
if (datafeed.getIndicesOptions() == null) {
datafeed.setIndicesOptions(indicesOptions);
}
// If CPS is enabled cluster-wide, ensure the CPS flag is set on indices_options
datafeed.setIndicesOptions(
DatafeedConfig.ensureCrossProjectSearchEnabled(datafeed.getIndicesOptions(), enableCrossProjectSearch)
);
datafeed.setId(datafeedId);
return new Request(datafeed.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate;

import java.io.IOException;
Expand All @@ -32,10 +33,21 @@ private UpdateDatafeedAction() {
public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject {

public static Request parseRequest(String datafeedId, @Nullable IndicesOptions indicesOptions, XContentParser parser) {
return parseRequest(datafeedId, indicesOptions, false, parser);
}

public static Request parseRequest(
String datafeedId,
@Nullable IndicesOptions indicesOptions,
boolean enableCrossProjectSearch,
XContentParser parser
) {
DatafeedUpdate.Builder update = DatafeedUpdate.PARSER.apply(parser, null);
if (indicesOptions != null) {
update.setIndicesOptions(indicesOptions);
}
// If CPS is enabled cluster-wide, ensure the CPS flag is set on indices_options
update.setIndicesOptions(DatafeedConfig.ensureCrossProjectSearchEnabled(update.getIndicesOptions(), enableCrossProjectSearch));
update.setId(datafeedId);
return new Request(update.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
Expand All @@ -22,7 +21,6 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
Expand Down Expand Up @@ -127,6 +125,8 @@ public class DatafeedConfig implements SimpleDiffable<DatafeedConfig>, ToXConten
public static final ParseField DELAYED_DATA_CHECK_CONFIG = new ParseField("delayed_data_check_config");
public static final ParseField MAX_EMPTY_SEARCHES = new ParseField("max_empty_searches");
public static final ParseField INDICES_OPTIONS = new ParseField("indices_options");
// Field for cross-project search option within indices_options (not part of core IndicesOptions.toXContent)
private static final String RESOLVE_CROSS_PROJECT_INDEX_EXPRESSION = "resolve_cross_project_index_expression";

// These parsers follow the pattern that metadata is parsed leniently (to allow for enhancements), whilst config is parsed strictly
public static final ObjectParser<Builder, Void> LENIENT_PARSER = createParser(true);
Expand Down Expand Up @@ -204,15 +204,37 @@ private static ObjectParser<Builder, Void> createParser(boolean ignoreUnknownFie
DELAYED_DATA_CHECK_CONFIG
);
parser.declareInt(Builder::setMaxEmptySearches, MAX_EMPTY_SEARCHES);
parser.declareObject(
Builder::setIndicesOptions,
(p, c) -> IndicesOptions.fromMap(p.map(), SearchRequest.DEFAULT_INDICES_OPTIONS),
INDICES_OPTIONS
);
parser.declareObject(Builder::setIndicesOptions, (p, c) -> {
Map<String, Object> map = p.map();
IndicesOptions baseOptions = IndicesOptions.fromMap(map, SearchRequest.DEFAULT_INDICES_OPTIONS);
// Handle cross-project option which is not part of core IndicesOptions.fromMap/toXContent
Object crossProjectValue = map.get(RESOLVE_CROSS_PROJECT_INDEX_EXPRESSION);
if (crossProjectValue instanceof Boolean && (Boolean) crossProjectValue) {
return IndicesOptions.builder(baseOptions)
.crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true))
.build();
}
return baseOptions;
}, INDICES_OPTIONS);
parser.declareObject(Builder::setRuntimeMappings, (p, c) -> p.map(), SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD);
return parser;
}

/**
* Ensures the cross-project search flag is enabled on the given IndicesOptions if not already set.
* Returns the options unchanged if they are null or already have CPS enabled.
*
* @param options the IndicesOptions to potentially update
* @param enableCrossProjectSearch whether CPS should be enabled
* @return the updated IndicesOptions with CPS enabled, or the original options if no change is needed
*/
public static IndicesOptions ensureCrossProjectSearchEnabled(IndicesOptions options, boolean enableCrossProjectSearch) {
if (enableCrossProjectSearch && options != null && options.resolveCrossProjectIndexExpression() == false) {
return IndicesOptions.builder(options).crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)).build();
}
return options;
}

private final String id;
private final String jobId;

Expand Down Expand Up @@ -563,15 +585,24 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
builder.startObject(INDICES_OPTIONS.getPreferredName());
indicesOptions.toXContent(builder, params);
// Cross-project option is not part of core IndicesOptions.toXContent, serialize it explicitly
if (indicesOptions.resolveCrossProjectIndexExpression()) {
builder.field(RESOLVE_CROSS_PROJECT_INDEX_EXPRESSION, true);
}
builder.endObject();
} else { // Don't include random defaults or unnecessary defaults in export
if (queryDelay.equals(defaultRandomQueryDelay(jobId)) == false) {
builder.field(QUERY_DELAY.getPreferredName(), queryDelay.getStringRep());
}
// Indices options are a pretty advanced feature, better to not include them if they are just the default ones
if (indicesOptions.equals(SearchRequest.DEFAULT_INDICES_OPTIONS) == false) {
// Also serialize if CPS is enabled (even if other options are default)
if (indicesOptions.equals(SearchRequest.DEFAULT_INDICES_OPTIONS) == false
|| indicesOptions.resolveCrossProjectIndexExpression()) {
builder.startObject(INDICES_OPTIONS.getPreferredName());
indicesOptions.toXContent(builder, params);
if (indicesOptions.resolveCrossProjectIndexExpression()) {
builder.field(RESOLVE_CROSS_PROJECT_INDEX_EXPRESSION, true);
}
builder.endObject();
}
// Removing the default chunking config as it is determined by OTHER fields
Expand Down Expand Up @@ -665,6 +696,7 @@ public boolean equals(Object other) {
&& Objects.equals(this.maxEmptySearches, that.maxEmptySearches)
&& Objects.equals(this.indicesOptions, that.indicesOptions)
&& Objects.equals(this.runtimeMappings, that.runtimeMappings);
// CPS: Add new cross-project search fields here (e.g., projectRouting)
}

@Override
Expand All @@ -685,6 +717,7 @@ public int hashCode() {
maxEmptySearches,
indicesOptions,
runtimeMappings
// CPS: Add new cross-project search fields here (e.g., projectRouting)
);
}

Expand Down Expand Up @@ -1052,9 +1085,6 @@ public DatafeedConfig build() {
if (indicesOptions == null) {
indicesOptions = IndicesOptions.STRICT_EXPAND_OPEN_HIDDEN_FORBID_CLOSED;
}
if (indicesOptions.resolveCrossProjectIndexExpression()) {
throw new ElasticsearchStatusException("Cross-project search is not enabled for Datafeeds", RestStatus.FORBIDDEN);
}

return new DatafeedConfig(
id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,17 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
DatafeedConfig.DELAYED_DATA_CHECK_CONFIG
);
PARSER.declareInt(Builder::setMaxEmptySearches, DatafeedConfig.MAX_EMPTY_SEARCHES);
PARSER.declareObject(
Builder::setIndicesOptions,
(p, c) -> IndicesOptions.fromMap(p.map(), SearchRequest.DEFAULT_INDICES_OPTIONS),
DatafeedConfig.INDICES_OPTIONS
);
PARSER.declareObject(Builder::setIndicesOptions, (p, c) -> {
Map<String, Object> map = p.map();
IndicesOptions baseOptions = IndicesOptions.fromMap(map, SearchRequest.DEFAULT_INDICES_OPTIONS);
Object crossProjectValue = map.get("resolve_cross_project_index_expression");
if (crossProjectValue instanceof Boolean && (Boolean) crossProjectValue) {
return IndicesOptions.builder(baseOptions)
.crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true))
.build();
}
return baseOptions;
}, DatafeedConfig.INDICES_OPTIONS);
PARSER.declareObject(Builder::setRuntimeMappings, (p, c) -> p.map(), SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD);
}

Expand Down Expand Up @@ -248,6 +254,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (indicesOptions != null) {
builder.startObject(DatafeedConfig.INDICES_OPTIONS.getPreferredName());
indicesOptions.toXContent(builder, params);
if (indicesOptions.resolveCrossProjectIndexExpression()) {
builder.field("resolve_cross_project_index_expression", true);
}
builder.endObject();
}
addOptionalField(builder, SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD, runtimeMappings);
Expand Down Expand Up @@ -597,6 +606,10 @@ public Builder setIndicesOptions(IndicesOptions indicesOptions) {
return this;
}

public IndicesOptions getIndicesOptions() {
return indicesOptions;
}

public Builder setRuntimeMappings(Map<String, Object> runtimeMappings) {
this.runtimeMappings = runtimeMappings;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@
*/
package org.elasticsearch.xpack.core.ml.datafeed;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
Expand Down Expand Up @@ -150,17 +148,17 @@ protected Writeable.Reader<DatafeedConfig.Builder> instanceReader() {
return DatafeedConfig.Builder::new;
}

public void testResolveCrossProjectIsDisabled() {
public void testCrossProjectModeOptionsAllowed() {
var datafeedBuilder = createRandomizedDatafeedConfigBuilder("jobId", "datafeed-id", 3600000);
datafeedBuilder = datafeedBuilder.setIndicesOptions(
IndicesOptions.builder(datafeedBuilder.getIndicesOptions())
.crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true))
.build()
);

var actualException = assertThrows(ElasticsearchStatusException.class, datafeedBuilder::build);
assertThat(actualException.getMessage(), equalTo("Cross-project search is not enabled for Datafeeds"));
assertThat(actualException.status(), equalTo(RestStatus.FORBIDDEN));
// CPS mode is now allowed for datafeeds - should not throw
DatafeedConfig config = datafeedBuilder.build();
assertThat(config.getIndicesOptions().resolveCrossProjectIndexExpression(), equalTo(true));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,47 @@ protected DatafeedConfig mutateInstance(DatafeedConfig instance) {
return builder.build();
}

public void testCrossProjectIndicesOptionsAllowed() {
IndicesOptions cpsOptions = IndicesOptions.builder()
.crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true))
.build();

DatafeedConfig.Builder builder = new DatafeedConfig.Builder("test-df", "test-job").setIndices(List.of("logs-*"))
.setIndicesOptions(cpsOptions);

DatafeedConfig config = builder.build(); // Should not throw
assertTrue(config.getIndicesOptions().resolveCrossProjectIndexExpression());
}

public void testCrossProjectIndicesOptionsXContentRoundTrip() throws IOException {
// Create a datafeed with CPS enabled, using SearchRequest.DEFAULT_INDICES_OPTIONS as base
// to match what fromMap() will use as defaults for unparsed fields
IndicesOptions cpsOptions = IndicesOptions.builder(SearchRequest.DEFAULT_INDICES_OPTIONS)
.crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true))
.build();

DatafeedConfig.Builder builder = new DatafeedConfig.Builder("test-df-cps", "test-job").setIndices(List.of("logs-*"))
.setIndicesOptions(cpsOptions);

DatafeedConfig original = builder.build();
assertTrue("Original config should have CPS enabled", original.getIndicesOptions().resolveCrossProjectIndexExpression());

// Serialize to XContent
BytesReference bytes = XContentHelper.toXContent(original, XContentType.JSON, false);

// Parse back from XContent
DatafeedConfig parsed;
try (XContentParser xContentParser = parser(bytes)) {
parsed = DatafeedConfig.STRICT_PARSER.apply(xContentParser, null).build();
}

// Verify CPS flag survives round-trip (the main goal of this test)
assertTrue(
"CPS flag should survive XContent round-trip serialization",
parsed.getIndicesOptions().resolveCrossProjectIndexExpression()
);
}

private XContentParser parser(String json) throws IOException {
return JsonXContent.jsonXContent.createParser(XContentParserConfiguration.EMPTY.withRegistry(xContentRegistry()), json);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,37 @@ public void testApply_givenIndicesOptions() {
assertThat(updatedDatafeed.getIndicesOptions(), equalTo(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN));
}

public void testCrossProjectIndicesOptionsAllowed() {
IndicesOptions cpsOptions = IndicesOptions.builder(SearchRequest.DEFAULT_INDICES_OPTIONS)
.crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true))
.build();

DatafeedUpdate update = new DatafeedUpdate.Builder("test-df").setIndicesOptions(cpsOptions).build();

assertThat("Update should have CPS enabled", update.getIndicesOptions().resolveCrossProjectIndexExpression(), is(true));
}

public void testCrossProjectIndicesOptionsXContentRoundTrip() throws IOException {
IndicesOptions cpsOptions = IndicesOptions.builder(SearchRequest.DEFAULT_INDICES_OPTIONS)
.crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true))
.build();

DatafeedUpdate original = new DatafeedUpdate.Builder("test-df-cps").setIndicesOptions(cpsOptions).build();

assertThat("Original should have CPS enabled", original.getIndicesOptions().resolveCrossProjectIndexExpression(), is(true));

// Serialize to XContent and parse back
try (XContentParser parser = createParser(XContentType.JSON.xContent(), org.elasticsearch.common.Strings.toString(original))) {
DatafeedUpdate parsed = DatafeedUpdate.PARSER.apply(parser, null).build();

assertThat(
"CPS flag should survive XContent round-trip",
parsed.getIndicesOptions().resolveCrossProjectIndexExpression(),
is(true)
);
}
}

public void testApply_GivenRandomUpdates_AssertImmutability() {
for (int i = 0; i < 100; ++i) {
DatafeedConfig datafeed = DatafeedConfigTests.createRandomizedDatafeedConfig(JobTests.randomValidJobId());
Expand Down
Loading