diff --git a/docs/changelog/139781.yaml b/docs/changelog/139781.yaml new file mode 100644 index 0000000000000..f662f36967644 --- /dev/null +++ b/docs/changelog/139781.yaml @@ -0,0 +1,5 @@ +pr: 139781 +summary: Basic CPS Support for ML Datafeeds +area: Machine Learning +type: enhancement +issues: [] diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedAction.java index 33ed0a8ce451f..e4ff9f2f9c358 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedAction.java @@ -32,10 +32,23 @@ private PutDatafeedAction() { public static class Request extends AcknowledgedRequest implements ToXContentObject { public static Request parseRequest(String datafeedId, IndicesOptions indicesOptions, XContentParser parser) { + return parseRequest(datafeedId, indicesOptions, false, parser); + } + + public static Request parseRequest( + String datafeedId, + IndicesOptions indicesOptions, + boolean enableCrossProjectSearch, + XContentParser parser + ) { DatafeedConfig.Builder datafeed = DatafeedConfig.STRICT_PARSER.apply(parser, null); if (datafeed.getIndicesOptions() == null) { datafeed.setIndicesOptions(indicesOptions); } + // If CPS is enabled cluster-wide, ensure the CPS flag is set on indices_options + datafeed.setIndicesOptions( + DatafeedConfig.ensureCrossProjectSearchEnabled(datafeed.getIndicesOptions(), enableCrossProjectSearch) + ); datafeed.setId(datafeedId); return new Request(datafeed.build()); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedAction.java index 0757f1f1dc7e7..17ccf5fd91400 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedAction.java @@ -15,6 +15,7 @@ import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate; import java.io.IOException; @@ -32,10 +33,21 @@ private UpdateDatafeedAction() { public static class Request extends AcknowledgedRequest implements ToXContentObject { public static Request parseRequest(String datafeedId, @Nullable IndicesOptions indicesOptions, XContentParser parser) { + return parseRequest(datafeedId, indicesOptions, false, parser); + } + + public static Request parseRequest( + String datafeedId, + @Nullable IndicesOptions indicesOptions, + boolean enableCrossProjectSearch, + XContentParser parser + ) { DatafeedUpdate.Builder update = DatafeedUpdate.PARSER.apply(parser, null); if (indicesOptions != null) { update.setIndicesOptions(indicesOptions); } + // If CPS is enabled cluster-wide, ensure the CPS flag is set on indices_options + update.setIndicesOptions(DatafeedConfig.ensureCrossProjectSearchEnabled(update.getIndicesOptions(), enableCrossProjectSearch)); update.setId(datafeedId); return new Request(update.build()); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index b1ed7f61ef576..388402dae8e21 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -9,7 +9,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.TransportVersion; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; @@ -22,7 +21,6 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; @@ -127,6 +125,8 @@ public class DatafeedConfig implements SimpleDiffable, ToXConten public static final ParseField DELAYED_DATA_CHECK_CONFIG = new ParseField("delayed_data_check_config"); public static final ParseField MAX_EMPTY_SEARCHES = new ParseField("max_empty_searches"); public static final ParseField INDICES_OPTIONS = new ParseField("indices_options"); + // Field for cross-project search option within indices_options (not part of core IndicesOptions.toXContent) + private static final String RESOLVE_CROSS_PROJECT_INDEX_EXPRESSION = "resolve_cross_project_index_expression"; // These parsers follow the pattern that metadata is parsed leniently (to allow for enhancements), whilst config is parsed strictly public static final ObjectParser LENIENT_PARSER = createParser(true); @@ -204,15 +204,37 @@ private static ObjectParser createParser(boolean ignoreUnknownFie DELAYED_DATA_CHECK_CONFIG ); parser.declareInt(Builder::setMaxEmptySearches, MAX_EMPTY_SEARCHES); - parser.declareObject( - Builder::setIndicesOptions, - (p, c) -> IndicesOptions.fromMap(p.map(), SearchRequest.DEFAULT_INDICES_OPTIONS), - INDICES_OPTIONS - ); + parser.declareObject(Builder::setIndicesOptions, (p, c) -> { + Map map = p.map(); + IndicesOptions baseOptions = IndicesOptions.fromMap(map, SearchRequest.DEFAULT_INDICES_OPTIONS); + // Handle cross-project option which is not part of core IndicesOptions.fromMap/toXContent + Object crossProjectValue = map.get(RESOLVE_CROSS_PROJECT_INDEX_EXPRESSION); + if (crossProjectValue instanceof Boolean && (Boolean) crossProjectValue) { + return IndicesOptions.builder(baseOptions) + .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) + .build(); + } + return baseOptions; + }, INDICES_OPTIONS); parser.declareObject(Builder::setRuntimeMappings, (p, c) -> p.map(), SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD); return parser; } + /** + * Ensures the cross-project search flag is enabled on the given IndicesOptions if not already set. + * Returns the options unchanged if they are null or already have CPS enabled. + * + * @param options the IndicesOptions to potentially update + * @param enableCrossProjectSearch whether CPS should be enabled + * @return the updated IndicesOptions with CPS enabled, or the original options if no change is needed + */ + public static IndicesOptions ensureCrossProjectSearchEnabled(IndicesOptions options, boolean enableCrossProjectSearch) { + if (enableCrossProjectSearch && options != null && options.resolveCrossProjectIndexExpression() == false) { + return IndicesOptions.builder(options).crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)).build(); + } + return options; + } + private final String id; private final String jobId; @@ -563,15 +585,24 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } builder.startObject(INDICES_OPTIONS.getPreferredName()); indicesOptions.toXContent(builder, params); + // Cross-project option is not part of core IndicesOptions.toXContent, serialize it explicitly + if (indicesOptions.resolveCrossProjectIndexExpression()) { + builder.field(RESOLVE_CROSS_PROJECT_INDEX_EXPRESSION, true); + } builder.endObject(); } else { // Don't include random defaults or unnecessary defaults in export if (queryDelay.equals(defaultRandomQueryDelay(jobId)) == false) { builder.field(QUERY_DELAY.getPreferredName(), queryDelay.getStringRep()); } // Indices options are a pretty advanced feature, better to not include them if they are just the default ones - if (indicesOptions.equals(SearchRequest.DEFAULT_INDICES_OPTIONS) == false) { + // Also serialize if CPS is enabled (even if other options are default) + if (indicesOptions.equals(SearchRequest.DEFAULT_INDICES_OPTIONS) == false + || indicesOptions.resolveCrossProjectIndexExpression()) { builder.startObject(INDICES_OPTIONS.getPreferredName()); indicesOptions.toXContent(builder, params); + if (indicesOptions.resolveCrossProjectIndexExpression()) { + builder.field(RESOLVE_CROSS_PROJECT_INDEX_EXPRESSION, true); + } builder.endObject(); } // Removing the default chunking config as it is determined by OTHER fields @@ -665,6 +696,7 @@ public boolean equals(Object other) { && Objects.equals(this.maxEmptySearches, that.maxEmptySearches) && Objects.equals(this.indicesOptions, that.indicesOptions) && Objects.equals(this.runtimeMappings, that.runtimeMappings); + // CPS: Add new cross-project search fields here (e.g., projectRouting) } @Override @@ -685,6 +717,7 @@ public int hashCode() { maxEmptySearches, indicesOptions, runtimeMappings + // CPS: Add new cross-project search fields here (e.g., projectRouting) ); } @@ -1052,9 +1085,6 @@ public DatafeedConfig build() { if (indicesOptions == null) { indicesOptions = IndicesOptions.STRICT_EXPAND_OPEN_HIDDEN_FORBID_CLOSED; } - if (indicesOptions.resolveCrossProjectIndexExpression()) { - throw new ElasticsearchStatusException("Cross-project search is not enabled for Datafeeds", RestStatus.FORBIDDEN); - } return new DatafeedConfig( id, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java index 35eb672fe69cb..977625cc3b6bd 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java @@ -87,11 +87,17 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { DatafeedConfig.DELAYED_DATA_CHECK_CONFIG ); PARSER.declareInt(Builder::setMaxEmptySearches, DatafeedConfig.MAX_EMPTY_SEARCHES); - PARSER.declareObject( - Builder::setIndicesOptions, - (p, c) -> IndicesOptions.fromMap(p.map(), SearchRequest.DEFAULT_INDICES_OPTIONS), - DatafeedConfig.INDICES_OPTIONS - ); + PARSER.declareObject(Builder::setIndicesOptions, (p, c) -> { + Map map = p.map(); + IndicesOptions baseOptions = IndicesOptions.fromMap(map, SearchRequest.DEFAULT_INDICES_OPTIONS); + Object crossProjectValue = map.get("resolve_cross_project_index_expression"); + if (crossProjectValue instanceof Boolean && (Boolean) crossProjectValue) { + return IndicesOptions.builder(baseOptions) + .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) + .build(); + } + return baseOptions; + }, DatafeedConfig.INDICES_OPTIONS); PARSER.declareObject(Builder::setRuntimeMappings, (p, c) -> p.map(), SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD); } @@ -248,6 +254,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (indicesOptions != null) { builder.startObject(DatafeedConfig.INDICES_OPTIONS.getPreferredName()); indicesOptions.toXContent(builder, params); + if (indicesOptions.resolveCrossProjectIndexExpression()) { + builder.field("resolve_cross_project_index_expression", true); + } builder.endObject(); } addOptionalField(builder, SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD, runtimeMappings); @@ -597,6 +606,10 @@ public Builder setIndicesOptions(IndicesOptions indicesOptions) { return this; } + public IndicesOptions getIndicesOptions() { + return indicesOptions; + } + public Builder setRuntimeMappings(Map runtimeMappings) { this.runtimeMappings = runtimeMappings; return this; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigBuilderTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigBuilderTests.java index 800406adc2868..ffdcc462b5b0b 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigBuilderTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigBuilderTests.java @@ -6,14 +6,12 @@ */ package org.elasticsearch.xpack.core.ml.datafeed; -import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregationBuilders; @@ -150,7 +148,7 @@ protected Writeable.Reader instanceReader() { return DatafeedConfig.Builder::new; } - public void testResolveCrossProjectIsDisabled() { + public void testCrossProjectModeOptionsAllowed() { var datafeedBuilder = createRandomizedDatafeedConfigBuilder("jobId", "datafeed-id", 3600000); datafeedBuilder = datafeedBuilder.setIndicesOptions( IndicesOptions.builder(datafeedBuilder.getIndicesOptions()) @@ -158,9 +156,9 @@ public void testResolveCrossProjectIsDisabled() { .build() ); - var actualException = assertThrows(ElasticsearchStatusException.class, datafeedBuilder::build); - assertThat(actualException.getMessage(), equalTo("Cross-project search is not enabled for Datafeeds")); - assertThat(actualException.status(), equalTo(RestStatus.FORBIDDEN)); + // CPS mode is now allowed for datafeeds - should not throw + DatafeedConfig config = datafeedBuilder.build(); + assertThat(config.getIndicesOptions().resolveCrossProjectIndexExpression(), equalTo(true)); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java index aaf16def9b65a..3af33dd703af2 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java @@ -1061,6 +1061,47 @@ protected DatafeedConfig mutateInstance(DatafeedConfig instance) { return builder.build(); } + public void testCrossProjectIndicesOptionsAllowed() { + IndicesOptions cpsOptions = IndicesOptions.builder() + .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) + .build(); + + DatafeedConfig.Builder builder = new DatafeedConfig.Builder("test-df", "test-job").setIndices(List.of("logs-*")) + .setIndicesOptions(cpsOptions); + + DatafeedConfig config = builder.build(); // Should not throw + assertTrue(config.getIndicesOptions().resolveCrossProjectIndexExpression()); + } + + public void testCrossProjectIndicesOptionsXContentRoundTrip() throws IOException { + // Create a datafeed with CPS enabled, using SearchRequest.DEFAULT_INDICES_OPTIONS as base + // to match what fromMap() will use as defaults for unparsed fields + IndicesOptions cpsOptions = IndicesOptions.builder(SearchRequest.DEFAULT_INDICES_OPTIONS) + .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) + .build(); + + DatafeedConfig.Builder builder = new DatafeedConfig.Builder("test-df-cps", "test-job").setIndices(List.of("logs-*")) + .setIndicesOptions(cpsOptions); + + DatafeedConfig original = builder.build(); + assertTrue("Original config should have CPS enabled", original.getIndicesOptions().resolveCrossProjectIndexExpression()); + + // Serialize to XContent + BytesReference bytes = XContentHelper.toXContent(original, XContentType.JSON, false); + + // Parse back from XContent + DatafeedConfig parsed; + try (XContentParser xContentParser = parser(bytes)) { + parsed = DatafeedConfig.STRICT_PARSER.apply(xContentParser, null).build(); + } + + // Verify CPS flag survives round-trip (the main goal of this test) + assertTrue( + "CPS flag should survive XContent round-trip serialization", + parsed.getIndicesOptions().resolveCrossProjectIndexExpression() + ); + } + private XContentParser parser(String json) throws IOException { return JsonXContent.jsonXContent.createParser(XContentParserConfiguration.EMPTY.withRegistry(xContentRegistry()), json); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java index 3b1c4e0248e2c..59098f51fcd55 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java @@ -330,6 +330,37 @@ public void testApply_givenIndicesOptions() { assertThat(updatedDatafeed.getIndicesOptions(), equalTo(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN)); } + public void testCrossProjectIndicesOptionsAllowed() { + IndicesOptions cpsOptions = IndicesOptions.builder(SearchRequest.DEFAULT_INDICES_OPTIONS) + .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) + .build(); + + DatafeedUpdate update = new DatafeedUpdate.Builder("test-df").setIndicesOptions(cpsOptions).build(); + + assertThat("Update should have CPS enabled", update.getIndicesOptions().resolveCrossProjectIndexExpression(), is(true)); + } + + public void testCrossProjectIndicesOptionsXContentRoundTrip() throws IOException { + IndicesOptions cpsOptions = IndicesOptions.builder(SearchRequest.DEFAULT_INDICES_OPTIONS) + .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) + .build(); + + DatafeedUpdate original = new DatafeedUpdate.Builder("test-df-cps").setIndicesOptions(cpsOptions).build(); + + assertThat("Original should have CPS enabled", original.getIndicesOptions().resolveCrossProjectIndexExpression(), is(true)); + + // Serialize to XContent and parse back + try (XContentParser parser = createParser(XContentType.JSON.xContent(), org.elasticsearch.common.Strings.toString(original))) { + DatafeedUpdate parsed = DatafeedUpdate.PARSER.apply(parser, null).build(); + + assertThat( + "CPS flag should survive XContent round-trip", + parsed.getIndicesOptions().resolveCrossProjectIndexExpression(), + is(true) + ); + } + } + public void testApply_GivenRandomUpdates_AssertImmutability() { for (int i = 0; i < 100; ++i) { DatafeedConfig datafeed = DatafeedConfigTests.createRandomizedDatafeedConfig(JobTests.randomValidJobId()); diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DatafeedCpsIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DatafeedCpsIT.java new file mode 100644 index 0000000000000..3adec33252653 --- /dev/null +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DatafeedCpsIT.java @@ -0,0 +1,292 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugins.ClusterPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.xpack.core.ml.action.GetDatafeedsAction; +import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; +import org.elasticsearch.xpack.core.ml.action.PutJobAction; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.DataDescription; +import org.elasticsearch.xpack.core.ml.job.config.Detector; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.ml.MlSingleNodeTestCase; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.stream.Stream; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +/** + * Integration tests for Cross-Project Search (CPS) support in ML datafeeds. + * + * CPS allows datafeeds to search across multiple projects using the same index pattern + * syntax as Cross-Cluster Search (CCS). Since CPS is built on CCS infrastructure, + * datafeeds already handle CPS patterns correctly through existing remote index handling. + * + * Key validation: RemoteClusterAware.isRemoteIndexName() returns true for both CCS + * (cluster:index) and CPS (project:index) patterns, so existing remote index handling + * in DataExtractorFactory, DatafeedManager, and DatafeedNodeSelector automatically + * works for CPS. + */ +public class DatafeedCpsIT extends MlSingleNodeTestCase { + + @Override + protected Settings nodeSettings() { + return Settings.builder().put(super.nodeSettings()).put("serverless.cross_project.enabled", "true").build(); + } + + @Override + protected Collection> getPlugins() { + return Stream.concat(super.getPlugins().stream(), Stream.of(CpsPlugin.class)).toList(); + } + + /** + * Test that a datafeed with CPS-enabled IndicesOptions can be created successfully. + * This validates that the CPS block has been removed from DatafeedConfig.Builder. + */ + public void testCrossProjectIndicesOptionsAllowedForDatafeed() throws Exception { + String jobId = "cps-test-job"; + String datafeedId = "cps-test-datafeed"; + + // Create job first + Job.Builder job = createScheduledJob(jobId); + client().execute(PutJobAction.INSTANCE, new PutJobAction.Request(job)).actionGet(); + + // Create datafeed with CPS-enabled IndicesOptions + // This tests that the CPS block is removed - previously this would throw + // ElasticsearchStatusException("Cross-project search is not enabled for Datafeeds") + IndicesOptions cpsOptions = IndicesOptions.builder(SearchRequest.DEFAULT_INDICES_OPTIONS) + .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) + .build(); + + DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder(datafeedId, jobId).setIndices(List.of("logs-*")) + .setIndicesOptions(cpsOptions); + + PutDatafeedAction.Request request = new PutDatafeedAction.Request(datafeedBuilder.build()); + PutDatafeedAction.Response response = client().execute(PutDatafeedAction.INSTANCE, request).actionGet(); + + // Verify datafeed was created successfully (no exception thrown) + assertThat(response.getResponse().getId(), equalTo(datafeedId)); + + // Verify datafeed can be retrieved + GetDatafeedsAction.Response getResponse = client().execute(GetDatafeedsAction.INSTANCE, new GetDatafeedsAction.Request(datafeedId)) + .actionGet(); + DatafeedConfig storedConfig = getResponse.getResponse().results().get(0); + assertThat(storedConfig.getId(), equalTo(datafeedId)); + assertThat(storedConfig.getIndices(), equalTo(List.of("logs-*"))); + } + + /** + * Test that the CPS flag survives cluster state persistence (GET after PUT). + * This validates that DatafeedConfig correctly serializes and deserializes + * the resolve_cross_project_index_expression field via XContent. + */ + public void testCpsFlagSurvivesClusterStatePersistence() throws Exception { + String jobId = "cps-persist-job"; + String datafeedId = "cps-persist-datafeed"; + + // Create job first + Job.Builder job = createScheduledJob(jobId); + client().execute(PutJobAction.INSTANCE, new PutJobAction.Request(job)).actionGet(); + + // Create datafeed with CPS enabled + IndicesOptions cpsOptions = IndicesOptions.builder(SearchRequest.DEFAULT_INDICES_OPTIONS) + .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) + .build(); + + DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder(datafeedId, jobId).setIndices(List.of("logs-*")) + .setIndicesOptions(cpsOptions); + + DatafeedConfig originalConfig = datafeedBuilder.build(); + assertTrue("Original config should have CPS enabled", originalConfig.getIndicesOptions().resolveCrossProjectIndexExpression()); + + // Store the datafeed (goes through XContent serialization to cluster state) + PutDatafeedAction.Request request = new PutDatafeedAction.Request(originalConfig); + client().execute(PutDatafeedAction.INSTANCE, request).actionGet(); + + // Retrieve the datafeed (comes back through XContent deserialization from cluster state) + GetDatafeedsAction.Response getResponse = client().execute(GetDatafeedsAction.INSTANCE, new GetDatafeedsAction.Request(datafeedId)) + .actionGet(); + DatafeedConfig storedConfig = getResponse.getResponse().results().get(0); + + // Verify CPS flag survives the round-trip through cluster state + assertTrue( + "CPS flag should survive cluster state persistence", + storedConfig.getIndicesOptions().resolveCrossProjectIndexExpression() + ); + } + + /** + * Test that RemoteClusterAware.isRemoteIndexName() correctly identifies CPS qualified indices. + * This validates that the existing CCS infrastructure handles CPS patterns, which means + * existing remote index handling (rollup skip, HasPrivileges filter, index verification skip) + * will work correctly for CPS indices. + */ + public void testRemoteClusterAwareIdentifiesCpsPattern() { + // CCS remote pattern + assertThat(RemoteClusterAware.isRemoteIndexName("cluster1:logs-*"), is(true)); + + // CPS qualified pattern (same syntax as CCS) + assertThat(RemoteClusterAware.isRemoteIndexName("project1:logs-*"), is(true)); + + // Local/unqualified pattern + assertThat(RemoteClusterAware.isRemoteIndexName("logs-*"), is(false)); + + // Multiple colons - first colon separates remote/project from index + assertThat(RemoteClusterAware.isRemoteIndexName("project1:logs:v1-*"), is(true)); + + // Wildcard in project/cluster part + assertThat(RemoteClusterAware.isRemoteIndexName("prod-*:logs-*"), is(true)); + } + + /** + * Test that a datafeed with CPS qualified indices can be created. + * This validates that CPS qualified indices are handled like CCS remote indices. + */ + public void testDatafeedWithCpsQualifiedIndices() throws Exception { + String jobId = "cps-qualified-job"; + String datafeedId = "cps-qualified-datafeed"; + + // Create job first + Job.Builder job = createScheduledJob(jobId); + client().execute(PutJobAction.INSTANCE, new PutJobAction.Request(job)).actionGet(); + + // Create datafeed with CPS qualified indices (project:index pattern) + IndicesOptions cpsOptions = IndicesOptions.builder(SearchRequest.DEFAULT_INDICES_OPTIONS) + .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) + .build(); + + DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder(datafeedId, jobId).setIndices( + List.of("project1:logs-*", "project2:metrics-*") + ).setIndicesOptions(cpsOptions); + + PutDatafeedAction.Request request = new PutDatafeedAction.Request(datafeedBuilder.build()); + PutDatafeedAction.Response response = client().execute(PutDatafeedAction.INSTANCE, request).actionGet(); + + // Verify datafeed was created successfully with CPS qualified indices + assertThat(response.getResponse().getId(), equalTo(datafeedId)); + assertThat(response.getResponse().getIndices(), equalTo(List.of("project1:logs-*", "project2:metrics-*"))); + } + + /** + * Test that a datafeed with mixed local and CPS qualified indices can be created. + */ + public void testDatafeedWithMixedLocalAndCpsIndices() throws Exception { + String jobId = "mixed-indices-job"; + String datafeedId = "mixed-indices-datafeed"; + + // Create job first + Job.Builder job = createScheduledJob(jobId); + client().execute(PutJobAction.INSTANCE, new PutJobAction.Request(job)).actionGet(); + + // Create datafeed with mixed local and CPS qualified indices + IndicesOptions cpsOptions = IndicesOptions.builder(SearchRequest.DEFAULT_INDICES_OPTIONS) + .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) + .build(); + + DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder(datafeedId, jobId).setIndices( + List.of("local-logs-*", "project1:remote-logs-*") + ).setIndicesOptions(cpsOptions); + + PutDatafeedAction.Request request = new PutDatafeedAction.Request(datafeedBuilder.build()); + PutDatafeedAction.Response response = client().execute(PutDatafeedAction.INSTANCE, request).actionGet(); + + // Verify datafeed was created successfully + assertThat(response.getResponse().getId(), equalTo(datafeedId)); + } + + /** + * Test CCS/CPS parity: both patterns should be identified as remote by RemoteClusterAware. + * This ensures that existing CCS handling in datafeeds automatically applies to CPS. + */ + public void testCcsAndCpsPatternParity() { + // Verify both CCS and CPS patterns are treated identically by RemoteClusterAware + String ccsPattern = "cluster1:logs-*"; + String cpsPattern = "project1:logs-*"; + + boolean ccsIsRemote = RemoteClusterAware.isRemoteIndexName(ccsPattern); + boolean cpsIsRemote = RemoteClusterAware.isRemoteIndexName(cpsPattern); + + assertThat("CCS pattern should be identified as remote", ccsIsRemote, is(true)); + assertThat("CPS pattern should be identified as remote", cpsIsRemote, is(true)); + assertThat("CCS and CPS patterns should be handled identically", ccsIsRemote, equalTo(cpsIsRemote)); + } + + /** + * Test that documents the security model for CPS indices in datafeeds. + * + * Security implications for CPS indices (project:index patterns): + * - Up-front privilege validation (HasPrivileges check) is skipped for CPS indices + * (same as CCS remote indices) - see DatafeedManager.putDatafeed() + * - This is by design: RemoteClusterLicenseChecker.isRemoteIndex() returns true for + * both CCS (cluster:index) and CPS (project:index) patterns + * - For CPS, privilege enforcement happens at search execution time via UIAM + * (Universal Identity and Access Management), not at datafeed creation time + * - This mirrors the existing CCS behavior where remote cluster privileges cannot + * be validated locally at datafeed creation time + * + * This test validates the filtering behavior that enables this security model. + */ + public void testCpsIndicesSkipUpfrontPrivilegeValidation() { + // Both CCS and CPS patterns are identified as "remote" by the license checker + // This causes them to be filtered out of the HasPrivileges check in DatafeedManager + + // CCS pattern - skipped from privilege check (can't validate remote cluster privileges) + assertThat(RemoteClusterAware.isRemoteIndexName("cluster1:logs-*"), is(true)); + + // CPS pattern - also skipped (privileges enforced at search time via UIAM) + assertThat(RemoteClusterAware.isRemoteIndexName("project1:logs-*"), is(true)); + + // Local pattern - NOT skipped (privileges validated at datafeed creation) + assertThat(RemoteClusterAware.isRemoteIndexName("logs-*"), is(false)); + + // Mixed indices: local indices get privilege checked, CPS indices rely on UIAM + List mixedIndices = List.of("local-logs-*", "project1:remote-logs-*"); + long localCount = mixedIndices.stream().filter(i -> RemoteClusterAware.isRemoteIndexName(i) == false).count(); + long remoteCount = mixedIndices.stream().filter(RemoteClusterAware::isRemoteIndexName).count(); + assertThat("Should have 1 local index for privilege validation", localCount, equalTo(1L)); + assertThat("Should have 1 CPS index skipped from privilege validation", remoteCount, equalTo(1L)); + } + + /** + * Helper method to create a scheduled job for testing. + */ + private static Job.Builder createScheduledJob(String jobId) { + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setTimeFormat("yyyy-MM-dd HH:mm:ss"); + + Detector.Builder detector = new Detector.Builder("count", null); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())); + + Job.Builder builder = new Job.Builder(); + builder.setId(jobId); + builder.setAnalysisConfig(analysisConfig); + builder.setDataDescription(dataDescription); + return builder; + } + + /** + * Plugin to register the serverless.cross_project.enabled setting. + */ + public static class CpsPlugin extends Plugin implements ClusterPlugin { + public List> getSettings() { + return List.of(Setting.simpleString("serverless.cross_project.enabled", Setting.Property.NodeScope)); + } + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 6db4bfde5e3c5..670500a554439 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -1508,8 +1508,8 @@ public List getRestHandlers( restHandlers.add(new RestUpdateModelSnapshotAction()); restHandlers.add(new RestGetDatafeedsAction()); restHandlers.add(new RestGetDatafeedStatsAction()); - restHandlers.add(new RestPutDatafeedAction()); - restHandlers.add(new RestUpdateDatafeedAction()); + restHandlers.add(new RestPutDatafeedAction(settings)); + restHandlers.add(new RestUpdateDatafeedAction(settings)); restHandlers.add(new RestDeleteDatafeedAction()); restHandlers.add(new RestPreviewDatafeedAction()); restHandlers.add(new RestStartDatafeedAction()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 7321f991773e1..84560ea0a0284 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -106,6 +106,10 @@ public void putDatafeed( if (XPackSettings.SECURITY_ENABLED.get(settings)) { useSecondaryAuthIfAvailable(securityContext, () -> { // TODO: Remove this filter once https://github.com/elastic/elasticsearch/issues/67798 is fixed. + // Note: This filter excludes both CCS (cluster:index) and CPS (project:index) patterns from + // up-front privilege validation. For CCS, privileges cannot be checked locally. For CPS, + // privilege enforcement happens at search execution time via UIAM. This is by design - + // RemoteClusterLicenseChecker.isRemoteIndex() returns true for both patterns. final String[] indices = request.getDatafeed() .getIndices() .stream() diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPutDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPutDatafeedAction.java index 360b14277a19a..30edcc592de91 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPutDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPutDatafeedAction.java @@ -9,11 +9,13 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.search.crossproject.CrossProjectModeDecider; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; @@ -30,6 +32,12 @@ @ServerlessScope(Scope.PUBLIC) public class RestPutDatafeedAction extends BaseRestHandler { + private final CrossProjectModeDecider crossProjectModeDecider; + + public RestPutDatafeedAction(Settings settings) { + this.crossProjectModeDecider = new CrossProjectModeDecider(settings); + } + @Override public List routes() { return List.of(new Route(PUT, BASE_PATH + "datafeeds/{" + ID + "}")); @@ -44,9 +52,10 @@ public String getName() { protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { String datafeedId = restRequest.param(DatafeedConfig.ID.getPreferredName()); IndicesOptions indicesOptions = IndicesOptions.fromRequest(restRequest, SearchRequest.DEFAULT_INDICES_OPTIONS); + boolean enableCps = crossProjectModeDecider.crossProjectEnabled(); PutDatafeedAction.Request putDatafeedRequest; try (XContentParser parser = restRequest.contentParser()) { - putDatafeedRequest = PutDatafeedAction.Request.parseRequest(datafeedId, indicesOptions, parser); + putDatafeedRequest = PutDatafeedAction.Request.parseRequest(datafeedId, indicesOptions, enableCps, parser); } putDatafeedRequest.ackTimeout(getAckTimeout(restRequest)); putDatafeedRequest.masterNodeTimeout(getMasterNodeTimeout(restRequest)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestUpdateDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestUpdateDatafeedAction.java index 5335de78aa18a..040ad3f9d7813 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestUpdateDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestUpdateDatafeedAction.java @@ -9,11 +9,13 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.search.crossproject.CrossProjectModeDecider; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; @@ -30,6 +32,12 @@ @ServerlessScope(Scope.PUBLIC) public class RestUpdateDatafeedAction extends BaseRestHandler { + private final CrossProjectModeDecider crossProjectModeDecider; + + public RestUpdateDatafeedAction(Settings settings) { + this.crossProjectModeDecider = new CrossProjectModeDecider(settings); + } + @Override public List routes() { return List.of(new Route(POST, BASE_PATH + "datafeeds/{" + ID + "}/_update")); @@ -50,9 +58,10 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient || restRequest.hasParam("ignore_throttled")) { indicesOptions = IndicesOptions.fromRequest(restRequest, SearchRequest.DEFAULT_INDICES_OPTIONS); } + boolean enableCps = crossProjectModeDecider.crossProjectEnabled(); UpdateDatafeedAction.Request updateDatafeedRequest; try (XContentParser parser = restRequest.contentParser()) { - updateDatafeedRequest = UpdateDatafeedAction.Request.parseRequest(datafeedId, indicesOptions, parser); + updateDatafeedRequest = UpdateDatafeedAction.Request.parseRequest(datafeedId, indicesOptions, enableCps, parser); } updateDatafeedRequest.ackTimeout(getAckTimeout(restRequest)); updateDatafeedRequest.masterNodeTimeout(getMasterNodeTimeout(restRequest));