Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(misc): testngJava fix, systemrestli client, cache key fix, e… #8926

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ subprojects {
maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1

if (project.configurations.getByName("testImplementation").getDependencies()
.any{ it.getName() == "testng" }) {
.any{ it.getName().contains("testng") }) {
useTestNG()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public static List<ReindexConfig> getAllReindexConfigs(List<ElasticSearchIndexed
List<ReindexConfig> reindexConfigs = new ArrayList<>(_reindexConfigs);
if (reindexConfigs.isEmpty()) {
for (ElasticSearchIndexed elasticSearchIndexed : elasticSearchIndexedList) {
reindexConfigs.addAll(elasticSearchIndexed.getReindexConfigs());
reindexConfigs.addAll(elasticSearchIndexed.buildReindexConfigs());
}
_reindexConfigs = new ArrayList<>(reindexConfigs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.SearchService;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders;
import io.ebean.Database;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.mock.mockito.MockBean;
Expand Down Expand Up @@ -35,4 +36,7 @@ public class UpgradeCliApplicationTestConfiguration {

@MockBean
ConfigEntityRegistry configEntityRegistry;

@MockBean
public EntityIndexBuilders entityIndexBuilders;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.ebean.PagedList;
import io.ebean.Transaction;

import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.sql.Timestamp;
Expand Down Expand Up @@ -103,6 +104,9 @@ Integer countAspect(
@Nonnull
PagedList<EbeanAspectV2> getPagedAspects(final RestoreIndicesArgs args);

@Nonnull
Stream<EntityAspect> streamAspects(String entityName, String aspectName);

int deleteUrn(@Nullable Transaction tx, @Nonnull final String urn);

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.codahale.metrics.Timer;
import com.linkedin.data.template.GetMode;
import com.linkedin.data.template.SetMode;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.metadata.config.PreProcessHooks;
import com.datahub.util.RecordUtils;
import com.datahub.util.exception.ModelConversionException;
Expand Down Expand Up @@ -93,6 +94,7 @@
import javax.persistence.EntityNotFoundException;

import io.ebean.Transaction;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import static com.linkedin.metadata.Constants.*;
Expand Down Expand Up @@ -144,11 +146,11 @@ public class EntityServiceImpl implements EntityService {
private final Map<String, Set<String>> _entityToValidAspects;
private RetentionService _retentionService;
private final Boolean _alwaysEmitChangeLog;
@Getter
private final UpdateIndicesService _updateIndicesService;
private final PreProcessHooks _preProcessHooks;
protected static final int MAX_KEYS_PER_QUERY = 500;


private final Integer ebeanMaxTransactionRetry;

public EntityServiceImpl(
Expand Down Expand Up @@ -180,6 +182,11 @@ public EntityServiceImpl(
ebeanMaxTransactionRetry = retry != null ? retry : DEFAULT_MAX_TRANSACTION_RETRY;
}

@Override
public void setSystemEntityClient(SystemEntityClient systemEntityClient) {
this._updateIndicesService.setSystemEntityClient(systemEntityClient);
}

/**
* Retrieves the latest aspects corresponding to a batch of {@link Urn}s based on a provided
* set of aspect names.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -445,6 +446,12 @@ public PagedList<EbeanAspectV2> getPagedAspects(final RestoreIndicesArgs args) {
return null;
}

@Nonnull
@Override
public Stream<EntityAspect> streamAspects(String entityName, String aspectName) {
// Not implemented
return null;
}

@Override
@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -433,6 +434,18 @@ public PagedList<EbeanAspectV2> getPagedAspects(final RestoreIndicesArgs args) {
.findPagedList();
}

@Override
@Nonnull
public Stream<EntityAspect> streamAspects(String entityName, String aspectName) {
ExpressionList<EbeanAspectV2> exp = _server.find(EbeanAspectV2.class)
.select(EbeanAspectV2.ALL_COLUMNS)
.where()
.eq(EbeanAspectV2.VERSION_COLUMN, ASPECT_LATEST_VERSION)
.eq(EbeanAspectV2.ASPECT_COLUMN, aspectName)
.like(EbeanAspectV2.URN_COLUMN, "urn:li:" + entityName + ":%");
return exp.query().findStream().map(EbeanAspectV2::toEntityAspect);
}

@Override
@Nonnull
public Iterable<String> listAllUrns(int start, int pageSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ public void removeEdgesFromNode(
public void configure() {
log.info("Setting up elastic graph index");
try {
for (ReindexConfig config : getReindexConfigs()) {
for (ReindexConfig config : buildReindexConfigs()) {
_indexBuilder.buildIndex(config);
}
} catch (IOException e) {
Expand All @@ -327,7 +327,7 @@ public void configure() {
}

@Override
public List<ReindexConfig> getReindexConfigs() throws IOException {
public List<ReindexConfig> buildReindexConfigs() throws IOException {
return List.of(_indexBuilder.buildReindexState(_indexConvention.getIndexName(INDEX_NAME),
GraphRelationshipMappingsBuilder.getMappings(), Collections.emptyMap()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public void configure() {
}

@Override
public List<ReindexConfig> getReindexConfigs() {
return indexBuilders.getReindexConfigs();
public List<ReindexConfig> buildReindexConfigs() {
return indexBuilders.buildReindexConfigs();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,7 @@ public void buildIndex(ReindexConfig indexState) throws IOException {
// no need to reindex and only new mappings or dynamic settings

// Just update the additional mappings
if (indexState.isPureMappingsAddition()) {
log.info("Updating index {} mappings in place.", indexState.name());
PutMappingRequest request = new PutMappingRequest(indexState.name()).source(indexState.targetMappings());
_searchClient.indices().putMapping(request, RequestOptions.DEFAULT);
log.info("Updated index {} with new mappings", indexState.name());
}
applyMappings(indexState, true);

if (indexState.requiresApplySettings()) {
UpdateSettingsRequest request = new UpdateSettingsRequest(indexState.name());
Expand All @@ -234,6 +229,26 @@ public void buildIndex(ReindexConfig indexState) throws IOException {
}
}

/**
* Apply mappings changes if reindex is not required
* @param indexState the state of the current and target index settings/mappings
* @param suppressError during reindex logic this is not an error, for structured properties it is an error
* @throws IOException communication issues with ES
*/
public void applyMappings(ReindexConfig indexState, boolean suppressError) throws IOException {
if (indexState.isPureMappingsAddition()) {
log.info("Updating index {} mappings in place.", indexState.name());
PutMappingRequest request = new PutMappingRequest(indexState.name()).source(indexState.targetMappings());
_searchClient.indices().putMapping(request, RequestOptions.DEFAULT);
log.info("Updated index {} with new mappings", indexState.name());
} else {
if (!suppressError) {
log.error("Attempted to apply invalid mappings. Current: {} Target: {}", indexState.currentMappings(),
indexState.targetMappings());
}
}
}

public String reindexInPlaceAsync(String indexAlias, @Nullable QueryBuilder filterQuery, BatchWriteOperationsOptions options, ReindexConfig config)
throws Exception {
GetAliasesResponse aliasesResponse = _searchClient.indices().getAlias(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.shared.ElasticSearchIndexed;
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import lombok.RequiredArgsConstructor;
Expand All @@ -14,32 +16,37 @@
@RequiredArgsConstructor
@Slf4j
public class EntityIndexBuilders implements ElasticSearchIndexed {
private final ESIndexBuilder indexBuilder;
private final EntityRegistry entityRegistry;
private final IndexConvention indexConvention;
private final SettingsBuilder settingsBuilder;

@Override
public void reindexAll() {
for (ReindexConfig config : getReindexConfigs()) {
try {
indexBuilder.buildIndex(config);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

@Override
public List<ReindexConfig> getReindexConfigs() {
return entityRegistry.getEntitySpecs().values().stream().flatMap(entitySpec -> {
try {
return new EntityIndexBuilder(indexBuilder, entitySpec, settingsBuilder, indexConvention.getIndexName(entitySpec))
.getReindexConfigs().stream();
} catch (IOException e) {
private final ESIndexBuilder indexBuilder;
private final EntityRegistry entityRegistry;
private final IndexConvention indexConvention;
private final SettingsBuilder settingsBuilder;

public ESIndexBuilder getIndexBuilder() {
return indexBuilder;
}

@Override
public void reindexAll() {
for (ReindexConfig config : buildReindexConfigs()) {
try {
indexBuilder.buildIndex(config);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

@Override
public List<ReindexConfig> buildReindexConfigs() {
Map<String, Object> settings = settingsBuilder.getSettings();
return entityRegistry.getEntitySpecs().values().stream().map(entitySpec -> {
try {
Map<String, Object> mappings = MappingsBuilder.getMappings(entitySpec);
return indexBuilder.buildReindexState(indexConvention.getIndexName(entitySpec), mappings, settings);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
).collect(Collectors.toList());
}
}
).collect(Collectors.toList());
}
}
Loading
Loading