Skip to content

Commit

Permalink
refactor(misc): testngJava fix, systemrestli client, cache key fix, e… (
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored Oct 1, 2023
1 parent 4d9a7ce commit b61c38a
Show file tree
Hide file tree
Showing 38 changed files with 404 additions and 183 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ subprojects {
maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1

if (project.configurations.getByName("testImplementation").getDependencies()
.any{ it.getName() == "testng" }) {
.any{ it.getName().contains("testng") }) {
useTestNG()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public static List<ReindexConfig> getAllReindexConfigs(List<ElasticSearchIndexed
List<ReindexConfig> reindexConfigs = new ArrayList<>(_reindexConfigs);
if (reindexConfigs.isEmpty()) {
for (ElasticSearchIndexed elasticSearchIndexed : elasticSearchIndexedList) {
reindexConfigs.addAll(elasticSearchIndexed.getReindexConfigs());
reindexConfigs.addAll(elasticSearchIndexed.buildReindexConfigs());
}
_reindexConfigs = new ArrayList<>(reindexConfigs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.SearchService;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders;
import io.ebean.Database;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.mock.mockito.MockBean;
Expand Down Expand Up @@ -35,4 +36,7 @@ public class UpgradeCliApplicationTestConfiguration {

@MockBean
ConfigEntityRegistry configEntityRegistry;

@MockBean
public EntityIndexBuilders entityIndexBuilders;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.ebean.PagedList;
import io.ebean.Transaction;

import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.sql.Timestamp;
Expand Down Expand Up @@ -103,6 +104,9 @@ Integer countAspect(
@Nonnull
PagedList<EbeanAspectV2> getPagedAspects(final RestoreIndicesArgs args);

@Nonnull
Stream<EntityAspect> streamAspects(String entityName, String aspectName);

int deleteUrn(@Nullable Transaction tx, @Nonnull final String urn);

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.codahale.metrics.Timer;
import com.linkedin.data.template.GetMode;
import com.linkedin.data.template.SetMode;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.metadata.config.PreProcessHooks;
import com.datahub.util.RecordUtils;
import com.datahub.util.exception.ModelConversionException;
Expand Down Expand Up @@ -93,6 +94,7 @@
import javax.persistence.EntityNotFoundException;

import io.ebean.Transaction;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import static com.linkedin.metadata.Constants.*;
Expand Down Expand Up @@ -144,11 +146,11 @@ public class EntityServiceImpl implements EntityService {
private final Map<String, Set<String>> _entityToValidAspects;
private RetentionService _retentionService;
private final Boolean _alwaysEmitChangeLog;
@Getter
private final UpdateIndicesService _updateIndicesService;
private final PreProcessHooks _preProcessHooks;
protected static final int MAX_KEYS_PER_QUERY = 500;


private final Integer ebeanMaxTransactionRetry;

public EntityServiceImpl(
Expand Down Expand Up @@ -180,6 +182,11 @@ public EntityServiceImpl(
ebeanMaxTransactionRetry = retry != null ? retry : DEFAULT_MAX_TRANSACTION_RETRY;
}

@Override
public void setSystemEntityClient(SystemEntityClient systemEntityClient) {
this._updateIndicesService.setSystemEntityClient(systemEntityClient);
}

/**
* Retrieves the latest aspects corresponding to a batch of {@link Urn}s based on a provided
* set of aspect names.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -445,6 +446,12 @@ public PagedList<EbeanAspectV2> getPagedAspects(final RestoreIndicesArgs args) {
return null;
}

@Nonnull
@Override
public Stream<EntityAspect> streamAspects(String entityName, String aspectName) {
// Not implemented
return null;
}

@Override
@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -433,6 +434,18 @@ public PagedList<EbeanAspectV2> getPagedAspects(final RestoreIndicesArgs args) {
.findPagedList();
}

@Override
@Nonnull
public Stream<EntityAspect> streamAspects(String entityName, String aspectName) {
ExpressionList<EbeanAspectV2> exp = _server.find(EbeanAspectV2.class)
.select(EbeanAspectV2.ALL_COLUMNS)
.where()
.eq(EbeanAspectV2.VERSION_COLUMN, ASPECT_LATEST_VERSION)
.eq(EbeanAspectV2.ASPECT_COLUMN, aspectName)
.like(EbeanAspectV2.URN_COLUMN, "urn:li:" + entityName + ":%");
return exp.query().findStream().map(EbeanAspectV2::toEntityAspect);
}

@Override
@Nonnull
public Iterable<String> listAllUrns(int start, int pageSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ public void removeEdgesFromNode(
public void configure() {
log.info("Setting up elastic graph index");
try {
for (ReindexConfig config : getReindexConfigs()) {
for (ReindexConfig config : buildReindexConfigs()) {
_indexBuilder.buildIndex(config);
}
} catch (IOException e) {
Expand All @@ -327,7 +327,7 @@ public void configure() {
}

@Override
public List<ReindexConfig> getReindexConfigs() throws IOException {
public List<ReindexConfig> buildReindexConfigs() throws IOException {
return List.of(_indexBuilder.buildReindexState(_indexConvention.getIndexName(INDEX_NAME),
GraphRelationshipMappingsBuilder.getMappings(), Collections.emptyMap()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public void configure() {
}

@Override
public List<ReindexConfig> getReindexConfigs() {
return indexBuilders.getReindexConfigs();
public List<ReindexConfig> buildReindexConfigs() {
return indexBuilders.buildReindexConfigs();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,7 @@ public void buildIndex(ReindexConfig indexState) throws IOException {
// no need to reindex and only new mappings or dynamic settings

// Just update the additional mappings
if (indexState.isPureMappingsAddition()) {
log.info("Updating index {} mappings in place.", indexState.name());
PutMappingRequest request = new PutMappingRequest(indexState.name()).source(indexState.targetMappings());
_searchClient.indices().putMapping(request, RequestOptions.DEFAULT);
log.info("Updated index {} with new mappings", indexState.name());
}
applyMappings(indexState, true);

if (indexState.requiresApplySettings()) {
UpdateSettingsRequest request = new UpdateSettingsRequest(indexState.name());
Expand All @@ -234,6 +229,26 @@ public void buildIndex(ReindexConfig indexState) throws IOException {
}
}

/**
* Apply mappings changes if reindex is not required
* @param indexState the state of the current and target index settings/mappings
* @param suppressError during reindex logic this is not an error, for structured properties it is an error
* @throws IOException communication issues with ES
*/
public void applyMappings(ReindexConfig indexState, boolean suppressError) throws IOException {
if (indexState.isPureMappingsAddition()) {
log.info("Updating index {} mappings in place.", indexState.name());
PutMappingRequest request = new PutMappingRequest(indexState.name()).source(indexState.targetMappings());
_searchClient.indices().putMapping(request, RequestOptions.DEFAULT);
log.info("Updated index {} with new mappings", indexState.name());
} else {
if (!suppressError) {
log.error("Attempted to apply invalid mappings. Current: {} Target: {}", indexState.currentMappings(),
indexState.targetMappings());
}
}
}

public String reindexInPlaceAsync(String indexAlias, @Nullable QueryBuilder filterQuery, BatchWriteOperationsOptions options, ReindexConfig config)
throws Exception {
GetAliasesResponse aliasesResponse = _searchClient.indices().getAlias(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.shared.ElasticSearchIndexed;
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import lombok.RequiredArgsConstructor;
Expand All @@ -14,32 +16,37 @@
@RequiredArgsConstructor
@Slf4j
public class EntityIndexBuilders implements ElasticSearchIndexed {
private final ESIndexBuilder indexBuilder;
private final EntityRegistry entityRegistry;
private final IndexConvention indexConvention;
private final SettingsBuilder settingsBuilder;

@Override
public void reindexAll() {
for (ReindexConfig config : getReindexConfigs()) {
try {
indexBuilder.buildIndex(config);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

@Override
public List<ReindexConfig> getReindexConfigs() {
return entityRegistry.getEntitySpecs().values().stream().flatMap(entitySpec -> {
try {
return new EntityIndexBuilder(indexBuilder, entitySpec, settingsBuilder, indexConvention.getIndexName(entitySpec))
.getReindexConfigs().stream();
} catch (IOException e) {
private final ESIndexBuilder indexBuilder;
private final EntityRegistry entityRegistry;
private final IndexConvention indexConvention;
private final SettingsBuilder settingsBuilder;

public ESIndexBuilder getIndexBuilder() {
return indexBuilder;
}

@Override
public void reindexAll() {
for (ReindexConfig config : buildReindexConfigs()) {
try {
indexBuilder.buildIndex(config);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

@Override
public List<ReindexConfig> buildReindexConfigs() {
Map<String, Object> settings = settingsBuilder.getSettings();
return entityRegistry.getEntitySpecs().values().stream().map(entitySpec -> {
try {
Map<String, Object> mappings = MappingsBuilder.getMappings(entitySpec);
return indexBuilder.buildReindexState(indexConvention.getIndexName(entitySpec), mappings, settings);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
).collect(Collectors.toList());
}
}
).collect(Collectors.toList());
}
}
Loading

0 comments on commit b61c38a

Please sign in to comment.