From 0d753078bb6f32b07b7b6d0957249f4121fd6ee9 Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Thu, 18 Dec 2025 21:35:41 +0100 Subject: [PATCH 1/5] [ML] Enhance Datafeed Configuration for Cross-Project Search This commit introduces support for Cross-Project Search (CPS) in the Datafeed configuration. Key changes include: - Updated `PutDatafeedAction` and `UpdateDatafeedAction` to accept a new parameter for enabling CPS during request parsing. - Enhanced `DatafeedConfig` to handle CPS options, including serialization and deserialization of the `resolve_cross_project_index_expression` field. - Added integration tests to validate CPS functionality in datafeeds, ensuring that CPS-enabled indices can be created and retrieved correctly. - Adjusted existing tests to reflect the new CPS capabilities and removed restrictions that previously prevented CPS usage in datafeeds. These changes facilitate the use of CPS in machine learning datafeeds, allowing for more flexible data retrieval across projects. --- .../core/ml/action/PutDatafeedAction.java | 21 ++ .../core/ml/action/UpdateDatafeedAction.java | 21 ++ .../core/ml/datafeed/DatafeedConfig.java | 35 ++- .../core/ml/datafeed/DatafeedUpdate.java | 4 + .../datafeed/DatafeedConfigBuilderTests.java | 10 +- .../core/ml/datafeed/DatafeedConfigTests.java | 41 +++ .../xpack/ml/integration/DatafeedCpsIT.java | 297 ++++++++++++++++++ .../xpack/ml/MachineLearning.java | 4 +- .../xpack/ml/datafeed/DatafeedManager.java | 4 + .../rest/datafeeds/RestPutDatafeedAction.java | 11 +- .../datafeeds/RestUpdateDatafeedAction.java | 11 +- 11 files changed, 440 insertions(+), 19 deletions(-) create mode 100644 x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DatafeedCpsIT.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedAction.java index 33ed0a8ce451f..43224038f9789 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedAction.java @@ -32,10 +32,31 @@ private PutDatafeedAction() { public static class Request extends AcknowledgedRequest implements ToXContentObject { public static Request parseRequest(String datafeedId, IndicesOptions indicesOptions, XContentParser parser) { + return parseRequest(datafeedId, indicesOptions, false, parser); + } + + public static Request parseRequest( + String datafeedId, + IndicesOptions indicesOptions, + boolean enableCrossProjectSearch, + XContentParser parser + ) { DatafeedConfig.Builder datafeed = DatafeedConfig.STRICT_PARSER.apply(parser, null); if (datafeed.getIndicesOptions() == null) { datafeed.setIndicesOptions(indicesOptions); } + // If CPS is enabled cluster-wide, ensure the CPS flag is set on indices_options + // regardless of whether it came from request params or body + if (enableCrossProjectSearch && datafeed.getIndicesOptions() != null) { + IndicesOptions currentOptions = datafeed.getIndicesOptions(); + if (currentOptions.resolveCrossProjectIndexExpression() == false) { + datafeed.setIndicesOptions( + IndicesOptions.builder(currentOptions) + .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) + .build() + ); + } + } datafeed.setId(datafeedId); return new Request(datafeed.build()); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedAction.java index 0757f1f1dc7e7..694a557cbd111 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedAction.java @@ -32,10 +32,31 @@ private UpdateDatafeedAction() { public static class Request extends AcknowledgedRequest implements ToXContentObject { public static Request parseRequest(String datafeedId, @Nullable IndicesOptions indicesOptions, XContentParser parser) { + return parseRequest(datafeedId, indicesOptions, false, parser); + } + + public static Request parseRequest( + String datafeedId, + @Nullable IndicesOptions indicesOptions, + boolean enableCrossProjectSearch, + XContentParser parser + ) { DatafeedUpdate.Builder update = DatafeedUpdate.PARSER.apply(parser, null); if (indicesOptions != null) { update.setIndicesOptions(indicesOptions); } + // If CPS is enabled cluster-wide, ensure the CPS flag is set on indices_options + // regardless of whether it came from request params or body + if (enableCrossProjectSearch && update.getIndicesOptions() != null) { + IndicesOptions currentOptions = update.getIndicesOptions(); + if (currentOptions.resolveCrossProjectIndexExpression() == false) { + update.setIndicesOptions( + IndicesOptions.builder(currentOptions) + .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) + .build() + ); + } + } update.setId(datafeedId); return new Request(update.build()); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index b1ed7f61ef576..313dd97d80571 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -127,6 +127,8 @@ public class DatafeedConfig implements SimpleDiffable, ToXConten public static final ParseField DELAYED_DATA_CHECK_CONFIG = new ParseField("delayed_data_check_config"); public static final ParseField MAX_EMPTY_SEARCHES = new ParseField("max_empty_searches"); public static final ParseField INDICES_OPTIONS = new ParseField("indices_options"); + // Field for cross-project search option within indices_options (not part of core IndicesOptions.toXContent) + private static final String RESOLVE_CROSS_PROJECT_INDEX_EXPRESSION = "resolve_cross_project_index_expression"; // These parsers follow the pattern that metadata is parsed leniently (to allow for enhancements), whilst config is parsed strictly public static final ObjectParser LENIENT_PARSER = createParser(true); @@ -204,11 +206,18 @@ private static ObjectParser createParser(boolean ignoreUnknownFie DELAYED_DATA_CHECK_CONFIG ); parser.declareInt(Builder::setMaxEmptySearches, MAX_EMPTY_SEARCHES); - parser.declareObject( - Builder::setIndicesOptions, - (p, c) -> IndicesOptions.fromMap(p.map(), SearchRequest.DEFAULT_INDICES_OPTIONS), - INDICES_OPTIONS - ); + parser.declareObject(Builder::setIndicesOptions, (p, c) -> { + Map map = p.map(); + IndicesOptions baseOptions = IndicesOptions.fromMap(map, SearchRequest.DEFAULT_INDICES_OPTIONS); + // Handle cross-project option which is not part of core IndicesOptions.fromMap/toXContent + Object crossProjectValue = map.get(RESOLVE_CROSS_PROJECT_INDEX_EXPRESSION); + if (crossProjectValue instanceof Boolean && (Boolean) crossProjectValue) { + return IndicesOptions.builder(baseOptions) + .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) + .build(); + } + return baseOptions; + }, INDICES_OPTIONS); parser.declareObject(Builder::setRuntimeMappings, (p, c) -> p.map(), SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD); return parser; } @@ -563,15 +572,24 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } builder.startObject(INDICES_OPTIONS.getPreferredName()); indicesOptions.toXContent(builder, params); + // Cross-project option is not part of core IndicesOptions.toXContent, serialize it explicitly + if (indicesOptions.resolveCrossProjectIndexExpression()) { + builder.field(RESOLVE_CROSS_PROJECT_INDEX_EXPRESSION, true); + } builder.endObject(); } else { // Don't include random defaults or unnecessary defaults in export if (queryDelay.equals(defaultRandomQueryDelay(jobId)) == false) { builder.field(QUERY_DELAY.getPreferredName(), queryDelay.getStringRep()); } // Indices options are a pretty advanced feature, better to not include them if they are just the default ones - if (indicesOptions.equals(SearchRequest.DEFAULT_INDICES_OPTIONS) == false) { + // Also serialize if CPS is enabled (even if other options are default) + if (indicesOptions.equals(SearchRequest.DEFAULT_INDICES_OPTIONS) == false + || indicesOptions.resolveCrossProjectIndexExpression()) { builder.startObject(INDICES_OPTIONS.getPreferredName()); indicesOptions.toXContent(builder, params); + if (indicesOptions.resolveCrossProjectIndexExpression()) { + builder.field(RESOLVE_CROSS_PROJECT_INDEX_EXPRESSION, true); + } builder.endObject(); } // Removing the default chunking config as it is determined by OTHER fields @@ -665,6 +683,7 @@ public boolean equals(Object other) { && Objects.equals(this.maxEmptySearches, that.maxEmptySearches) && Objects.equals(this.indicesOptions, that.indicesOptions) && Objects.equals(this.runtimeMappings, that.runtimeMappings); + // CPS: Add new cross-project search fields here (e.g., projectRouting) } @Override @@ -685,6 +704,7 @@ public int hashCode() { maxEmptySearches, indicesOptions, runtimeMappings + // CPS: Add new cross-project search fields here (e.g., projectRouting) ); } @@ -1052,9 +1072,6 @@ public DatafeedConfig build() { if (indicesOptions == null) { indicesOptions = IndicesOptions.STRICT_EXPAND_OPEN_HIDDEN_FORBID_CLOSED; } - if (indicesOptions.resolveCrossProjectIndexExpression()) { - throw new ElasticsearchStatusException("Cross-project search is not enabled for Datafeeds", RestStatus.FORBIDDEN); - } return new DatafeedConfig( id, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java index 35eb672fe69cb..d3e5961ae1afb 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java @@ -597,6 +597,10 @@ public Builder setIndicesOptions(IndicesOptions indicesOptions) { return this; } + public IndicesOptions getIndicesOptions() { + return indicesOptions; + } + public Builder setRuntimeMappings(Map runtimeMappings) { this.runtimeMappings = runtimeMappings; return this; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigBuilderTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigBuilderTests.java index 800406adc2868..ffdcc462b5b0b 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigBuilderTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigBuilderTests.java @@ -6,14 +6,12 @@ */ package org.elasticsearch.xpack.core.ml.datafeed; -import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregationBuilders; @@ -150,7 +148,7 @@ protected Writeable.Reader instanceReader() { return DatafeedConfig.Builder::new; } - public void testResolveCrossProjectIsDisabled() { + public void testCrossProjectModeOptionsAllowed() { var datafeedBuilder = createRandomizedDatafeedConfigBuilder("jobId", "datafeed-id", 3600000); datafeedBuilder = datafeedBuilder.setIndicesOptions( IndicesOptions.builder(datafeedBuilder.getIndicesOptions()) @@ -158,9 +156,9 @@ public void testResolveCrossProjectIsDisabled() { .build() ); - var actualException = assertThrows(ElasticsearchStatusException.class, datafeedBuilder::build); - assertThat(actualException.getMessage(), equalTo("Cross-project search is not enabled for Datafeeds")); - assertThat(actualException.status(), equalTo(RestStatus.FORBIDDEN)); + // CPS mode is now allowed for datafeeds - should not throw + DatafeedConfig config = datafeedBuilder.build(); + assertThat(config.getIndicesOptions().resolveCrossProjectIndexExpression(), equalTo(true)); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java index aaf16def9b65a..3af33dd703af2 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java @@ -1061,6 +1061,47 @@ protected DatafeedConfig mutateInstance(DatafeedConfig instance) { return builder.build(); } + public void testCrossProjectIndicesOptionsAllowed() { + IndicesOptions cpsOptions = IndicesOptions.builder() + .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) + .build(); + + DatafeedConfig.Builder builder = new DatafeedConfig.Builder("test-df", "test-job").setIndices(List.of("logs-*")) + .setIndicesOptions(cpsOptions); + + DatafeedConfig config = builder.build(); // Should not throw + assertTrue(config.getIndicesOptions().resolveCrossProjectIndexExpression()); + } + + public void testCrossProjectIndicesOptionsXContentRoundTrip() throws IOException { + // Create a datafeed with CPS enabled, using SearchRequest.DEFAULT_INDICES_OPTIONS as base + // to match what fromMap() will use as defaults for unparsed fields + IndicesOptions cpsOptions = IndicesOptions.builder(SearchRequest.DEFAULT_INDICES_OPTIONS) + .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) + .build(); + + DatafeedConfig.Builder builder = new DatafeedConfig.Builder("test-df-cps", "test-job").setIndices(List.of("logs-*")) + .setIndicesOptions(cpsOptions); + + DatafeedConfig original = builder.build(); + assertTrue("Original config should have CPS enabled", original.getIndicesOptions().resolveCrossProjectIndexExpression()); + + // Serialize to XContent + BytesReference bytes = XContentHelper.toXContent(original, XContentType.JSON, false); + + // Parse back from XContent + DatafeedConfig parsed; + try (XContentParser xContentParser = parser(bytes)) { + parsed = DatafeedConfig.STRICT_PARSER.apply(xContentParser, null).build(); + } + + // Verify CPS flag survives round-trip (the main goal of this test) + assertTrue( + "CPS flag should survive XContent round-trip serialization", + parsed.getIndicesOptions().resolveCrossProjectIndexExpression() + ); + } + private XContentParser parser(String json) throws IOException { return JsonXContent.jsonXContent.createParser(XContentParserConfiguration.EMPTY.withRegistry(xContentRegistry()), json); } diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DatafeedCpsIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DatafeedCpsIT.java new file mode 100644 index 0000000000000..256ea81a99705 --- /dev/null +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DatafeedCpsIT.java @@ -0,0 +1,297 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugins.ClusterPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.xpack.core.ml.action.GetDatafeedsAction; +import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; +import org.elasticsearch.xpack.core.ml.action.PutJobAction; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.DataDescription; +import org.elasticsearch.xpack.core.ml.job.config.Detector; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.ml.MlSingleNodeTestCase; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.stream.Stream; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +/** + * Integration tests for Cross-Project Search (CPS) support in ML datafeeds. + * + * CPS allows datafeeds to search across multiple projects using the same index pattern + * syntax as Cross-Cluster Search (CCS). Since CPS is built on CCS infrastructure, + * datafeeds already handle CPS patterns correctly through existing remote index handling. + * + * Key validation: RemoteClusterAware.isRemoteIndexName() returns true for both CCS + * (cluster:index) and CPS (project:index) patterns, so existing remote index handling + * in DataExtractorFactory, DatafeedManager, and DatafeedNodeSelector automatically + * works for CPS. + */ +public class DatafeedCpsIT extends MlSingleNodeTestCase { + + @Override + protected Settings nodeSettings() { + return Settings.builder().put(super.nodeSettings()).put("serverless.cross_project.enabled", "true").build(); + } + + @Override + protected Collection> getPlugins() { + return Stream.concat(super.getPlugins().stream(), Stream.of(CpsPlugin.class)).toList(); + } + + /** + * Test that a datafeed with CPS-enabled IndicesOptions can be created successfully. + * This validates that the CPS block has been removed from DatafeedConfig.Builder. + */ + public void testCrossProjectIndicesOptionsAllowedForDatafeed() throws Exception { + String jobId = "cps-test-job"; + String datafeedId = "cps-test-datafeed"; + + // Create job first + Job.Builder job = createScheduledJob(jobId); + client().execute(PutJobAction.INSTANCE, new PutJobAction.Request(job)).actionGet(); + + // Create datafeed with CPS-enabled IndicesOptions + // This tests that the CPS block is removed - previously this would throw + // ElasticsearchStatusException("Cross-project search is not enabled for Datafeeds") + IndicesOptions cpsOptions = IndicesOptions.builder(SearchRequest.DEFAULT_INDICES_OPTIONS) + .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) + .build(); + + DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder(datafeedId, jobId).setIndices(List.of("logs-*")) + .setIndicesOptions(cpsOptions); + + PutDatafeedAction.Request request = new PutDatafeedAction.Request(datafeedBuilder.build()); + PutDatafeedAction.Response response = client().execute(PutDatafeedAction.INSTANCE, request).actionGet(); + + // Verify datafeed was created successfully (no exception thrown) + assertThat(response.getResponse().getId(), equalTo(datafeedId)); + + // Verify datafeed can be retrieved + GetDatafeedsAction.Response getResponse = client().execute( + GetDatafeedsAction.INSTANCE, + new GetDatafeedsAction.Request(datafeedId) + ).actionGet(); + DatafeedConfig storedConfig = getResponse.getResponse().results().get(0); + assertThat(storedConfig.getId(), equalTo(datafeedId)); + assertThat(storedConfig.getIndices(), equalTo(List.of("logs-*"))); + } + + /** + * Test that the CPS flag survives cluster state persistence (GET after PUT). + * This validates that DatafeedConfig correctly serializes and deserializes + * the resolve_cross_project_index_expression field via XContent. + */ + public void testCpsFlagSurvivesClusterStatePersistence() throws Exception { + String jobId = "cps-persist-job"; + String datafeedId = "cps-persist-datafeed"; + + // Create job first + Job.Builder job = createScheduledJob(jobId); + client().execute(PutJobAction.INSTANCE, new PutJobAction.Request(job)).actionGet(); + + // Create datafeed with CPS enabled + IndicesOptions cpsOptions = IndicesOptions.builder(SearchRequest.DEFAULT_INDICES_OPTIONS) + .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) + .build(); + + DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder(datafeedId, jobId).setIndices(List.of("logs-*")) + .setIndicesOptions(cpsOptions); + + DatafeedConfig originalConfig = datafeedBuilder.build(); + assertTrue("Original config should have CPS enabled", originalConfig.getIndicesOptions().resolveCrossProjectIndexExpression()); + + // Store the datafeed (goes through XContent serialization to cluster state) + PutDatafeedAction.Request request = new PutDatafeedAction.Request(originalConfig); + client().execute(PutDatafeedAction.INSTANCE, request).actionGet(); + + // Retrieve the datafeed (comes back through XContent deserialization from cluster state) + GetDatafeedsAction.Response getResponse = client().execute( + GetDatafeedsAction.INSTANCE, + new GetDatafeedsAction.Request(datafeedId) + ).actionGet(); + DatafeedConfig storedConfig = getResponse.getResponse().results().get(0); + + // Verify CPS flag survives the round-trip through cluster state + assertTrue( + "CPS flag should survive cluster state persistence", + storedConfig.getIndicesOptions().resolveCrossProjectIndexExpression() + ); + } + + /** + * Test that RemoteClusterAware.isRemoteIndexName() correctly identifies CPS qualified indices. + * This validates that the existing CCS infrastructure handles CPS patterns, which means + * existing remote index handling (rollup skip, HasPrivileges filter, index verification skip) + * will work correctly for CPS indices. + */ + public void testRemoteClusterAwareIdentifiesCpsPattern() { + // CCS remote pattern + assertThat(RemoteClusterAware.isRemoteIndexName("cluster1:logs-*"), is(true)); + + // CPS qualified pattern (same syntax as CCS) + assertThat(RemoteClusterAware.isRemoteIndexName("project1:logs-*"), is(true)); + + // Local/unqualified pattern + assertThat(RemoteClusterAware.isRemoteIndexName("logs-*"), is(false)); + + // Multiple colons - first colon separates remote/project from index + assertThat(RemoteClusterAware.isRemoteIndexName("project1:logs:v1-*"), is(true)); + + // Wildcard in project/cluster part + assertThat(RemoteClusterAware.isRemoteIndexName("prod-*:logs-*"), is(true)); + } + + /** + * Test that a datafeed with CPS qualified indices can be created. + * This validates that CPS qualified indices are handled like CCS remote indices. + */ + public void testDatafeedWithCpsQualifiedIndices() throws Exception { + String jobId = "cps-qualified-job"; + String datafeedId = "cps-qualified-datafeed"; + + // Create job first + Job.Builder job = createScheduledJob(jobId); + client().execute(PutJobAction.INSTANCE, new PutJobAction.Request(job)).actionGet(); + + // Create datafeed with CPS qualified indices (project:index pattern) + IndicesOptions cpsOptions = IndicesOptions.builder(SearchRequest.DEFAULT_INDICES_OPTIONS) + .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) + .build(); + + DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder(datafeedId, jobId).setIndices( + List.of("project1:logs-*", "project2:metrics-*") + ).setIndicesOptions(cpsOptions); + + PutDatafeedAction.Request request = new PutDatafeedAction.Request(datafeedBuilder.build()); + PutDatafeedAction.Response response = client().execute(PutDatafeedAction.INSTANCE, request).actionGet(); + + // Verify datafeed was created successfully with CPS qualified indices + assertThat(response.getResponse().getId(), equalTo(datafeedId)); + assertThat(response.getResponse().getIndices(), equalTo(List.of("project1:logs-*", "project2:metrics-*"))); + } + + /** + * Test that a datafeed with mixed local and CPS qualified indices can be created. + */ + public void testDatafeedWithMixedLocalAndCpsIndices() throws Exception { + String jobId = "mixed-indices-job"; + String datafeedId = "mixed-indices-datafeed"; + + // Create job first + Job.Builder job = createScheduledJob(jobId); + client().execute(PutJobAction.INSTANCE, new PutJobAction.Request(job)).actionGet(); + + // Create datafeed with mixed local and CPS qualified indices + IndicesOptions cpsOptions = IndicesOptions.builder(SearchRequest.DEFAULT_INDICES_OPTIONS) + .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) + .build(); + + DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder(datafeedId, jobId).setIndices( + List.of("local-logs-*", "project1:remote-logs-*") + ).setIndicesOptions(cpsOptions); + + PutDatafeedAction.Request request = new PutDatafeedAction.Request(datafeedBuilder.build()); + PutDatafeedAction.Response response = client().execute(PutDatafeedAction.INSTANCE, request).actionGet(); + + // Verify datafeed was created successfully + assertThat(response.getResponse().getId(), equalTo(datafeedId)); + } + + /** + * Test CCS/CPS parity: both patterns should be identified as remote by RemoteClusterAware. + * This ensures that existing CCS handling in datafeeds automatically applies to CPS. + */ + public void testCcsAndCpsPatternParity() { + // Verify both CCS and CPS patterns are treated identically by RemoteClusterAware + String ccsPattern = "cluster1:logs-*"; + String cpsPattern = "project1:logs-*"; + + boolean ccsIsRemote = RemoteClusterAware.isRemoteIndexName(ccsPattern); + boolean cpsIsRemote = RemoteClusterAware.isRemoteIndexName(cpsPattern); + + assertThat("CCS pattern should be identified as remote", ccsIsRemote, is(true)); + assertThat("CPS pattern should be identified as remote", cpsIsRemote, is(true)); + assertThat("CCS and CPS patterns should be handled identically", ccsIsRemote, equalTo(cpsIsRemote)); + } + + /** + * Test that documents the security model for CPS indices in datafeeds. + * + * Security implications for CPS indices (project:index patterns): + * - Up-front privilege validation (HasPrivileges check) is skipped for CPS indices + * (same as CCS remote indices) - see DatafeedManager.putDatafeed() + * - This is by design: RemoteClusterLicenseChecker.isRemoteIndex() returns true for + * both CCS (cluster:index) and CPS (project:index) patterns + * - For CPS, privilege enforcement happens at search execution time via UIAM + * (Universal Identity and Access Management), not at datafeed creation time + * - This mirrors the existing CCS behavior where remote cluster privileges cannot + * be validated locally at datafeed creation time + * + * This test validates the filtering behavior that enables this security model. + */ + public void testCpsIndicesSkipUpfrontPrivilegeValidation() { + // Both CCS and CPS patterns are identified as "remote" by the license checker + // This causes them to be filtered out of the HasPrivileges check in DatafeedManager + + // CCS pattern - skipped from privilege check (can't validate remote cluster privileges) + assertThat(RemoteClusterAware.isRemoteIndexName("cluster1:logs-*"), is(true)); + + // CPS pattern - also skipped (privileges enforced at search time via UIAM) + assertThat(RemoteClusterAware.isRemoteIndexName("project1:logs-*"), is(true)); + + // Local pattern - NOT skipped (privileges validated at datafeed creation) + assertThat(RemoteClusterAware.isRemoteIndexName("logs-*"), is(false)); + + // Mixed indices: local indices get privilege checked, CPS indices rely on UIAM + List mixedIndices = List.of("local-logs-*", "project1:remote-logs-*"); + long localCount = mixedIndices.stream().filter(i -> RemoteClusterAware.isRemoteIndexName(i) == false).count(); + long remoteCount = mixedIndices.stream().filter(RemoteClusterAware::isRemoteIndexName).count(); + assertThat("Should have 1 local index for privilege validation", localCount, equalTo(1L)); + assertThat("Should have 1 CPS index skipped from privilege validation", remoteCount, equalTo(1L)); + } + + /** + * Helper method to create a scheduled job for testing. + */ + private static Job.Builder createScheduledJob(String jobId) { + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setTimeFormat("yyyy-MM-dd HH:mm:ss"); + + Detector.Builder detector = new Detector.Builder("count", null); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())); + + Job.Builder builder = new Job.Builder(); + builder.setId(jobId); + builder.setAnalysisConfig(analysisConfig); + builder.setDataDescription(dataDescription); + return builder; + } + + /** + * Plugin to register the serverless.cross_project.enabled setting. + */ + public static class CpsPlugin extends Plugin implements ClusterPlugin { + public List> getSettings() { + return List.of(Setting.simpleString("serverless.cross_project.enabled", Setting.Property.NodeScope)); + } + } +} + diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index c1c53033731db..5279bd593e7bc 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -1507,8 +1507,8 @@ public List getRestHandlers( restHandlers.add(new RestUpdateModelSnapshotAction()); restHandlers.add(new RestGetDatafeedsAction()); restHandlers.add(new RestGetDatafeedStatsAction()); - restHandlers.add(new RestPutDatafeedAction()); - restHandlers.add(new RestUpdateDatafeedAction()); + restHandlers.add(new RestPutDatafeedAction(settings)); + restHandlers.add(new RestUpdateDatafeedAction(settings)); restHandlers.add(new RestDeleteDatafeedAction()); restHandlers.add(new RestPreviewDatafeedAction()); restHandlers.add(new RestStartDatafeedAction()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 7321f991773e1..84560ea0a0284 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -106,6 +106,10 @@ public void putDatafeed( if (XPackSettings.SECURITY_ENABLED.get(settings)) { useSecondaryAuthIfAvailable(securityContext, () -> { // TODO: Remove this filter once https://github.com/elastic/elasticsearch/issues/67798 is fixed. + // Note: This filter excludes both CCS (cluster:index) and CPS (project:index) patterns from + // up-front privilege validation. For CCS, privileges cannot be checked locally. For CPS, + // privilege enforcement happens at search execution time via UIAM. This is by design - + // RemoteClusterLicenseChecker.isRemoteIndex() returns true for both patterns. final String[] indices = request.getDatafeed() .getIndices() .stream() diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPutDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPutDatafeedAction.java index 360b14277a19a..30edcc592de91 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPutDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPutDatafeedAction.java @@ -9,11 +9,13 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.search.crossproject.CrossProjectModeDecider; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; @@ -30,6 +32,12 @@ @ServerlessScope(Scope.PUBLIC) public class RestPutDatafeedAction extends BaseRestHandler { + private final CrossProjectModeDecider crossProjectModeDecider; + + public RestPutDatafeedAction(Settings settings) { + this.crossProjectModeDecider = new CrossProjectModeDecider(settings); + } + @Override public List routes() { return List.of(new Route(PUT, BASE_PATH + "datafeeds/{" + ID + "}")); @@ -44,9 +52,10 @@ public String getName() { protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { String datafeedId = restRequest.param(DatafeedConfig.ID.getPreferredName()); IndicesOptions indicesOptions = IndicesOptions.fromRequest(restRequest, SearchRequest.DEFAULT_INDICES_OPTIONS); + boolean enableCps = crossProjectModeDecider.crossProjectEnabled(); PutDatafeedAction.Request putDatafeedRequest; try (XContentParser parser = restRequest.contentParser()) { - putDatafeedRequest = PutDatafeedAction.Request.parseRequest(datafeedId, indicesOptions, parser); + putDatafeedRequest = PutDatafeedAction.Request.parseRequest(datafeedId, indicesOptions, enableCps, parser); } putDatafeedRequest.ackTimeout(getAckTimeout(restRequest)); putDatafeedRequest.masterNodeTimeout(getMasterNodeTimeout(restRequest)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestUpdateDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestUpdateDatafeedAction.java index 5335de78aa18a..040ad3f9d7813 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestUpdateDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestUpdateDatafeedAction.java @@ -9,11 +9,13 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.search.crossproject.CrossProjectModeDecider; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; @@ -30,6 +32,12 @@ @ServerlessScope(Scope.PUBLIC) public class RestUpdateDatafeedAction extends BaseRestHandler { + private final CrossProjectModeDecider crossProjectModeDecider; + + public RestUpdateDatafeedAction(Settings settings) { + this.crossProjectModeDecider = new CrossProjectModeDecider(settings); + } + @Override public List routes() { return List.of(new Route(POST, BASE_PATH + "datafeeds/{" + ID + "}/_update")); @@ -50,9 +58,10 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient || restRequest.hasParam("ignore_throttled")) { indicesOptions = IndicesOptions.fromRequest(restRequest, SearchRequest.DEFAULT_INDICES_OPTIONS); } + boolean enableCps = crossProjectModeDecider.crossProjectEnabled(); UpdateDatafeedAction.Request updateDatafeedRequest; try (XContentParser parser = restRequest.contentParser()) { - updateDatafeedRequest = UpdateDatafeedAction.Request.parseRequest(datafeedId, indicesOptions, parser); + updateDatafeedRequest = UpdateDatafeedAction.Request.parseRequest(datafeedId, indicesOptions, enableCps, parser); } updateDatafeedRequest.ackTimeout(getAckTimeout(restRequest)); updateDatafeedRequest.masterNodeTimeout(getMasterNodeTimeout(restRequest)); From 4865e9425a8b48d68c30c15a0cbbdb3473351bb2 Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Thu, 18 Dec 2025 21:43:16 +0100 Subject: [PATCH 2/5] Update docs/changelog/139781.yaml --- docs/changelog/139781.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/139781.yaml diff --git a/docs/changelog/139781.yaml b/docs/changelog/139781.yaml new file mode 100644 index 0000000000000..f662f36967644 --- /dev/null +++ b/docs/changelog/139781.yaml @@ -0,0 +1,5 @@ +pr: 139781 +summary: Basic CPS Support for ML Datafeeds +area: Machine Learning +type: enhancement +issues: [] From 56de0541093d7d4f87618cb907496450f1e9ff25 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Thu, 18 Dec 2025 20:51:50 +0000 Subject: [PATCH 3/5] [CI] Auto commit changes from spotless --- .../xpack/core/ml/datafeed/DatafeedConfig.java | 2 -- .../xpack/ml/integration/DatafeedCpsIT.java | 13 ++++--------- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index 313dd97d80571..7ffc4c1e80d3b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -9,7 +9,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.TransportVersion; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; @@ -22,7 +21,6 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DatafeedCpsIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DatafeedCpsIT.java index 256ea81a99705..3adec33252653 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DatafeedCpsIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DatafeedCpsIT.java @@ -85,10 +85,8 @@ public void testCrossProjectIndicesOptionsAllowedForDatafeed() throws Exception assertThat(response.getResponse().getId(), equalTo(datafeedId)); // Verify datafeed can be retrieved - GetDatafeedsAction.Response getResponse = client().execute( - GetDatafeedsAction.INSTANCE, - new GetDatafeedsAction.Request(datafeedId) - ).actionGet(); + GetDatafeedsAction.Response getResponse = client().execute(GetDatafeedsAction.INSTANCE, new GetDatafeedsAction.Request(datafeedId)) + .actionGet(); DatafeedConfig storedConfig = getResponse.getResponse().results().get(0); assertThat(storedConfig.getId(), equalTo(datafeedId)); assertThat(storedConfig.getIndices(), equalTo(List.of("logs-*"))); @@ -123,10 +121,8 @@ public void testCpsFlagSurvivesClusterStatePersistence() throws Exception { client().execute(PutDatafeedAction.INSTANCE, request).actionGet(); // Retrieve the datafeed (comes back through XContent deserialization from cluster state) - GetDatafeedsAction.Response getResponse = client().execute( - GetDatafeedsAction.INSTANCE, - new GetDatafeedsAction.Request(datafeedId) - ).actionGet(); + GetDatafeedsAction.Response getResponse = client().execute(GetDatafeedsAction.INSTANCE, new GetDatafeedsAction.Request(datafeedId)) + .actionGet(); DatafeedConfig storedConfig = getResponse.getResponse().results().get(0); // Verify CPS flag survives the round-trip through cluster state @@ -294,4 +290,3 @@ public List> getSettings() { } } } - From ca90f6c58e84fd3970efde7b8d373ca46d9cf76a Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Thu, 18 Dec 2025 21:57:30 +0100 Subject: [PATCH 4/5] [ML] Refactor Datafeed Actions to Utilize Centralized CPS Handling This commit refactors the `PutDatafeedAction` and `UpdateDatafeedAction` classes to leverage a new method in `DatafeedConfig` for managing Cross-Project Search (CPS) settings. Key changes include: - Replaced inline CPS handling logic with a call to `DatafeedConfig.ensureCrossProjectSearchEnabled`, simplifying the code and improving maintainability. - Updated the `DatafeedUpdate` parser to correctly handle the `resolve_cross_project_index_expression` field during deserialization. - Added tests to ensure that CPS options are correctly applied and serialized. These changes enhance the clarity and efficiency of CPS management within datafeed actions, supporting better integration of cross-project capabilities in machine learning workflows. --- .../core/ml/action/PutDatafeedAction.java | 14 ++------- .../core/ml/action/UpdateDatafeedAction.java | 15 +++------ .../core/ml/datafeed/DatafeedConfig.java | 17 ++++++++++ .../core/ml/datafeed/DatafeedUpdate.java | 19 +++++++++--- .../core/ml/datafeed/DatafeedUpdateTests.java | 31 +++++++++++++++++++ 5 files changed, 69 insertions(+), 27 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedAction.java index 43224038f9789..e4ff9f2f9c358 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedAction.java @@ -46,17 +46,9 @@ public static Request parseRequest( datafeed.setIndicesOptions(indicesOptions); } // If CPS is enabled cluster-wide, ensure the CPS flag is set on indices_options - // regardless of whether it came from request params or body - if (enableCrossProjectSearch && datafeed.getIndicesOptions() != null) { - IndicesOptions currentOptions = datafeed.getIndicesOptions(); - if (currentOptions.resolveCrossProjectIndexExpression() == false) { - datafeed.setIndicesOptions( - IndicesOptions.builder(currentOptions) - .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) - .build() - ); - } - } + datafeed.setIndicesOptions( + DatafeedConfig.ensureCrossProjectSearchEnabled(datafeed.getIndicesOptions(), enableCrossProjectSearch) + ); datafeed.setId(datafeedId); return new Request(datafeed.build()); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedAction.java index 694a557cbd111..ad0c23f911a0a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedAction.java @@ -15,6 +15,7 @@ import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate; import java.io.IOException; @@ -46,17 +47,9 @@ public static Request parseRequest( update.setIndicesOptions(indicesOptions); } // If CPS is enabled cluster-wide, ensure the CPS flag is set on indices_options - // regardless of whether it came from request params or body - if (enableCrossProjectSearch && update.getIndicesOptions() != null) { - IndicesOptions currentOptions = update.getIndicesOptions(); - if (currentOptions.resolveCrossProjectIndexExpression() == false) { - update.setIndicesOptions( - IndicesOptions.builder(currentOptions) - .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) - .build() - ); - } - } + update.setIndicesOptions( + DatafeedConfig.ensureCrossProjectSearchEnabled(update.getIndicesOptions(), enableCrossProjectSearch) + ); update.setId(datafeedId); return new Request(update.build()); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index 313dd97d80571..bac5012f8869e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -222,6 +222,23 @@ private static ObjectParser createParser(boolean ignoreUnknownFie return parser; } + /** + * Ensures the cross-project search flag is enabled on the given IndicesOptions if not already set. + * Returns the options unchanged if they are null or already have CPS enabled. + * + * @param options the IndicesOptions to potentially update + * @param enableCrossProjectSearch whether CPS should be enabled + * @return the updated IndicesOptions with CPS enabled, or the original options if no change is needed + */ + public static IndicesOptions ensureCrossProjectSearchEnabled(IndicesOptions options, boolean enableCrossProjectSearch) { + if (enableCrossProjectSearch && options != null && options.resolveCrossProjectIndexExpression() == false) { + return IndicesOptions.builder(options) + .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) + .build(); + } + return options; + } + private final String id; private final String jobId; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java index d3e5961ae1afb..977625cc3b6bd 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java @@ -87,11 +87,17 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { DatafeedConfig.DELAYED_DATA_CHECK_CONFIG ); PARSER.declareInt(Builder::setMaxEmptySearches, DatafeedConfig.MAX_EMPTY_SEARCHES); - PARSER.declareObject( - Builder::setIndicesOptions, - (p, c) -> IndicesOptions.fromMap(p.map(), SearchRequest.DEFAULT_INDICES_OPTIONS), - DatafeedConfig.INDICES_OPTIONS - ); + PARSER.declareObject(Builder::setIndicesOptions, (p, c) -> { + Map map = p.map(); + IndicesOptions baseOptions = IndicesOptions.fromMap(map, SearchRequest.DEFAULT_INDICES_OPTIONS); + Object crossProjectValue = map.get("resolve_cross_project_index_expression"); + if (crossProjectValue instanceof Boolean && (Boolean) crossProjectValue) { + return IndicesOptions.builder(baseOptions) + .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) + .build(); + } + return baseOptions; + }, DatafeedConfig.INDICES_OPTIONS); PARSER.declareObject(Builder::setRuntimeMappings, (p, c) -> p.map(), SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD); } @@ -248,6 +254,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (indicesOptions != null) { builder.startObject(DatafeedConfig.INDICES_OPTIONS.getPreferredName()); indicesOptions.toXContent(builder, params); + if (indicesOptions.resolveCrossProjectIndexExpression()) { + builder.field("resolve_cross_project_index_expression", true); + } builder.endObject(); } addOptionalField(builder, SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD, runtimeMappings); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java index 3b1c4e0248e2c..59098f51fcd55 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java @@ -330,6 +330,37 @@ public void testApply_givenIndicesOptions() { assertThat(updatedDatafeed.getIndicesOptions(), equalTo(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN)); } + public void testCrossProjectIndicesOptionsAllowed() { + IndicesOptions cpsOptions = IndicesOptions.builder(SearchRequest.DEFAULT_INDICES_OPTIONS) + .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) + .build(); + + DatafeedUpdate update = new DatafeedUpdate.Builder("test-df").setIndicesOptions(cpsOptions).build(); + + assertThat("Update should have CPS enabled", update.getIndicesOptions().resolveCrossProjectIndexExpression(), is(true)); + } + + public void testCrossProjectIndicesOptionsXContentRoundTrip() throws IOException { + IndicesOptions cpsOptions = IndicesOptions.builder(SearchRequest.DEFAULT_INDICES_OPTIONS) + .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) + .build(); + + DatafeedUpdate original = new DatafeedUpdate.Builder("test-df-cps").setIndicesOptions(cpsOptions).build(); + + assertThat("Original should have CPS enabled", original.getIndicesOptions().resolveCrossProjectIndexExpression(), is(true)); + + // Serialize to XContent and parse back + try (XContentParser parser = createParser(XContentType.JSON.xContent(), org.elasticsearch.common.Strings.toString(original))) { + DatafeedUpdate parsed = DatafeedUpdate.PARSER.apply(parser, null).build(); + + assertThat( + "CPS flag should survive XContent round-trip", + parsed.getIndicesOptions().resolveCrossProjectIndexExpression(), + is(true) + ); + } + } + public void testApply_GivenRandomUpdates_AssertImmutability() { for (int i = 0; i < 100; ++i) { DatafeedConfig datafeed = DatafeedConfigTests.createRandomizedDatafeedConfig(JobTests.randomValidJobId()); From 9cb8f90c2c3ce294be675e367b8e3e6ba3f3309a Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Thu, 18 Dec 2025 21:06:38 +0000 Subject: [PATCH 5/5] [CI] Auto commit changes from spotless --- .../xpack/core/ml/action/UpdateDatafeedAction.java | 4 +--- .../elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedAction.java index ad0c23f911a0a..17ccf5fd91400 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedAction.java @@ -47,9 +47,7 @@ public static Request parseRequest( update.setIndicesOptions(indicesOptions); } // If CPS is enabled cluster-wide, ensure the CPS flag is set on indices_options - update.setIndicesOptions( - DatafeedConfig.ensureCrossProjectSearchEnabled(update.getIndicesOptions(), enableCrossProjectSearch) - ); + update.setIndicesOptions(DatafeedConfig.ensureCrossProjectSearchEnabled(update.getIndicesOptions(), enableCrossProjectSearch)); update.setId(datafeedId); return new Request(update.build()); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index c964ea902a01f..388402dae8e21 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -230,9 +230,7 @@ private static ObjectParser createParser(boolean ignoreUnknownFie */ public static IndicesOptions ensureCrossProjectSearchEnabled(IndicesOptions options, boolean enableCrossProjectSearch) { if (enableCrossProjectSearch && options != null && options.resolveCrossProjectIndexExpression() == false) { - return IndicesOptions.builder(options) - .crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)) - .build(); + return IndicesOptions.builder(options).crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)).build(); } return options; }