Skip to content

Commit

Permalink
Fix the problem that Lucene lock is held by VM (#6570)
Browse files Browse the repository at this point in the history
#### What type of PR is this?

/kind bug
/area core
/milestone 2.20.x

#### What this PR does / why we need it:

This PR add keyword synchronized for methods `addOrUpdateDocuments`, `deleteDocuments` and `deleteAll` to ensure the write lock of Lucene is obtained only by one IndexWriter at the same time.

#### Which issue(s) this PR fixes:

Fixes #6569 

#### Does this PR introduce a user-facing change?

```release-note
修复重启后无法搜索部分文档的问题
```
  • Loading branch information
JohnNiang committed Sep 3, 2024
1 parent 7281a48 commit 19de4db
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class LuceneSearchEngine implements SearchEngine, InitializingBean, Dispo

private Directory directory;

public LuceneSearchEngine(Path indexRootDir) throws IOException {
public LuceneSearchEngine(Path indexRootDir) {
this.indexRootDir = indexRootDir;
}

Expand All @@ -106,12 +106,14 @@ public void addOrUpdate(Iterable<HaloDocument> haloDocs) {

var writerConfig = new IndexWriterConfig(this.analyzer)
.setOpenMode(CREATE_OR_APPEND);
try (var indexWriter = new IndexWriter(this.directory, writerConfig)) {
indexWriter.updateDocuments(deleteQuery, docs);
} catch (IOException e) {
throw Exceptions.propagate(e);
} finally {
this.refreshSearcherManager();
synchronized (this) {
try (var indexWriter = new IndexWriter(this.directory, writerConfig)) {
indexWriter.updateDocuments(deleteQuery, docs);
} catch (IOException e) {
throw Exceptions.propagate(e);
} finally {
this.refreshSearcherManager();
}
}
}

Expand All @@ -122,25 +124,29 @@ public void deleteDocument(Iterable<String> haloDocIds) {
var deleteQuery = new TermInSetQuery("id", terms);
var writerConfig = new IndexWriterConfig(this.analyzer)
.setOpenMode(CREATE_OR_APPEND);
try (var indexWriter = new IndexWriter(this.directory, writerConfig)) {
indexWriter.deleteDocuments(deleteQuery);
} catch (IOException e) {
throw Exceptions.propagate(e);
} finally {
this.refreshSearcherManager();
synchronized (this) {
try (var indexWriter = new IndexWriter(this.directory, writerConfig)) {
indexWriter.deleteDocuments(deleteQuery);
} catch (IOException e) {
throw Exceptions.propagate(e);
} finally {
this.refreshSearcherManager();
}
}
}

@Override
public void deleteAll() {
var writerConfig = new IndexWriterConfig(this.analyzer)
.setOpenMode(CREATE_OR_APPEND);
try (var indexWriter = new IndexWriter(this.directory, writerConfig)) {
indexWriter.deleteAll();
} catch (IOException e) {
throw Exceptions.propagate(e);
} finally {
this.refreshSearcherManager();
synchronized (this) {
try (var indexWriter = new IndexWriter(this.directory, writerConfig)) {
indexWriter.deleteAll();
} catch (IOException e) {
throw Exceptions.propagate(e);
} finally {
this.refreshSearcherManager();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,19 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.nio.file.Path;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.store.AlreadyClosedException;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -76,6 +83,35 @@ void shouldDeleteAll() throws IOException {
assertEquals(0, reader.getDocCount("id"));
}

@Test
void shouldAddOrUpdateDocumentConcurrently()
throws ExecutionException, InterruptedException, TimeoutException {
runConcurrently(() -> {
var haloDoc = createFakeHaloDoc();
searchEngine.addOrUpdate(List.of(haloDoc));
});
}

@Test
void shouldDeleteDocumentConcurrently()
throws ExecutionException, InterruptedException, TimeoutException {
runConcurrently(() -> {
var haloDoc = createFakeHaloDoc();
searchEngine.addOrUpdate(List.of(haloDoc));
searchEngine.deleteDocument(List.of(haloDoc.getId()));
});
}

@Test
void shouldDeleteAllConcurrently()
throws ExecutionException, InterruptedException, TimeoutException {
runConcurrently(() -> {
var haloDoc = createFakeHaloDoc();
searchEngine.addOrUpdate(List.of(haloDoc));
searchEngine.deleteAll();
});
}

@Test
void shouldDestroy() throws Exception {
var directory = this.searchEngine.getDirectory();
Expand Down Expand Up @@ -118,6 +154,17 @@ void shouldSearch() {
assertEquals("<fake-tag>fake</fake-tag>-content", gotHaloDoc.getContent());
}

void runConcurrently(Runnable runnable)
throws ExecutionException, InterruptedException, TimeoutException {
var executorService = Executors.newFixedThreadPool(10);
var futures = IntStream.of(0, 10)
.mapToObj(i -> CompletableFuture.runAsync(runnable, executorService))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).get(10, TimeUnit.SECONDS);
executorService.shutdownNow();
assertTrue(executorService.awaitTermination(10, TimeUnit.SECONDS));
}

HaloDocument createFakeHaloDoc() {
var haloDoc = new HaloDocument();
haloDoc.setId("fake-id");
Expand Down

0 comments on commit 19de4db

Please sign in to comment.