Skip to content

Commit

Permalink
Making changes to BulkRequest to enable component template substituti…
Browse files Browse the repository at this point in the history
…ons in the simulate ingest API (#112957) (#113067)
  • Loading branch information
masseyke committed Sep 19, 2024
1 parent 8a5d68e commit e26fe9f
Show file tree
Hide file tree
Showing 9 changed files with 250 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_AGGREGATE_EXEC_TRACKS_INTERMEDIATE_ATTRS = def(8_738_00_0);
public static final TransportVersion CCS_TELEMETRY_STATS = def(8_739_00_0);
public static final TransportVersion GLOBAL_RETENTION_TELEMETRY = def(8_740_00_0);
public static final TransportVersion SIMULATE_COMPONENT_TEMPLATES_SUBSTITUTIONS = def(8_743_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -38,6 +39,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

Expand Down Expand Up @@ -475,4 +477,27 @@ public Set<String> getIndices() {
public boolean isSimulated() {
return false; // Always false, but may be overridden by a subclass
}

/*
* Returns any component template substitutions that are to be used as part of this bulk request. We would likely only have
* substitutions in the event of a simulated request.
*/
public Map<String, ComponentTemplate> getComponentTemplateSubstitutions() throws IOException {
return Map.of();
}

/*
* This copies this bulk request, but without all of its inner requests or the set of indices found in those requests
*/
public BulkRequest shallowClone() {
BulkRequest bulkRequest = new BulkRequest(globalIndex);
bulkRequest.setRefreshPolicy(getRefreshPolicy());
bulkRequest.waitForActiveShards(waitForActiveShards());
bulkRequest.timeout(timeout());
bulkRequest.pipeline(pipeline());
bulkRequest.routing(routing());
bulkRequest.requireAlias(requireAlias());
bulkRequest.requireDataStream(requireDataStream());
return bulkRequest;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,7 @@ BulkRequest getBulkRequest() {
if (itemResponses.isEmpty()) {
return bulkRequest;
} else {
BulkRequest modifiedBulkRequest = new BulkRequest();
modifiedBulkRequest.setRefreshPolicy(bulkRequest.getRefreshPolicy());
modifiedBulkRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
modifiedBulkRequest.timeout(bulkRequest.timeout());
BulkRequest modifiedBulkRequest = bulkRequest.shallowClone();

int slot = 0;
List<DocWriteRequest<?>> requests = bulkRequest.requests();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,21 @@

package org.elasticsearch.action.bulk;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xcontent.XContentParserConfiguration;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
* This extends BulkRequest with support for providing substitute pipeline definitions. In a user request, the pipeline substitutions
* will look something like this:
* This extends BulkRequest with support for providing substitute pipeline definitions and component template definitions. In a user
* request, the substitutions will look something like this:
*
* "pipeline_substitutions": {
* "my-pipeline-1": {
Expand All @@ -45,6 +50,29 @@
* }
* ]
* }
* },
* "component_template_substitutions": {
* "my-template-1": {
* "template": {
* "settings": {
* "number_of_shards": 1
* },
* "mappings": {
* "_source": {
* "enabled": false
* },
* "properties": {
* "host_name": {
* "type": "keyword"
* },
* "created_at": {
* "type": "date",
* "format": "EEE MMM dd HH:mm:ss Z yyyy"
* }
* }
* }
* }
* }
* }
*
* The pipelineSubstitutions Map held by this class is intended to be the result of XContentHelper.convertToMap(). The top-level keys
Expand All @@ -53,27 +81,42 @@
*/
public class SimulateBulkRequest extends BulkRequest {
private final Map<String, Map<String, Object>> pipelineSubstitutions;
private final Map<String, Map<String, Object>> componentTemplateSubstitutions;

/**
* @param pipelineSubstitutions The pipeline definitions that are to be used in place of any pre-existing pipeline definitions with
* the same pipelineId. The key of the map is the pipelineId, and the value the pipeline definition as
* parsed by XContentHelper.convertToMap().
* @param componentTemplateSubstitutions The component template definitions that are to be used in place of any pre-existing
* component template definitions with the same name.
*/
public SimulateBulkRequest(@Nullable Map<String, Map<String, Object>> pipelineSubstitutions) {
public SimulateBulkRequest(
@Nullable Map<String, Map<String, Object>> pipelineSubstitutions,
@Nullable Map<String, Map<String, Object>> componentTemplateSubstitutions
) {
super();
this.pipelineSubstitutions = pipelineSubstitutions;
this.componentTemplateSubstitutions = componentTemplateSubstitutions;
}

@SuppressWarnings("unchecked")
public SimulateBulkRequest(StreamInput in) throws IOException {
super(in);
this.pipelineSubstitutions = (Map<String, Map<String, Object>>) in.readGenericValue();
if (in.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_COMPONENT_TEMPLATES_SUBSTITUTIONS)) {
this.componentTemplateSubstitutions = (Map<String, Map<String, Object>>) in.readGenericValue();
} else {
componentTemplateSubstitutions = Map.of();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeGenericValue(pipelineSubstitutions);
if (out.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_COMPONENT_TEMPLATES_SUBSTITUTIONS)) {
out.writeGenericValue(componentTemplateSubstitutions);
}
}

public Map<String, Map<String, Object>> getPipelineSubstitutions() {
Expand All @@ -84,4 +127,37 @@ public Map<String, Map<String, Object>> getPipelineSubstitutions() {
public boolean isSimulated() {
return true;
}

@Override
public Map<String, ComponentTemplate> getComponentTemplateSubstitutions() throws IOException {
if (componentTemplateSubstitutions == null) {
return Map.of();
}
Map<String, ComponentTemplate> result = new HashMap<>(componentTemplateSubstitutions.size());
for (Map.Entry<String, Map<String, Object>> rawEntry : componentTemplateSubstitutions.entrySet()) {
result.put(rawEntry.getKey(), convertRawTemplateToComponentTemplate(rawEntry.getValue()));
}
return result;
}

private static ComponentTemplate convertRawTemplateToComponentTemplate(Map<String, Object> rawTemplate) throws IOException {
ComponentTemplate componentTemplate;
try (var parser = XContentHelper.mapToXContentParser(XContentParserConfiguration.EMPTY, rawTemplate)) {
componentTemplate = ComponentTemplate.parse(parser);
}
return componentTemplate;
}

@Override
public BulkRequest shallowClone() {
BulkRequest bulkRequest = new SimulateBulkRequest(pipelineSubstitutions, componentTemplateSubstitutions);
bulkRequest.setRefreshPolicy(getRefreshPolicy());
bulkRequest.waitForActiveShards(waitForActiveShards());
bulkRequest.timeout(timeout());
bulkRequest.pipeline(pipeline());
bulkRequest.routing(routing());
bulkRequest.requireAlias(requireAlias());
bulkRequest.requireDataStream(requireDataStream());
return bulkRequest;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
Tuple<XContentType, BytesReference> sourceTuple = request.contentOrSourceParam();
Map<String, Object> sourceMap = XContentHelper.convertToMap(sourceTuple.v2(), false, sourceTuple.v1()).v2();
SimulateBulkRequest bulkRequest = new SimulateBulkRequest(
(Map<String, Map<String, Object>>) sourceMap.remove("pipeline_substitutions")
(Map<String, Map<String, Object>>) sourceMap.remove("pipeline_substitutions"),
(Map<String, Map<String, Object>>) sourceMap.remove("component_template_substitutions")
);
BytesReference transformedData = convertToBulkRequestXContentBytes(sourceMap);
bulkRequest.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,4 +475,24 @@ public void testUnsupportedAction() {
allOf(containsString("Malformed action/metadata line [1]"), containsString("found [get"))
);
}

public void testShallowClone() {
BulkRequest bulkRequest = new BulkRequest(randomBoolean() ? null : randomAlphaOfLength(10));
bulkRequest.setRefreshPolicy(randomFrom(RefreshPolicy.values()));
bulkRequest.waitForActiveShards(randomIntBetween(1, 10));
bulkRequest.timeout(randomTimeValue());
bulkRequest.pipeline(randomBoolean() ? null : randomAlphaOfLength(10));
bulkRequest.routing(randomBoolean() ? null : randomAlphaOfLength(10));
bulkRequest.requireAlias(randomBoolean());
bulkRequest.requireDataStream(randomBoolean());
BulkRequest shallowCopy = bulkRequest.shallowClone();
assertThat(shallowCopy.requests, equalTo(List.of()));
assertThat(shallowCopy.getRefreshPolicy(), equalTo(bulkRequest.getRefreshPolicy()));
assertThat(shallowCopy.waitForActiveShards(), equalTo(bulkRequest.waitForActiveShards()));
assertThat(shallowCopy.timeout(), equalTo(bulkRequest.timeout()));
assertThat(shallowCopy.pipeline(), equalTo(bulkRequest.pipeline()));
assertThat(shallowCopy.routing(), equalTo(bulkRequest.routing()));
assertThat(shallowCopy.requireAlias(), equalTo(bulkRequest.requireAlias()));
assertThat(shallowCopy.requireDataStream(), equalTo(bulkRequest.requireDataStream()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,36 @@

package org.elasticsearch.action.bulk;

import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;

public class SimulateBulkRequestTests extends ESTestCase {

public void testSerialization() throws Exception {
testSerialization(getTestPipelineSubstitutions());
testSerialization(null);
testSerialization(Map.of());
testSerialization(getTestPipelineSubstitutions(), getTestTemplateSubstitutions());
testSerialization(getTestPipelineSubstitutions(), null);
testSerialization(null, getTestTemplateSubstitutions());
testSerialization(null, null);
testSerialization(Map.of(), Map.of());
}

private void testSerialization(Map<String, Map<String, Object>> pipelineSubstitutions) throws IOException {
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(pipelineSubstitutions);
private void testSerialization(
Map<String, Map<String, Object>> pipelineSubstitutions,
Map<String, Map<String, Object>> templateSubstitutions
) throws IOException {
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(pipelineSubstitutions, templateSubstitutions);
/*
* Note: SimulateBulkRequest does not implement equals or hashCode, so we can't test serialization in the usual way for a
* Writable
Expand All @@ -35,6 +47,94 @@ private void testSerialization(Map<String, Map<String, Object>> pipelineSubstitu
assertThat(copy.getPipelineSubstitutions(), equalTo(simulateBulkRequest.getPipelineSubstitutions()));
}

@SuppressWarnings({ "unchecked", "rawtypes" })
public void testGetComponentTemplateSubstitutions() throws IOException {
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(Map.of(), Map.of());
assertThat(simulateBulkRequest.getComponentTemplateSubstitutions(), equalTo(Map.of()));
String substituteComponentTemplatesString = """
{
"mappings_template": {
"template": {
"mappings": {
"dynamic": "true",
"properties": {
"foo": {
"type": "keyword"
}
}
}
}
},
"settings_template": {
"template": {
"settings": {
"index": {
"default_pipeline": "bar-pipeline"
}
}
}
}
}
""";

Map tempMap = XContentHelper.convertToMap(
new BytesArray(substituteComponentTemplatesString.getBytes(StandardCharsets.UTF_8)),
randomBoolean(),
XContentType.JSON
).v2();
Map<String, Map<String, Object>> substituteComponentTemplates = (Map<String, Map<String, Object>>) tempMap;
simulateBulkRequest = new SimulateBulkRequest(Map.of(), substituteComponentTemplates);
Map<String, ComponentTemplate> componentTemplateSubstitutions = simulateBulkRequest.getComponentTemplateSubstitutions();
assertThat(componentTemplateSubstitutions.size(), equalTo(2));
assertThat(
XContentHelper.convertToMap(
XContentHelper.toXContent(
componentTemplateSubstitutions.get("mappings_template").template(),
XContentType.JSON,
randomBoolean()
),
randomBoolean(),
XContentType.JSON
).v2(),
equalTo(substituteComponentTemplates.get("mappings_template").get("template"))
);
assertNull(componentTemplateSubstitutions.get("mappings_template").template().settings());
assertNull(componentTemplateSubstitutions.get("settings_template").template().mappings());
assertThat(componentTemplateSubstitutions.get("settings_template").template().settings().size(), equalTo(1));
assertThat(
componentTemplateSubstitutions.get("settings_template").template().settings().get("index.default_pipeline"),
equalTo("bar-pipeline")
);
}

public void testShallowClone() throws IOException {
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(getTestPipelineSubstitutions(), getTestTemplateSubstitutions());
simulateBulkRequest.setRefreshPolicy(randomFrom(WriteRequest.RefreshPolicy.values()));
simulateBulkRequest.waitForActiveShards(randomIntBetween(1, 10));
simulateBulkRequest.timeout(randomTimeValue());
simulateBulkRequest.pipeline(randomBoolean() ? null : randomAlphaOfLength(10));
simulateBulkRequest.routing(randomBoolean() ? null : randomAlphaOfLength(10));
simulateBulkRequest.requireAlias(randomBoolean());
simulateBulkRequest.requireDataStream(randomBoolean());
BulkRequest shallowCopy = simulateBulkRequest.shallowClone();
assertThat(shallowCopy, instanceOf(SimulateBulkRequest.class));
SimulateBulkRequest simulateBulkRequestCopy = (SimulateBulkRequest) shallowCopy;
assertThat(simulateBulkRequestCopy.requests, equalTo(List.of()));
assertThat(
simulateBulkRequestCopy.getComponentTemplateSubstitutions(),
equalTo(simulateBulkRequest.getComponentTemplateSubstitutions())
);
assertThat(simulateBulkRequestCopy.getPipelineSubstitutions(), equalTo(simulateBulkRequest.getPipelineSubstitutions()));
assertThat(simulateBulkRequestCopy.getRefreshPolicy(), equalTo(simulateBulkRequest.getRefreshPolicy()));
assertThat(simulateBulkRequestCopy.waitForActiveShards(), equalTo(simulateBulkRequest.waitForActiveShards()));
assertThat(simulateBulkRequestCopy.timeout(), equalTo(simulateBulkRequest.timeout()));
assertThat(shallowCopy.pipeline(), equalTo(simulateBulkRequest.pipeline()));
assertThat(shallowCopy.routing(), equalTo(simulateBulkRequest.routing()));
assertThat(shallowCopy.requireAlias(), equalTo(simulateBulkRequest.requireAlias()));
assertThat(shallowCopy.requireDataStream(), equalTo(simulateBulkRequest.requireDataStream()));

}

private static Map<String, Map<String, Object>> getTestPipelineSubstitutions() {
return Map.of(
"pipeline1",
Expand All @@ -43,4 +143,16 @@ private static Map<String, Map<String, Object>> getTestPipelineSubstitutions() {
Map.of("processors", List.of(Map.of("processor3", Map.of())))
);
}

private static Map<String, Map<String, Object>> getTestTemplateSubstitutions() {
return Map.of(
"template1",
Map.of(
"template",
Map.of("mappings", Map.of("_source", Map.of("enabled", false), "properties", Map.of()), "settings", Map.of())
),
"template2",
Map.of("template", Map.of("mappings", Map.of(), "settings", Map.of()))
);
}
}
Loading

0 comments on commit e26fe9f

Please sign in to comment.