> failedTupleLists;
- try {
- failedTupleLists = waitAck.getAllPresent(failedIds);
- if (!failedTupleLists.isEmpty()) {
- waitAck.invalidateAll(failedTupleLists.keySet());
- }
- } finally {
- waitAckLock.unlock();
- }
-
- for (var id : failedIds) {
- var failedTuples = failedTupleLists.get(id);
- if (failedTuples != null) {
- LOG.debug("Failed {} tuple(s) for ID {}", failedTuples.size(), id);
- for (Tuple x : failedTuples) {
- // fail it
- eventCounter.scope("failed").incrBy(1);
- _collector.fail(x);
- }
- } else {
- LOG.warn("Could not find unacked tuple for {}", id);
- }
- }
- }
-}
diff --git a/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/filtering/JSONURLFilterWrapper.java b/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/filtering/JSONURLFilterWrapper.java
deleted file mode 100644
index 068875ecf..000000000
--- a/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/filtering/JSONURLFilterWrapper.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.stormcrawler.elasticsearch.filtering;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import java.io.ByteArrayInputStream;
-import java.net.URL;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import org.apache.stormcrawler.JSONResource;
-import org.apache.stormcrawler.Metadata;
-import org.apache.stormcrawler.elasticsearch.ElasticSearchConnection;
-import org.apache.stormcrawler.filtering.URLFilter;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Wraps a URLFilter whose resources are in a JSON file that can be stored in ES. The benefit of
- * doing this is that the resources can be refreshed automatically and modified without having to
- * recompile the jar and restart the topology. The connection to ES is done via the config and uses
- * a new bolt type 'config'.
- *
- * The configuration of the delegate is done in the urlfilters.json as usual.
- *
- *
- * {
- * "class": "org.apache.stormcrawler.elasticsearch.filtering.JSONURLFilterWrapper",
- * "name": "ESFastURLFilter",
- * "params": {
- * "refresh": "60",
- * "delegate": {
- * "class": "org.apache.stormcrawler.filtering.regex.FastURLFilter",
- * "params": {
- * "file": "fast.urlfilter.json"
- * }
- * }
- * }
- * }
- *
- *
- * The resource file can be pushed to ES with
- *
- *
- * curl -XPUT 'localhost:9200/config/config/fast.urlfilter.json?pretty' -H 'Content-Type: application/json' -d @fast.urlfilter.json
- *
- */
-public class JSONURLFilterWrapper extends URLFilter {
-
- private static final Logger LOG = LoggerFactory.getLogger(JSONURLFilterWrapper.class);
-
- private URLFilter delegatedURLFilter;
-
- public void configure(@NotNull Map stormConf, @NotNull JsonNode filterParams) {
-
- String urlfilterclass = null;
-
- JsonNode delegateNode = filterParams.get("delegate");
- if (delegateNode == null) {
- throw new RuntimeException("delegateNode undefined!");
- }
-
- JsonNode node = delegateNode.get("class");
- if (node != null && node.isTextual()) {
- urlfilterclass = node.asText();
- }
-
- if (urlfilterclass == null) {
- throw new RuntimeException("urlfilter.class undefined!");
- }
-
- // load an instance of the delegated parsefilter
- try {
- Class> filterClass = Class.forName(urlfilterclass);
-
- boolean subClassOK = URLFilter.class.isAssignableFrom(filterClass);
- if (!subClassOK) {
- throw new RuntimeException(
- "Filter " + urlfilterclass + " does not extend URLFilter");
- }
-
- delegatedURLFilter = (URLFilter) filterClass.newInstance();
-
- // check that it implements JSONResource
- if (!JSONResource.class.isInstance(delegatedURLFilter)) {
- throw new RuntimeException(
- "Filter " + urlfilterclass + " does not implement JSONResource");
- }
-
- } catch (Exception e) {
- LOG.error("Can't setup {}: {}", urlfilterclass, e);
- throw new RuntimeException("Can't setup " + urlfilterclass, e);
- }
-
- // configure it
- node = delegateNode.get("params");
-
- delegatedURLFilter.configure(stormConf, node);
-
- int refreshRate = 600;
-
- node = filterParams.get("refresh");
- if (node != null && node.isInt()) {
- refreshRate = node.asInt(refreshRate);
- }
-
- final JSONResource resource = (JSONResource) delegatedURLFilter;
-
- new Timer()
- .schedule(
- new TimerTask() {
- private RestHighLevelClient esClient;
-
- public void run() {
- if (esClient == null) {
- try {
- esClient =
- ElasticSearchConnection.getClient(
- stormConf, "config");
- } catch (Exception e) {
- LOG.error("Exception while creating ES connection", e);
- }
- }
- if (esClient != null) {
- LOG.info("Reloading json resources from ES");
- try {
- GetResponse response =
- esClient.get(
- new GetRequest(
- "config",
- "config",
- resource.getResourceFile()),
- RequestOptions.DEFAULT);
- resource.loadJSONResources(
- new ByteArrayInputStream(
- response.getSourceAsBytes()));
- } catch (Exception e) {
- LOG.error("Can't load config from ES", e);
- }
- }
- }
- },
- 0,
- refreshRate * 1000);
- }
-
- @Override
- public @Nullable String filter(
- @Nullable URL sourceUrl,
- @Nullable Metadata sourceMetadata,
- @NotNull String urlToFilter) {
- return delegatedURLFilter.filter(sourceUrl, sourceMetadata, urlToFilter);
- }
-}
diff --git a/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/metrics/MetricsConsumer.java b/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/metrics/MetricsConsumer.java
deleted file mode 100644
index 5f2270420..000000000
--- a/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/metrics/MetricsConsumer.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.stormcrawler.elasticsearch.metrics;
-
-import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
-
-import java.text.SimpleDateFormat;
-import java.util.Collection;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.storm.metric.api.IMetricsConsumer;
-import org.apache.storm.task.IErrorReporter;
-import org.apache.storm.task.TopologyContext;
-import org.apache.stormcrawler.elasticsearch.ElasticSearchConnection;
-import org.apache.stormcrawler.util.ConfUtils;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.xcontent.XContentBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Sends metrics to an Elasticsearch index. The ES details are set in the configuration; an optional
- * argument sets a date format to append to the index name.
- *
- *
- * topology.metrics.consumer.register:
- * - class: "org.apache.stormcrawler.elasticsearch.metrics.MetricsConsumer"
- * parallelism.hint: 1
- * argument: "yyyy-MM-dd"
- *
- */
-public class MetricsConsumer implements IMetricsConsumer {
-
- private final Logger LOG = LoggerFactory.getLogger(getClass());
-
- private static final String ESBoltType = "metrics";
-
- /** name of the index to use for the metrics (default : metrics) * */
- private static final String ESMetricsIndexNameParamName = "es." + ESBoltType + ".index.name";
-
- private String indexName;
-
- private ElasticSearchConnection connection;
-
- private String stormID;
-
- /** optional date format passed as argument, must be parsable as a SimpleDateFormat */
- private SimpleDateFormat dateFormat;
-
- @Override
- public void prepare(
- Map stormConf,
- Object registrationArgument,
- TopologyContext context,
- IErrorReporter errorReporter) {
- indexName = ConfUtils.getString(stormConf, ESMetricsIndexNameParamName, "metrics");
- stormID = context.getStormId();
- if (registrationArgument != null) {
- dateFormat = new SimpleDateFormat((String) registrationArgument);
- LOG.info("Using date format {}", registrationArgument);
- }
- try {
- connection = ElasticSearchConnection.getConnection(stormConf, ESBoltType);
- } catch (Exception e1) {
- LOG.error("Can't connect to ElasticSearch", e1);
- throw new RuntimeException(e1);
- }
- }
-
- @Override
- public void cleanup() {
- if (connection != null) connection.close();
- }
-
- @Override
- public void handleDataPoints(TaskInfo taskInfo, Collection dataPoints) {
- final Date now = new Date();
- for (DataPoint dataPoint : dataPoints) {
- handleDataPoints(taskInfo, dataPoint.name, dataPoint.value, now);
- }
- }
-
- private void handleDataPoints(
- final TaskInfo taskInfo, final String nameprefix, final Object value, final Date now) {
- if (value instanceof Number) {
- indexDataPoint(taskInfo, now, nameprefix, ((Number) value).doubleValue());
- } else if (value instanceof Map) {
- Iterator keyValiter = ((Map) value).entrySet().iterator();
- while (keyValiter.hasNext()) {
- Entry entry = keyValiter.next();
- String newnameprefix = nameprefix + "." + entry.getKey();
- handleDataPoints(taskInfo, newnameprefix, entry.getValue(), now);
- }
- } else if (value instanceof Collection) {
- for (Object collectionObj : (Collection) value) {
- handleDataPoints(taskInfo, nameprefix, collectionObj, now);
- }
- } else {
- LOG.warn("Found data point value {} of {}", nameprefix, value.getClass().toString());
- }
- }
-
- /**
- * Returns the name of the index that metrics will be written to.
- *
- * @return elastic index name
- */
- private String getIndexName(Date timestamp) {
- if (dateFormat == null) return indexName;
-
- StringBuilder sb = new StringBuilder(indexName);
- sb.append("-").append(dateFormat.format(timestamp));
- return sb.toString();
- }
-
- private void indexDataPoint(TaskInfo taskInfo, Date timestamp, String name, double value) {
- try {
- XContentBuilder builder = jsonBuilder().startObject();
- builder.field("stormId", stormID);
- builder.field("srcComponentId", taskInfo.srcComponentId);
- builder.field("srcTaskId", taskInfo.srcTaskId);
- builder.field("srcWorkerHost", taskInfo.srcWorkerHost);
- builder.field("srcWorkerPort", taskInfo.srcWorkerPort);
- builder.field("name", name);
- builder.field("value", value);
- builder.field("timestamp", timestamp);
- builder.endObject();
-
- IndexRequest indexRequest = new IndexRequest(getIndexName(timestamp)).source(builder);
- connection.addToProcessor(indexRequest);
- } catch (Exception e) {
- LOG.error("problem when building request for ES", e);
- }
- }
-}
diff --git a/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/metrics/StatusMetricsBolt.java b/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/metrics/StatusMetricsBolt.java
deleted file mode 100644
index eeda63026..000000000
--- a/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/metrics/StatusMetricsBolt.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.stormcrawler.elasticsearch.metrics;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.storm.Config;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.TupleUtils;
-import org.apache.stormcrawler.elasticsearch.ElasticSearchConnection;
-import org.apache.stormcrawler.util.ConfUtils;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.core.CountRequest;
-import org.elasticsearch.client.core.CountResponse;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Queries the status index periodically to get the count of URLs per status. This bolt can be
- * connected to the output of any other bolt and will not produce anything as output.
- */
-public class StatusMetricsBolt extends BaseRichBolt {
-
- private static final Logger LOG = LoggerFactory.getLogger(StatusMetricsBolt.class);
-
- private static final String ESBoltType = "status";
- private static final String ESStatusIndexNameParamName = "es.status.index.name";
-
- private String indexName;
-
- private ElasticSearchConnection connection;
-
- private Map latestStatusCounts = new HashMap<>(6);
-
- private int freqStats = 60;
-
- private OutputCollector _collector;
-
- private transient StatusActionListener[] listeners;
-
- private class StatusActionListener implements ActionListener {
-
- private final String name;
-
- private boolean ready = true;
-
- public boolean isReady() {
- return ready;
- }
-
- public void busy() {
- this.ready = false;
- }
-
- StatusActionListener(String statusName) {
- name = statusName;
- }
-
- @Override
- public void onResponse(CountResponse response) {
- ready = true;
- LOG.debug("Got {} counts for status:{}", response.getCount(), name);
- latestStatusCounts.put(name, response.getCount());
- }
-
- @Override
- public void onFailure(Exception e) {
- ready = true;
- LOG.error("Failure when getting counts for status:{}", name, e);
- }
- }
-
- @Override
- public void prepare(
- Map stormConf, TopologyContext context, OutputCollector collector) {
- _collector = collector;
- indexName = ConfUtils.getString(stormConf, ESStatusIndexNameParamName, "status");
- try {
- connection = ElasticSearchConnection.getConnection(stormConf, ESBoltType);
- } catch (Exception e1) {
- LOG.error("Can't connect to ElasticSearch", e1);
- throw new RuntimeException(e1);
- }
-
- context.registerMetric(
- "status.count",
- () -> {
- return latestStatusCounts;
- },
- freqStats);
-
- listeners = new StatusActionListener[6];
-
- listeners[0] = new StatusActionListener("DISCOVERED");
- listeners[1] = new StatusActionListener("FETCHED");
- listeners[2] = new StatusActionListener("FETCH_ERROR");
- listeners[3] = new StatusActionListener("REDIRECTION");
- listeners[4] = new StatusActionListener("ERROR");
- listeners[5] = new StatusActionListener("TOTAL");
- }
-
- @Override
- public Map getComponentConfiguration() {
- Config conf = new Config();
- conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, freqStats);
- return conf;
- }
-
- @Override
- public void execute(Tuple input) {
- _collector.ack(input);
-
- // this bolt can be connected to anything
- // we just want to trigger a new search when the input is a tick tuple
- if (!TupleUtils.isTick(input)) {
- return;
- }
-
- for (StatusActionListener listener : listeners) {
- // still waiting for results from previous request
- if (!listener.isReady()) {
- LOG.debug("Not ready to get counts for status {}", listener.name);
- continue;
- }
- CountRequest request = new CountRequest(indexName);
- if (!listener.name.equalsIgnoreCase("TOTAL")) {
- SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
- sourceBuilder.query(QueryBuilders.termQuery("status", listener.name));
- request.source(sourceBuilder);
- }
- listener.busy();
- connection.getClient().countAsync(request, RequestOptions.DEFAULT, listener);
- }
- }
-
- @Override
- public void cleanup() {
- connection.close();
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- // NONE - THIS BOLT DOES NOT GET CONNECTED TO ANY OTHERS
- }
-}
diff --git a/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/parse/filter/JSONResourceWrapper.java b/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/parse/filter/JSONResourceWrapper.java
deleted file mode 100644
index 5b34830aa..000000000
--- a/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/parse/filter/JSONResourceWrapper.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.stormcrawler.elasticsearch.parse.filter;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import java.io.ByteArrayInputStream;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import org.apache.stormcrawler.JSONResource;
-import org.apache.stormcrawler.elasticsearch.ElasticSearchConnection;
-import org.apache.stormcrawler.parse.ParseFilter;
-import org.apache.stormcrawler.parse.ParseResult;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.jetbrains.annotations.NotNull;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.w3c.dom.DocumentFragment;
-
-/**
- * Wraps a ParseFilter whose resources are in a JSON file that can be stored in ES. The benefit of
- * doing this is that the resources can be refreshed automatically and modified without having to
- * recompile the jar and restart the topology. The connection to ES is done via the config and uses
- * a new bolt type 'config'.
- *
- * The configuration of the delegate is done in the parsefilters.json as usual.
- *
- *
- * {
- * "class": "org.apache.stormcrawler.elasticsearch.parse.filter.JSONResourceWrapper",
- * "name": "ESCollectionTagger",
- * "params": {
- * "refresh": "60",
- * "delegate": {
- * "class": "org.apache.stormcrawler.parse.filter.CollectionTagger",
- * "params": {
- * "file": "collections.json"
- * }
- * }
- * }
- * }
- *
- *
- * The resource file can be pushed to ES with
- *
- *
- * curl -XPUT "$ESHOST/config/_create/collections.json" -H 'Content-Type: application/json' -d @src/main/resources/collections.json
- *
- */
-public class JSONResourceWrapper extends ParseFilter {
-
- private static final Logger LOG = LoggerFactory.getLogger(JSONResourceWrapper.class);
-
- private ParseFilter delegatedParseFilter;
-
- public void configure(@NotNull Map stormConf, @NotNull JsonNode filterParams) {
-
- String parsefilterclass = null;
-
- JsonNode delegateNode = filterParams.get("delegate");
- if (delegateNode == null) {
- throw new RuntimeException("delegateNode undefined!");
- }
-
- JsonNode node = delegateNode.get("class");
- if (node != null && node.isTextual()) {
- parsefilterclass = node.asText();
- }
-
- if (parsefilterclass == null) {
- throw new RuntimeException("parsefilter.class undefined!");
- }
-
- // load an instance of the delegated parsefilter
- try {
- Class> filterClass = Class.forName(parsefilterclass);
-
- boolean subClassOK = ParseFilter.class.isAssignableFrom(filterClass);
- if (!subClassOK) {
- throw new RuntimeException(
- "Filter " + parsefilterclass + " does not extend ParseFilter");
- }
-
- delegatedParseFilter = (ParseFilter) filterClass.newInstance();
-
- // check that it implements JSONResource
- if (!JSONResource.class.isInstance(delegatedParseFilter)) {
- throw new RuntimeException(
- "Filter " + parsefilterclass + " does not implement JSONResource");
- }
-
- } catch (Exception e) {
- LOG.error("Can't setup {}: {}", parsefilterclass, e);
- throw new RuntimeException("Can't setup " + parsefilterclass, e);
- }
-
- // configure it
- node = delegateNode.get("params");
-
- delegatedParseFilter.configure(stormConf, node);
-
- int refreshRate = 600;
-
- node = filterParams.get("refresh");
- if (node != null && node.isInt()) {
- refreshRate = node.asInt(refreshRate);
- }
-
- final JSONResource resource = (JSONResource) delegatedParseFilter;
-
- new Timer()
- .schedule(
- new TimerTask() {
- private RestHighLevelClient esClient;
-
- public void run() {
- if (esClient == null) {
- try {
- esClient =
- ElasticSearchConnection.getClient(
- stormConf, "config");
- } catch (Exception e) {
- LOG.error("Exception while creating ES connection", e);
- }
- }
- if (esClient != null) {
- LOG.info("Reloading json resources from ES");
- try {
- GetResponse response =
- esClient.get(
- new GetRequest(
- "config",
- resource.getResourceFile()),
- RequestOptions.DEFAULT);
- resource.loadJSONResources(
- new ByteArrayInputStream(
- response.getSourceAsBytes()));
- } catch (Exception e) {
- LOG.error("Can't load config from ES", e);
- }
- }
- }
- },
- 0,
- refreshRate * 1000);
- }
-
- @Override
- public void filter(String URL, byte[] content, DocumentFragment doc, ParseResult parse) {
- delegatedParseFilter.filter(URL, content, doc, parse);
- }
-}
diff --git a/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/persistence/AbstractSpout.java b/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/persistence/AbstractSpout.java
deleted file mode 100644
index ec5b32c2e..000000000
--- a/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/persistence/AbstractSpout.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.stormcrawler.elasticsearch.persistence;
-
-import java.io.IOException;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.stormcrawler.Metadata;
-import org.apache.stormcrawler.elasticsearch.ElasticSearchConnection;
-import org.apache.stormcrawler.persistence.AbstractQueryingSpout;
-import org.apache.stormcrawler.util.ConfUtils;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.search.SearchHit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class AbstractSpout extends AbstractQueryingSpout {
-
- private static final Logger LOG = LoggerFactory.getLogger(AbstractSpout.class);
-
- protected static final String ESBoltType = "status";
- protected static final String ESStatusIndexNameParamName = "es.status.index.name";
-
- /** Field name to use for aggregating * */
- protected static final String ESStatusBucketFieldParamName = "es.status.bucket.field";
-
- protected static final String ESStatusMaxBucketParamName = "es.status.max.buckets";
- protected static final String ESStatusMaxURLsParamName = "es.status.max.urls.per.bucket";
-
- /** Field name to use for sorting the URLs within a bucket, not used if empty or null. */
- protected static final String ESStatusBucketSortFieldParamName = "es.status.bucket.sort.field";
-
- /** Field name to use for sorting the buckets, not used if empty or null. */
- protected static final String ESStatusGlobalSortFieldParamName = "es.status.global.sort.field";
-
- protected static final String ESStatusFilterParamName = "es.status.filterQuery";
-
- protected static final String ESStatusQueryTimeoutParamName = "es.status.query.timeout";
-
- /** Query to use as a positive filter, set by es.status.filterQuery */
- protected List filterQueries = null;
-
- protected String indexName;
-
- protected static RestHighLevelClient client;
-
- /**
- * when using multiple instances - each one is in charge of a specific shard useful when
- * sharding based on host or domain to guarantee a good mix of URLs
- */
- protected int shardID = -1;
-
- /** Used to distinguish between instances in the logs * */
- protected String logIdprefix = "";
-
- /** Field name used for field collapsing e.g. key * */
- protected String partitionField;
-
- protected int maxURLsPerBucket = 10;
-
- protected int maxBucketNum = 10;
-
- protected List bucketSortField = new LinkedList<>();
-
- protected String totalSortField = "";
-
- protected Date queryDate;
-
- protected int queryTimeout = -1;
-
- @Override
- public void open(
- Map stormConf,
- TopologyContext context,
- SpoutOutputCollector collector) {
-
- super.open(stormConf, context, collector);
-
- indexName = ConfUtils.getString(stormConf, ESStatusIndexNameParamName, "status");
-
- // one ES client per JVM
- synchronized (AbstractSpout.class) {
- try {
- if (client == null) {
- client = ElasticSearchConnection.getClient(stormConf, ESBoltType);
- }
- } catch (Exception e1) {
- LOG.error("Can't connect to ElasticSearch", e1);
- throw new RuntimeException(e1);
- }
- }
-
- // if more than one instance is used we expect their number to be the
- // same as the number of shards
- int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
- if (totalTasks > 1) {
- logIdprefix =
- "[" + context.getThisComponentId() + " #" + context.getThisTaskIndex() + "] ";
-
- // determine the number of shards so that we can restrict the
- // search
-
- // TODO use the admin API when it gets available
- // TODO or the low level one with
- // https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-shards-stores.html
- // TODO identify local shards and use those if possible
-
- // ClusterSearchShardsRequest request = new
- // ClusterSearchShardsRequest(
- // indexName);
- // ClusterSearchShardsResponse shardresponse = client.admin()
- // .cluster().searchShards(request).actionGet();
- // ClusterSearchShardsGroup[] shardgroups =
- // shardresponse.getGroups();
- // if (totalTasks != shardgroups.length) {
- // throw new RuntimeException(
- // "Number of ES spout instances should be the same as number of
- // shards ("
- // + shardgroups.length + ") but is " + totalTasks);
- // }
- // shardID = shardgroups[context.getThisTaskIndex()].getShardId()
- // .getId();
-
- // TEMPORARY simply use the task index as shard index
- shardID = context.getThisTaskIndex();
- LOG.info("{} assigned shard ID {}", logIdprefix, shardID);
- }
-
- partitionField = ConfUtils.getString(stormConf, ESStatusBucketFieldParamName, "key");
-
- bucketSortField = ConfUtils.loadListFromConf(ESStatusBucketSortFieldParamName, stormConf);
-
- totalSortField = ConfUtils.getString(stormConf, ESStatusGlobalSortFieldParamName);
-
- maxURLsPerBucket = ConfUtils.getInt(stormConf, ESStatusMaxURLsParamName, 1);
- maxBucketNum = ConfUtils.getInt(stormConf, ESStatusMaxBucketParamName, 10);
-
- queryTimeout = ConfUtils.getInt(stormConf, ESStatusQueryTimeoutParamName, -1);
-
- filterQueries = ConfUtils.loadListFromConf(ESStatusFilterParamName, stormConf);
- }
-
- /** Builds a query and use it retrieve the results from ES * */
- protected abstract void populateBuffer();
-
- protected final boolean addHitToBuffer(SearchHit hit) {
- Map keyValues = hit.getSourceAsMap();
- String url = (String) keyValues.get("url");
- // is already being processed - skip it!
- if (beingProcessed.containsKey(url)) {
- return false;
- }
- Metadata metadata = fromKeyValues(keyValues);
- addHitInfoToMetadata(metadata, hit);
-
- return buffer.add(url, metadata);
- }
-
- protected void addHitInfoToMetadata(Metadata metadata, SearchHit hit) {}
-
- protected final Metadata fromKeyValues(Map keyValues) {
- Map> mdAsMap = (Map>) keyValues.get("metadata");
- Metadata metadata = new Metadata();
- if (mdAsMap != null) {
- Iterator>> mdIter = mdAsMap.entrySet().iterator();
- while (mdIter.hasNext()) {
- Entry> mdEntry = mdIter.next();
- String key = mdEntry.getKey();
- // periods are not allowed in ES2 - replace with %2E
- key = key.replaceAll("%2E", "\\.");
- Object mdValObj = mdEntry.getValue();
- // single value
- if (mdValObj instanceof String) {
- metadata.addValue(key, (String) mdValObj);
- }
- // multi valued
- else {
- metadata.addValues(key, (List) mdValObj);
- }
- }
- }
- return metadata;
- }
-
- @Override
- public void ack(Object msgId) {
- LOG.debug("{} Ack for {}", logIdprefix, msgId);
- super.ack(msgId);
- }
-
- @Override
- public void fail(Object msgId) {
- LOG.info("{} Fail for {}", logIdprefix, msgId);
- super.fail(msgId);
- }
-
- @Override
- public void close() {
- if (client != null)
- try {
- client.close();
- } catch (IOException e) {
- }
- }
-}
diff --git a/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/persistence/AggregationSpout.java b/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/persistence/AggregationSpout.java
deleted file mode 100644
index 0e1f69dae..000000000
--- a/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/persistence/AggregationSpout.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.stormcrawler.elasticsearch.persistence;
-
-import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
-
-import java.time.Instant;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang.StringUtils;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.stormcrawler.Metadata;
-import org.apache.stormcrawler.util.ConfUtils;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.aggregations.AggregationBuilders;
-import org.elasticsearch.search.aggregations.Aggregations;
-import org.elasticsearch.search.aggregations.BucketOrder;
-import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
-import org.elasticsearch.search.aggregations.bucket.sampler.DiversifiedAggregationBuilder;
-import org.elasticsearch.search.aggregations.bucket.terms.Terms;
-import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
-import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
-import org.elasticsearch.search.aggregations.metrics.TopHits;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.elasticsearch.search.sort.FieldSortBuilder;
-import org.elasticsearch.search.sort.SortBuilders;
-import org.elasticsearch.search.sort.SortOrder;
-import org.joda.time.format.ISODateTimeFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Spout which pulls URL from an ES index. Use a single instance unless you use 'es.status.routing'
- * with the StatusUpdaterBolt, in which case you need to have exactly the same number of spout
- * instances as ES shards. Guarantees a good mix of URLs by aggregating them by an arbitrary field
- * e.g. key.
- */
-public class AggregationSpout extends AbstractSpout implements ActionListener {
-
- private static final Logger LOG = LoggerFactory.getLogger(AggregationSpout.class);
-
- private static final String ESStatusSampleParamName = "es.status.sample";
- private static final String ESMostRecentDateIncreaseParamName = "es.status.recentDate.increase";
- private static final String ESMostRecentDateMinGapParamName = "es.status.recentDate.min.gap";
-
- private boolean sample = false;
-
- private int recentDateIncrease = -1;
- private int recentDateMinGap = -1;
-
- protected Set currentBuckets;
-
- @Override
- public void open(
- Map stormConf,
- TopologyContext context,
- SpoutOutputCollector collector) {
- sample = ConfUtils.getBoolean(stormConf, ESStatusSampleParamName, sample);
- recentDateIncrease =
- ConfUtils.getInt(stormConf, ESMostRecentDateIncreaseParamName, recentDateIncrease);
- recentDateMinGap =
- ConfUtils.getInt(stormConf, ESMostRecentDateMinGapParamName, recentDateMinGap);
- super.open(stormConf, context, collector);
- currentBuckets = new HashSet<>();
- }
-
- @Override
- protected void populateBuffer() {
-
- if (queryDate == null) {
- queryDate = new Date();
- lastTimeResetToNOW = Instant.now();
- }
-
- String formattedQueryDate = ISODateTimeFormat.dateTimeNoMillis().print(queryDate.getTime());
-
- LOG.info("{} Populating buffer with nextFetchDate <= {}", logIdprefix, formattedQueryDate);
-
- BoolQueryBuilder queryBuilder =
- boolQuery()
- .filter(QueryBuilders.rangeQuery("nextFetchDate").lte(formattedQueryDate));
-
- if (filterQueries != null) {
- for (String filterQuery : filterQueries) {
- queryBuilder.filter(QueryBuilders.queryStringQuery(filterQuery));
- }
- }
-
- SearchRequest request = new SearchRequest(indexName);
-
- SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
- sourceBuilder.query(queryBuilder);
- sourceBuilder.from(0);
- sourceBuilder.size(0);
- sourceBuilder.explain(false);
- sourceBuilder.trackTotalHits(false);
-
- if (queryTimeout != -1) {
- sourceBuilder.timeout(new TimeValue(queryTimeout, TimeUnit.SECONDS));
- }
-
- TermsAggregationBuilder aggregations =
- AggregationBuilders.terms("partition").field(partitionField).size(maxBucketNum);
-
- org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder tophits =
- AggregationBuilders.topHits("docs").size(maxURLsPerBucket).explain(false);
-
- // sort within a bucket
- for (String bsf : bucketSortField) {
- FieldSortBuilder sorter = SortBuilders.fieldSort(bsf).order(SortOrder.ASC);
- tophits.sort(sorter);
- }
-
- aggregations.subAggregation(tophits);
-
- // sort between buckets
- if (StringUtils.isNotBlank(totalSortField)) {
- org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder minBuilder =
- AggregationBuilders.min("top_hit").field(totalSortField);
- aggregations.subAggregation(minBuilder);
- aggregations.order(BucketOrder.aggregation("top_hit", true));
- }
-
- if (sample) {
- DiversifiedAggregationBuilder sab = new DiversifiedAggregationBuilder("sample");
- sab.field(partitionField).maxDocsPerValue(maxURLsPerBucket);
- sab.shardSize(maxURLsPerBucket * maxBucketNum);
- sab.subAggregation(aggregations);
- sourceBuilder.aggregation(sab);
- } else {
- sourceBuilder.aggregation(aggregations);
- }
-
- request.source(sourceBuilder);
-
- // https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-preference.html
- // _shards:2,3
- // specific shard but ideally a local copy of it
- if (shardID != -1) {
- request.preference("_shards:" + shardID + "|_local");
- }
-
- // dump query to log
- LOG.debug("{} ES query {}", logIdprefix, request);
-
- LOG.trace("{} isInquery set to true");
- isInQuery.set(true);
- client.searchAsync(request, RequestOptions.DEFAULT, this);
- }
-
- @Override
- public void onFailure(Exception arg0) {
- LOG.error("{} Exception with ES query", logIdprefix, arg0);
- markQueryReceivedNow();
- }
-
- @Override
- public void onResponse(SearchResponse response) {
- long timeTaken = System.currentTimeMillis() - getTimeLastQuerySent();
-
- Aggregations aggregs = response.getAggregations();
-
- if (aggregs == null) {
- markQueryReceivedNow();
- return;
- }
-
- SingleBucketAggregation sample = aggregs.get("sample");
- if (sample != null) {
- aggregs = sample.getAggregations();
- }
-
- Terms agg = aggregs.get("partition");
-
- int numhits = 0;
- int numBuckets = 0;
- int alreadyprocessed = 0;
-
- Instant mostRecentDateFound = null;
-
- currentBuckets.clear();
-
- // For each entry
- Iterator iterator = (Iterator) agg.getBuckets().iterator();
- while (iterator.hasNext()) {
- Terms.Bucket entry = iterator.next();
- String key = (String) entry.getKey(); // bucket key
-
- currentBuckets.add(key);
-
- long docCount = entry.getDocCount(); // Doc count
-
- int hitsForThisBucket = 0;
-
- SearchHit lastHit = null;
-
- // filter results so that we don't include URLs we are already
- // being processed
- TopHits topHits = entry.getAggregations().get("docs");
- for (SearchHit hit : topHits.getHits().getHits()) {
-
- LOG.debug(
- "{} -> id [{}], _source [{}]",
- logIdprefix,
- hit.getId(),
- hit.getSourceAsString());
-
- hitsForThisBucket++;
-
- lastHit = hit;
-
- Map keyValues = hit.getSourceAsMap();
- String url = (String) keyValues.get("url");
-
- // consider only the first document of the last bucket
- // for optimising the nextFetchDate
- if (hitsForThisBucket == 1 && !iterator.hasNext()) {
- String strDate = (String) keyValues.get("nextFetchDate");
- try {
- mostRecentDateFound = Instant.parse(strDate);
- } catch (Exception e) {
- throw new RuntimeException("can't parse date :" + strDate);
- }
- }
-
- // is already being processed or in buffer - skip it!
- if (beingProcessed.containsKey(url)) {
- LOG.debug("{} -> already processed: {}", logIdprefix, url);
- alreadyprocessed++;
- continue;
- }
-
- Metadata metadata = fromKeyValues(keyValues);
- boolean added = buffer.add(url, metadata);
- if (!added) {
- LOG.debug("{} -> already in buffer: {}", logIdprefix, url);
- alreadyprocessed++;
- continue;
- }
- LOG.debug("{} -> added to buffer : {}", logIdprefix, url);
- }
-
- if (lastHit != null) {
- sortValuesForKey(key, lastHit.getSortValues());
- }
-
- if (hitsForThisBucket > 0) numBuckets++;
-
- numhits += hitsForThisBucket;
-
- LOG.debug(
- "{} key [{}], hits[{}], doc_count [{}]",
- logIdprefix,
- key,
- hitsForThisBucket,
- docCount,
- alreadyprocessed);
- }
-
- LOG.info(
- "{} ES query returned {} hits from {} buckets in {} msec with {} already being processed. Took {} msec per doc on average.",
- logIdprefix,
- numhits,
- numBuckets,
- timeTaken,
- alreadyprocessed,
- ((float) timeTaken / numhits));
-
- queryTimes.addMeasurement(timeTaken);
- eventCounter.scope("already_being_processed").incrBy(alreadyprocessed);
- eventCounter.scope("ES_queries").incrBy(1);
- eventCounter.scope("ES_docs").incrBy(numhits);
-
- // optimise the nextFetchDate by getting the most recent value
- // returned in the query and add to it, unless the previous value is
- // within n mins in which case we'll keep it
- if (mostRecentDateFound != null && recentDateIncrease >= 0) {
- Calendar potentialNewDate = Calendar.getInstance();
- potentialNewDate.setTimeInMillis(mostRecentDateFound.toEpochMilli());
- potentialNewDate.add(Calendar.MINUTE, recentDateIncrease);
- Date oldDate = null;
- // check boundaries
- if (this.recentDateMinGap > 0) {
- Calendar low = Calendar.getInstance();
- low.setTime(queryDate);
- low.add(Calendar.MINUTE, -recentDateMinGap);
- Calendar high = Calendar.getInstance();
- high.setTime(queryDate);
- high.add(Calendar.MINUTE, recentDateMinGap);
- if (high.before(potentialNewDate) || low.after(potentialNewDate)) {
- oldDate = queryDate;
- }
- } else {
- oldDate = queryDate;
- }
- if (oldDate != null) {
- queryDate = potentialNewDate.getTime();
- LOG.info(
- "{} queryDate changed from {} to {} based on mostRecentDateFound {}",
- logIdprefix,
- oldDate,
- queryDate,
- mostRecentDateFound);
- } else {
- LOG.info(
- "{} queryDate kept at {} based on mostRecentDateFound {}",
- logIdprefix,
- queryDate,
- mostRecentDateFound);
- }
- }
-
- // reset the value for next fetch date if the previous one is too old
- if (resetFetchDateAfterNSecs != -1) {
- Instant changeNeededOn =
- Instant.ofEpochMilli(
- lastTimeResetToNOW.toEpochMilli() + (resetFetchDateAfterNSecs * 1000));
- if (Instant.now().isAfter(changeNeededOn)) {
- LOG.info(
- "{} queryDate set to null based on resetFetchDateAfterNSecs {}",
- logIdprefix,
- resetFetchDateAfterNSecs);
- queryDate = null;
- }
- }
-
- // change the date if we don't get any results at all
- if (numBuckets == 0) {
- queryDate = null;
- }
-
- // remove lock
- markQueryReceivedNow();
- }
-
- protected void sortValuesForKey(String key, Object[] sortValues) {}
-}
diff --git a/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/persistence/CollapsingSpout.java b/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/persistence/CollapsingSpout.java
deleted file mode 100644
index badcf7aa2..000000000
--- a/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/persistence/CollapsingSpout.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.stormcrawler.elasticsearch.persistence;
-
-import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
-
-import java.time.Instant;
-import java.util.Date;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang.StringUtils;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.stormcrawler.util.ConfUtils;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.index.query.InnerHitBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.elasticsearch.search.collapse.CollapseBuilder;
-import org.elasticsearch.search.sort.FieldSortBuilder;
-import org.elasticsearch.search.sort.SortBuilder;
-import org.elasticsearch.search.sort.SortBuilders;
-import org.elasticsearch.search.sort.SortOrder;
-import org.joda.time.format.ISODateTimeFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Spout which pulls URL from an ES index. Use a single instance unless you use 'es.status.routing'
- * with the StatusUpdaterBolt, in which case you need to have exactly the same number of spout
- * instances as ES shards. Collapses results to implement politeness and ensure a good diversity of
- * sources.
- */
-public class CollapsingSpout extends AbstractSpout implements ActionListener {
-
- private static final Logger LOG = LoggerFactory.getLogger(CollapsingSpout.class);
-
- /** Used to avoid deep paging * */
- private static final String ESMaxStartOffsetParamName = "es.status.max.start.offset";
-
- private int lastStartOffset = 0;
- private int maxStartOffset = -1;
-
- @Override
- public void open(
- Map stormConf,
- TopologyContext context,
- SpoutOutputCollector collector) {
- maxStartOffset = ConfUtils.getInt(stormConf, ESMaxStartOffsetParamName, -1);
- super.open(stormConf, context, collector);
- }
-
- @Override
- protected void populateBuffer() {
- // not used yet or returned empty results
- if (queryDate == null) {
- queryDate = new Date();
- lastTimeResetToNOW = Instant.now();
- lastStartOffset = 0;
- }
- // been running same query for too long and paging deep?
- else if (maxStartOffset != -1 && lastStartOffset > maxStartOffset) {
- LOG.info("Reached max start offset {}", lastStartOffset);
- lastStartOffset = 0;
- }
-
- String formattedLastDate = ISODateTimeFormat.dateTimeNoMillis().print(queryDate.getTime());
-
- LOG.info("{} Populating buffer with nextFetchDate <= {}", logIdprefix, formattedLastDate);
-
- BoolQueryBuilder queryBuilder =
- boolQuery()
- .filter(QueryBuilders.rangeQuery("nextFetchDate").lte(formattedLastDate));
-
- if (filterQueries != null) {
- for (String filterQuery : filterQueries) {
- queryBuilder.filter(QueryBuilders.queryStringQuery(filterQuery));
- }
- }
-
- SearchRequest request = new SearchRequest(indexName);
-
- SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
- sourceBuilder.query(queryBuilder);
- sourceBuilder.from(lastStartOffset);
- sourceBuilder.size(maxBucketNum);
- sourceBuilder.explain(false);
- sourceBuilder.trackTotalHits(false);
-
- // https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-preference.html
- // _shards:2,3
- // specific shard but ideally a local copy of it
- if (shardID != -1) {
- request.preference("_shards:" + shardID + "|_local");
- }
-
- if (queryTimeout != -1) {
- sourceBuilder.timeout(new TimeValue(queryTimeout, TimeUnit.SECONDS));
- }
-
- if (StringUtils.isNotBlank(totalSortField)) {
- sourceBuilder.sort(new FieldSortBuilder(totalSortField).order(SortOrder.ASC));
- }
-
- CollapseBuilder collapse = new CollapseBuilder(partitionField);
-
- // group expansion -> sends sub queries for each bucket
- if (maxURLsPerBucket > 1) {
- InnerHitBuilder ihb = new InnerHitBuilder();
- ihb.setSize(maxURLsPerBucket);
- ihb.setName("urls_per_bucket");
- List> sorts = new LinkedList<>();
- // sort within a bucket
- for (String bsf : bucketSortField) {
- FieldSortBuilder bucketsorter = SortBuilders.fieldSort(bsf).order(SortOrder.ASC);
- sorts.add(bucketsorter);
- }
- if (!sorts.isEmpty()) {
- ihb.setSorts(sorts);
- }
- collapse.setInnerHits(ihb);
- }
-
- sourceBuilder.collapse(collapse);
-
- request.source(sourceBuilder);
-
- // dump query to log
- LOG.debug("{} ES query {}", logIdprefix, request.toString());
-
- isInQuery.set(true);
- client.searchAsync(request, RequestOptions.DEFAULT, this);
- }
-
- @Override
- public void onFailure(Exception e) {
- LOG.error("{} Exception with ES query", logIdprefix, e);
- markQueryReceivedNow();
- }
-
- @Override
- public void onResponse(SearchResponse response) {
- long timeTaken = System.currentTimeMillis() - getTimeLastQuerySent();
-
- SearchHit[] hits = response.getHits().getHits();
- int numBuckets = hits.length;
-
- int alreadyprocessed = 0;
- int numDocs = 0;
-
- for (SearchHit hit : hits) {
- Map innerHits = hit.getInnerHits();
- // wanted just one per bucket : no inner hits
- if (innerHits == null) {
- numDocs++;
- if (!addHitToBuffer(hit)) {
- alreadyprocessed++;
- }
- continue;
- }
- // more than one per bucket
- SearchHits inMyBucket = innerHits.get("urls_per_bucket");
- for (SearchHit subHit : inMyBucket.getHits()) {
- numDocs++;
- if (!addHitToBuffer(subHit)) {
- alreadyprocessed++;
- }
- }
- }
-
- queryTimes.addMeasurement(timeTaken);
- // could be derived from the count of query times above
- eventCounter.scope("ES_queries").incrBy(1);
- eventCounter.scope("ES_docs").incrBy(numDocs);
- eventCounter.scope("already_being_processed").incrBy(alreadyprocessed);
-
- LOG.info(
- "{} ES query returned {} hits from {} buckets in {} msec with {} already being processed.Took {} msec per doc on average.",
- logIdprefix,
- numDocs,
- numBuckets,
- timeTaken,
- alreadyprocessed,
- ((float) timeTaken / numDocs));
-
- // reset the value for next fetch date if the previous one is too old
- if (resetFetchDateAfterNSecs != -1) {
- Instant changeNeededOn =
- Instant.ofEpochMilli(
- lastTimeResetToNOW.toEpochMilli() + (resetFetchDateAfterNSecs * 1000));
- if (Instant.now().isAfter(changeNeededOn)) {
- LOG.info(
- "queryDate reset based on resetFetchDateAfterNSecs {}",
- resetFetchDateAfterNSecs);
- queryDate = null;
- lastStartOffset = 0;
- }
- }
-
- // no more results?
- if (numBuckets == 0) {
- queryDate = null;
- lastStartOffset = 0;
- }
- // still got some results but paging won't help
- else if (numBuckets < maxBucketNum) {
- lastStartOffset = 0;
- } else {
- lastStartOffset += numBuckets;
- }
-
- // remove lock
- markQueryReceivedNow();
- }
-}
diff --git a/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/persistence/HybridSpout.java b/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/persistence/HybridSpout.java
deleted file mode 100644
index 8e16a5337..000000000
--- a/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/persistence/HybridSpout.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.stormcrawler.elasticsearch.persistence;
-
-import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
-
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import java.time.Instant;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.stormcrawler.persistence.EmptyQueueListener;
-import org.apache.stormcrawler.util.ConfUtils;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.elasticsearch.search.sort.FieldSortBuilder;
-import org.elasticsearch.search.sort.SortBuilders;
-import org.elasticsearch.search.sort.SortOrder;
-import org.joda.time.format.ISODateTimeFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Uses collapsing spouts to get an initial set of URLs and keys to query for and gets emptyQueue
- * notifications from the URLBuffer to query ES for a specific key.
- *
- * @since 1.15
- */
-public class HybridSpout extends AggregationSpout implements EmptyQueueListener {
-
- private static final Logger LOG = LoggerFactory.getLogger(HybridSpout.class);
-
- protected static final String RELOADPARAMNAME = "es.status.max.urls.per.reload";
-
- private int bufferReloadSize = 10;
-
- private Cache searchAfterCache;
-
- private HostResultListener hrl;
-
- @Override
- public void open(
- Map stormConf,
- TopologyContext context,
- SpoutOutputCollector collector) {
- super.open(stormConf, context, collector);
- bufferReloadSize = ConfUtils.getInt(stormConf, RELOADPARAMNAME, maxURLsPerBucket);
- buffer.setEmptyQueueListener(this);
- searchAfterCache = Caffeine.newBuilder().build();
- hrl = new HostResultListener();
- }
-
- @Override
- public void emptyQueue(String queueName) {
-
- LOG.info("{} Emptied buffer queue for {}", logIdprefix, queueName);
-
- if (!currentBuckets.contains(queueName)) {
- // not interested in this one any more
- return;
- }
-
- // reloading the aggregs - searching now
- // would just overload ES and yield
- // mainly duplicates
- if (isInQuery.get()) {
- LOG.trace("{} isInquery true", logIdprefix, queueName);
- return;
- }
-
- LOG.info("{} Querying for more docs for {}", logIdprefix, queueName);
-
- if (queryDate == null) {
- queryDate = new Date();
- lastTimeResetToNOW = Instant.now();
- }
-
- String formattedQueryDate = ISODateTimeFormat.dateTimeNoMillis().print(queryDate.getTime());
-
- BoolQueryBuilder queryBuilder =
- boolQuery()
- .filter(QueryBuilders.rangeQuery("nextFetchDate").lte(formattedQueryDate));
-
- queryBuilder.filter(QueryBuilders.termQuery(partitionField, queueName));
-
- SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
- sourceBuilder.query(queryBuilder);
- sourceBuilder.from(0);
- sourceBuilder.size(bufferReloadSize);
- sourceBuilder.explain(false);
- sourceBuilder.trackTotalHits(false);
-
- // sort within a bucket
- for (String bsf : bucketSortField) {
- FieldSortBuilder sorter = SortBuilders.fieldSort(bsf).order(SortOrder.ASC);
- sourceBuilder.sort(sorter);
- }
-
- // do we have a search after for this one?
- Object[] searchAfterValues = searchAfterCache.getIfPresent(queueName);
- if (searchAfterValues != null) {
- sourceBuilder.searchAfter(searchAfterValues);
- }
-
- SearchRequest request = new SearchRequest(indexName);
-
- request.source(sourceBuilder);
-
- // https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-preference.html
- // _shards:2,3
- // specific shard but ideally a local copy of it
- if (shardID != -1) {
- request.preference("_shards:" + shardID + "|_local");
- }
-
- // dump query to log
- LOG.debug("{} ES query {} - {}", logIdprefix, queueName, request.toString());
-
- client.searchAsync(request, RequestOptions.DEFAULT, hrl);
- }
-
- @Override
- /** Overrides the handling of responses for aggregations */
- public void onResponse(SearchResponse response) {
- // delete all entries from the searchAfterCache when
- // we get the results from the aggregation spouts
- searchAfterCache.invalidateAll();
- super.onResponse(response);
- }
-
- @Override
- /** The aggregation kindly told us where to start from * */
- protected void sortValuesForKey(String key, Object[] sortValues) {
- if (sortValues != null && sortValues.length > 0) this.searchAfterCache.put(key, sortValues);
- }
-
- /** Handling of results for a specific queue * */
- class HostResultListener implements ActionListener {
-
- @Override
- public void onResponse(SearchResponse response) {
-
- int alreadyprocessed = 0;
- int numDocs = 0;
-
- SearchHit[] hits = response.getHits().getHits();
-
- Object[] sortValues = null;
-
- // retrieve the key for these results
- String key = null;
-
- for (SearchHit hit : hits) {
- numDocs++;
- String pfield = partitionField;
- Map sourceAsMap = hit.getSourceAsMap();
- if (pfield.startsWith("metadata.")) {
- sourceAsMap = (Map) sourceAsMap.get("metadata");
- pfield = pfield.substring(9);
- }
- Object key_as_object = sourceAsMap.get(pfield);
- if (key_as_object instanceof List) {
- if (((List) (key_as_object)).size() == 1)
- key = (String) ((List) key_as_object).get(0);
- } else {
- key = key_as_object.toString();
- }
-
- sortValues = hit.getSortValues();
- if (!addHitToBuffer(hit)) {
- alreadyprocessed++;
- }
- }
-
- // no key if no results have been found
- if (key != null) {
- searchAfterCache.put(key, sortValues);
- }
-
- eventCounter.scope("ES_queries_host").incrBy(1);
- eventCounter.scope("ES_docs_host").incrBy(numDocs);
- eventCounter.scope("already_being_processed_host").incrBy(alreadyprocessed);
-
- LOG.info(
- "{} ES term query returned {} hits in {} msec with {} already being processed for {}",
- logIdprefix,
- numDocs,
- response.getTook().getMillis(),
- alreadyprocessed,
- key);
- }
-
- @Override
- public void onFailure(Exception e) {
- LOG.error("Exception with ES query", e);
- }
- }
-}
diff --git a/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/persistence/ScrollSpout.java b/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/persistence/ScrollSpout.java
deleted file mode 100644
index 1f44a01f7..000000000
--- a/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/persistence/ScrollSpout.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.stormcrawler.elasticsearch.persistence;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-import org.apache.stormcrawler.Constants;
-import org.apache.stormcrawler.Metadata;
-import org.apache.stormcrawler.persistence.AbstractStatusUpdaterBolt;
-import org.apache.stormcrawler.persistence.Status;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.search.SearchScrollRequest;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Reads all the documents from a shard and emits them on the status stream. Used for copying an
- * index.
- */
-public class ScrollSpout extends AbstractSpout implements ActionListener {
-
- private String scrollId = null;
- private boolean hasFinished = false;
-
- private Queue queue = new LinkedList<>();
-
- private static final Logger LOG = LoggerFactory.getLogger(ScrollSpout.class);
-
- @Override
- // simplified version of the super method so that we can store the fields in
- // the
- // map of things being processed
- public void nextTuple() {
- synchronized (queue) {
- if (!queue.isEmpty()) {
- List fields = queue.remove();
- String url = fields.get(0).toString();
- _collector.emit(Constants.StatusStreamName, fields, url);
- beingProcessed.put(url, fields);
- eventCounter.scope("emitted").incrBy(1);
- LOG.debug("{} emitted {}", logIdprefix, url);
- return;
- }
- }
-
- if (isInQuery.get()) {
- LOG.trace("{} isInquery true", logIdprefix);
- // sleep for a bit but not too much in order to give ack/fail a
- // chance
- Utils.sleep(10);
- return;
- }
-
- // re-populate the buffer
- populateBuffer();
- }
-
- @Override
- protected void populateBuffer() {
- if (hasFinished) {
- Utils.sleep(10);
- return;
- }
-
- // initial request
- if (scrollId == null) {
- SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
- searchSourceBuilder.query(QueryBuilders.matchAllQuery());
- searchSourceBuilder.size(maxURLsPerBucket * maxBucketNum);
- SearchRequest searchRequest = new SearchRequest(indexName);
- searchRequest.source(searchSourceBuilder);
- searchRequest.scroll(TimeValue.timeValueMinutes(5L));
-
- // specific shard but ideally a local copy of it
- if (shardID != -1) {
- searchRequest.preference("_shards:" + shardID + "|_local");
- }
-
- isInQuery.set(true);
- LOG.trace("{} isInquery set to true", logIdprefix);
-
- client.searchAsync(searchRequest, RequestOptions.DEFAULT, this);
-
- // dump query to log
- LOG.debug("{} ES query {}", logIdprefix, searchRequest.toString());
- return;
- }
-
- SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
- scrollRequest.scroll(TimeValue.timeValueMinutes(5L));
-
- isInQuery.set(true);
- client.scrollAsync(scrollRequest, RequestOptions.DEFAULT, this);
- // dump query to log
- LOG.debug("{} ES query {}", logIdprefix, scrollRequest.toString());
- }
-
- @Override
- public void onResponse(SearchResponse response) {
- SearchHits hits = response.getHits();
- LOG.info(
- "{} ES query returned {} hits in {} msec",
- logIdprefix,
- hits.getHits().length,
- response.getTook().getMillis());
- hasFinished = hits.getHits().length == 0;
- synchronized (this.queue) {
- // Unlike standard spouts, the scroll queries should never return
- // the same
- // document twice -> no need to look in the buffer or cache
- for (SearchHit hit : hits) {
- Map keyValues = hit.getSourceAsMap();
- String url = (String) keyValues.get("url");
- String status = (String) keyValues.get("status");
- String nextFetchDate = (String) keyValues.get("nextFetchDate");
- Metadata metadata = fromKeyValues(keyValues);
- metadata.setValue(
- AbstractStatusUpdaterBolt.AS_IS_NEXTFETCHDATE_METADATA, nextFetchDate);
- this.queue.add(new Values(url, metadata, Status.valueOf(status)));
- }
- }
- scrollId = response.getScrollId();
- // remove lock
- markQueryReceivedNow();
- }
-
- @Override
- public void onFailure(Exception e) {
- LOG.error("{} Exception with ES query", logIdprefix, e);
- markQueryReceivedNow();
- }
-
- @Override
- public void fail(Object msgId) {
- LOG.info("{} Fail for {}", logIdprefix, msgId);
- eventCounter.scope("failed").incrBy(1);
- // retrieve the values from being processed and send them back to the
- // queue
- Values v = (Values) beingProcessed.remove(msgId);
- queue.add(v);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declareStream(Constants.StatusStreamName, new Fields("url", "metadata", "status"));
- }
-}
diff --git a/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/persistence/StatusUpdaterBolt.java b/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/persistence/StatusUpdaterBolt.java
deleted file mode 100644
index 3e10348c5..000000000
--- a/external/elasticsearch/src/main/java/org/apache/stormcrawler/elasticsearch/persistence/StatusUpdaterBolt.java
+++ /dev/null
@@ -1,451 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.stormcrawler.elasticsearch.persistence;
-
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.RemovalCause;
-import com.github.benmanes.caffeine.cache.RemovalListener;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
-import org.apache.commons.lang.StringUtils;
-import org.apache.storm.metric.api.MultiCountMetric;
-import org.apache.storm.metric.api.MultiReducedMetric;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
-import org.apache.stormcrawler.Metadata;
-import org.apache.stormcrawler.elasticsearch.BulkItemResponseToFailedFlag;
-import org.apache.stormcrawler.elasticsearch.ElasticSearchConnection;
-import org.apache.stormcrawler.persistence.AbstractStatusUpdaterBolt;
-import org.apache.stormcrawler.persistence.Status;
-import org.apache.stormcrawler.util.ConfUtils;
-import org.apache.stormcrawler.util.PerSecondReducer;
-import org.apache.stormcrawler.util.URLPartitioner;
-import org.elasticsearch.action.DocWriteRequest;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.rest.RestStatus;
-import org.elasticsearch.xcontent.XContentBuilder;
-import org.elasticsearch.xcontent.XContentFactory;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Simple bolt which stores the status of URLs into ElasticSearch. Takes the tuples coming from the
- * 'status' stream. To be used in combination with a Spout to read from the index.
- */
-public class StatusUpdaterBolt extends AbstractStatusUpdaterBolt
- implements RemovalListener>, BulkProcessor.Listener {
-
- private static final Logger LOG = LoggerFactory.getLogger(StatusUpdaterBolt.class);
-
- private String ESBoltType = "status";
-
- private static final String ESStatusIndexNameParamName = "es.%s.index.name";
- private static final String ESStatusRoutingParamName = "es.%s.routing";
- private static final String ESStatusRoutingFieldParamName = "es.%s.routing.fieldname";
-
- private boolean routingFieldNameInMetadata = false;
-
- private String indexName;
-
- private URLPartitioner partitioner;
-
- /** whether to apply the same partitioning logic used for politeness for routing, e.g byHost */
- private boolean doRouting;
-
- /** Store the key used for routing explicitly as a field in metadata * */
- private String fieldNameForRoutingKey = null;
-
- private ElasticSearchConnection connection;
-
- private Cache> waitAck;
-
- // Be fair due to cache timeout
- private final ReentrantLock waitAckLock = new ReentrantLock(true);
-
- private MultiCountMetric eventCounter;
-
- private MultiReducedMetric receivedPerSecMetrics;
-
- public StatusUpdaterBolt() {
- super();
- }
-
- /**
- * Loads the configuration using a substring different from the default value 'status' in order
- * to distinguish it from the spout configurations
- */
- public StatusUpdaterBolt(String boltType) {
- super();
- ESBoltType = boltType;
- }
-
- @Override
- public void prepare(
- Map stormConf, TopologyContext context, OutputCollector collector) {
-
- super.prepare(stormConf, context, collector);
-
- indexName =
- ConfUtils.getString(
- stormConf,
- String.format(StatusUpdaterBolt.ESStatusIndexNameParamName, ESBoltType),
- "status");
-
- doRouting =
- ConfUtils.getBoolean(
- stormConf,
- String.format(StatusUpdaterBolt.ESStatusRoutingParamName, ESBoltType),
- false);
-
- partitioner = new URLPartitioner();
- partitioner.configure(stormConf);
-
- fieldNameForRoutingKey =
- ConfUtils.getString(
- stormConf,
- String.format(StatusUpdaterBolt.ESStatusRoutingFieldParamName, ESBoltType));
- if (StringUtils.isNotBlank(fieldNameForRoutingKey)) {
- if (fieldNameForRoutingKey.startsWith("metadata.")) {
- routingFieldNameInMetadata = true;
- fieldNameForRoutingKey = fieldNameForRoutingKey.substring("metadata.".length());
- }
- // periods are not allowed in ES2 - replace with %2E
- fieldNameForRoutingKey = fieldNameForRoutingKey.replaceAll("\\.", "%2E");
- }
-
- waitAck =
- Caffeine.newBuilder()
- .expireAfterWrite(60, TimeUnit.SECONDS)
- .removalListener(this)
- .build();
-
- int metrics_time_bucket_secs = 30;
-
- // create gauge for waitAck
- context.registerMetric("waitAck", () -> waitAck.estimatedSize(), metrics_time_bucket_secs);
-
- // benchmarking - average number of items received back by Elastic per second
- this.receivedPerSecMetrics =
- context.registerMetric(
- "average_persec",
- new MultiReducedMetric(new PerSecondReducer()),
- metrics_time_bucket_secs);
-
- this.eventCounter =
- context.registerMetric(
- "counters", new MultiCountMetric(), metrics_time_bucket_secs);
-
- try {
- connection = ElasticSearchConnection.getConnection(stormConf, ESBoltType, this);
- } catch (Exception e1) {
- LOG.error("Can't connect to ElasticSearch", e1);
- throw new RuntimeException(e1);
- }
- }
-
- @Override
- public void cleanup() {
- if (connection == null) {
- return;
- }
- connection.close();
- connection = null;
- }
-
- @Override
- public void store(
- String url, Status status, Metadata metadata, Optional nextFetch, Tuple tuple)
- throws Exception {
-
- String documentID = getDocumentID(metadata, url);
-
- boolean isAlreadySentAndDiscovered;
- // need to synchronize: otherwise it might get added to the cache
- // without having been sent to ES
- waitAckLock.lock();
- try {
- // check that the same URL is not being sent to ES
- final var alreadySent = waitAck.getIfPresent(documentID);
- isAlreadySentAndDiscovered = status.equals(Status.DISCOVERED) && alreadySent != null;
- } finally {
- waitAckLock.unlock();
- }
-
- if (isAlreadySentAndDiscovered) {
- // if this object is discovered - adding another version of it
- // won't make any difference
- LOG.debug(
- "Already being sent to ES {} with status {} and ID {}",
- url,
- status,
- documentID);
- // ack straight away!
- eventCounter.scope("skipped").incrBy(1);
- super.ack(tuple, url);
- return;
- }
-
- XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
- builder.field("url", url);
- builder.field("status", status);
-
- builder.startObject("metadata");
- for (String mdKey : metadata.keySet()) {
- String[] values = metadata.getValues(mdKey);
- // periods are not allowed in ES2 - replace with %2E
- mdKey = mdKey.replaceAll("\\.", "%2E");
- builder.array(mdKey, values);
- }
-
- String partitionKey = partitioner.getPartition(url, metadata);
- if (partitionKey == null) {
- partitionKey = "_DEFAULT_";
- }
-
- // store routing key in metadata?
- if (StringUtils.isNotBlank(fieldNameForRoutingKey) && routingFieldNameInMetadata) {
- builder.field(fieldNameForRoutingKey, partitionKey);
- }
-
- builder.endObject();
-
- // store routing key outside metadata?
- if (StringUtils.isNotBlank(fieldNameForRoutingKey) && !routingFieldNameInMetadata) {
- builder.field(fieldNameForRoutingKey, partitionKey);
- }
-
- if (nextFetch.isPresent()) {
- builder.timeField("nextFetchDate", nextFetch.get());
- }
-
- builder.endObject();
-
- IndexRequest request = new IndexRequest(getIndexName(metadata));
-
- // check that we don't overwrite an existing entry
- // When create is used, the index operation will fail if a document
- // by that id already exists in the index.
- final boolean create = status.equals(Status.DISCOVERED);
- request.source(builder).id(documentID).create(create);
-
- if (doRouting) {
- request.routing(partitionKey);
- }
-
- waitAckLock.lock();
- try {
- final List tt = waitAck.get(documentID, k -> new LinkedList<>());
- tt.add(tuple);
- LOG.debug("Added to waitAck {} with ID {} total {}", url, documentID, tt.size());
- } finally {
- waitAckLock.unlock();
- }
-
- LOG.debug("Sending to ES buffer {} with ID {}", url, documentID);
-
- connection.addToProcessor(request);
- }
-
- @Override
- public void onRemoval(
- @Nullable String key, @Nullable List value, @NotNull RemovalCause cause) {
- if (!cause.wasEvicted()) return;
- LOG.error("Purged from waitAck {} with {} values", key, value.size());
- for (Tuple t : value) {
- eventCounter.scope("purged").incrBy(1);
- _collector.fail(t);
- }
- }
-
- @Override
- public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
- LOG.debug("afterBulk [{}] with {} responses", executionId, request.numberOfActions());
- eventCounter.scope("bulks_received").incrBy(1);
- eventCounter.scope("bulk_msec").incrBy(response.getTook().getMillis());
- eventCounter.scope("received").incrBy(request.numberOfActions());
- receivedPerSecMetrics.scope("received").update(request.numberOfActions());
-
- var idsToBulkItemsWithFailedFlag =
- Arrays.stream(response.getItems())
- .map(
- bir -> {
- String id = bir.getId();
- BulkItemResponse.Failure f = bir.getFailure();
- boolean failed = false;
- if (f != null) {
- // already discovered
- if (f.getStatus().equals(RestStatus.CONFLICT)) {
- eventCounter.scope("doc_conflicts").incrBy(1);
- LOG.debug("Doc conflict ID {}", id);
- } else {
- LOG.error("Update ID {}, failure: {}", id, f);
- failed = true;
- }
- }
- return new BulkItemResponseToFailedFlag(bir, failed);
- })
- .collect(
- // https://github.com/DigitalPebble/storm-crawler/issues/832
- Collectors.groupingBy(
- idWithFailedFlagTuple -> idWithFailedFlagTuple.id,
- Collectors.toUnmodifiableList()));
-
- Map> presentTuples;
- long estimatedSize;
- Set debugInfo = null;
- waitAckLock.lock();
- try {
- presentTuples = waitAck.getAllPresent(idsToBulkItemsWithFailedFlag.keySet());
- if (!presentTuples.isEmpty()) {
- waitAck.invalidateAll(presentTuples.keySet());
- }
- estimatedSize = waitAck.estimatedSize();
- // Only if we have to.
- if (LOG.isDebugEnabled() && estimatedSize > 0L) {
- debugInfo = new HashSet<>(waitAck.asMap().keySet());
- }
- } finally {
- waitAckLock.unlock();
- }
-
- int ackCount = 0;
- int failureCount = 0;
-
- for (var entry : presentTuples.entrySet()) {
- final var id = entry.getKey();
- final var associatedTuple = entry.getValue();
- final var bulkItemsWithFailedFlag = idsToBulkItemsWithFailedFlag.get(id);
-
- BulkItemResponseToFailedFlag selected;
- if (bulkItemsWithFailedFlag.size() == 1) {
- selected = bulkItemsWithFailedFlag.get(0);
- } else {
- // Fallback if there are multiple responses for the same id
- BulkItemResponseToFailedFlag tmp = null;
- var ctFailed = 0;
- for (var buwff : bulkItemsWithFailedFlag) {
- if (tmp == null) {
- tmp = buwff;
- }
- if (buwff.failed) ctFailed++;
- else tmp = buwff;
- }
- if (ctFailed != bulkItemsWithFailedFlag.size()) {
- LOG.warn(
- "The id {} would result in an ack and a failure. Using only the ack for processing.",
- id);
- }
- selected = Objects.requireNonNull(tmp);
- }
-
- if (associatedTuple != null) {
- LOG.debug("Acked {} tuple(s) for ID {}", associatedTuple.size(), id);
- for (Tuple tuple : associatedTuple) {
- if (!selected.failed) {
- String url = tuple.getStringByField("url");
- ackCount++;
- // ack and put in cache
- LOG.debug("Acked {} with ID {}", url, id);
- eventCounter.scope("acked").incrBy(1);
- super.ack(tuple, url);
- } else {
- failureCount++;
- eventCounter.scope("failed").incrBy(1);
- _collector.fail(tuple);
- }
- }
- } else {
- LOG.warn("Could not find unacked tuple for {}", id);
- }
- }
-
- LOG.info(
- "Bulk response [{}] : items {}, waitAck {}, acked {}, failed {}",
- executionId,
- idsToBulkItemsWithFailedFlag.size(),
- estimatedSize,
- ackCount,
- failureCount);
- if (debugInfo != null) {
- for (String kinaw : debugInfo) {
- LOG.debug("Still in wait ack after bulk response [{}] => {}", executionId, kinaw);
- }
- }
- }
-
- @Override
- public void afterBulk(long executionId, BulkRequest request, Throwable throwable) {
- eventCounter.scope("bulks_received").incrBy(1);
- eventCounter.scope("received").incrBy(request.numberOfActions());
- receivedPerSecMetrics.scope("received").update(request.numberOfActions());
-
- LOG.error("Exception with bulk {} - failing the whole lot ", executionId, throwable);
-
- final var failedIds =
- request.requests().stream()
- .map(DocWriteRequest::id)
- .collect(Collectors.toUnmodifiableSet());
- waitAckLock.lock();
- Map> failedTupleLists;
- try {
- failedTupleLists = waitAck.getAllPresent(failedIds);
- if (!failedTupleLists.isEmpty()) {
- waitAck.invalidateAll(failedTupleLists.keySet());
- }
- } finally {
- waitAckLock.unlock();
- }
-
- for (var id : failedIds) {
- var failedTuples = failedTupleLists.get(id);
- if (failedTuples != null) {
- LOG.debug("Failed {} tuple(s) for ID {}", failedTuples.size(), id);
- for (Tuple x : failedTuples) {
- // fail it
- eventCounter.scope("failed").incrBy(1);
- _collector.fail(x);
- }
- } else {
- LOG.warn("Could not find unacked tuple for {}", id);
- }
- }
- }
-
- @Override
- public void beforeBulk(long executionId, BulkRequest request) {
- LOG.debug("beforeBulk {} with {} actions", executionId, request.numberOfActions());
- eventCounter.scope("bulks_sent").incrBy(1);
- }
-
- /**
- * Must be overridden for implementing custom index names based on some metadata information By
- * Default, indexName coming from config is used
- */
- protected String getIndexName(Metadata m) {
- return indexName;
- }
-}
diff --git a/external/elasticsearch/src/test/java/org/apache/stormcrawler/elasticsearch/bolt/IndexerBoltTest.java b/external/elasticsearch/src/test/java/org/apache/stormcrawler/elasticsearch/bolt/IndexerBoltTest.java
deleted file mode 100644
index fb6278541..000000000
--- a/external/elasticsearch/src/test/java/org/apache/stormcrawler/elasticsearch/bolt/IndexerBoltTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.stormcrawler.elasticsearch.bolt;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.*;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.tuple.Tuple;
-import org.apache.stormcrawler.Constants;
-import org.apache.stormcrawler.Metadata;
-import org.apache.stormcrawler.TestOutputCollector;
-import org.apache.stormcrawler.TestUtil;
-import org.apache.stormcrawler.indexing.AbstractIndexerBolt;
-import org.junit.*;
-import org.junit.rules.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.elasticsearch.ElasticsearchContainer;
-
-public class IndexerBoltTest {
-
- @Rule public Timeout globalTimeout = Timeout.seconds(120);
-
- private ElasticsearchContainer container;
- private IndexerBolt bolt;
- protected TestOutputCollector output;
-
- private static final Logger LOG = LoggerFactory.getLogger(IndexerBoltTest.class);
- private static ExecutorService executorService;
-
- @BeforeClass
- public static void beforeClass() {
- executorService = Executors.newFixedThreadPool(2);
- }
-
- @AfterClass
- public static void afterClass() {
- executorService.shutdown();
- executorService = null;
- }
-
- @Before
- public void setupIndexerBolt() {
-
- String version = System.getProperty("elasticsearch-version");
- if (version == null) version = "7.17.7";
- LOG.info("Starting docker instance of Elasticsearch {}...", version);
-
- container =
- new ElasticsearchContainer(
- "docker.elastic.co/elasticsearch/elasticsearch:" + version);
- container.withPassword("s3cret");
- container.start();
-
- bolt = new IndexerBolt("content");
-
- // give the indexer the port for connecting to ES
-
- Map conf = new HashMap<>();
- conf.put(AbstractIndexerBolt.urlFieldParamName, "url");
- conf.put(AbstractIndexerBolt.canonicalMetadataParamName, "canonical");
- conf.put("es.indexer.addresses", container.getHttpHostAddress());
- conf.put("es.indexer.compatibility.mode", false);
- conf.put("es.indexer.user", "elastic");
- conf.put("es.indexer.password", "s3cret");
-
- output = new TestOutputCollector();
-
- bolt.prepare(conf, TestUtil.getMockedTopologyContext(), new OutputCollector(output));
- }
-
- @After
- public void close() {
- LOG.info("Closing indexer bolt and ES container");
- bolt.cleanup();
- container.close();
- output = null;
- }
-
- private void index(String url, String text, Metadata metadata) {
- Tuple tuple = mock(Tuple.class);
- when(tuple.getStringByField("text")).thenReturn(text);
- when(tuple.getStringByField("url")).thenReturn(url);
- when(tuple.getValueByField("metadata")).thenReturn(metadata);
- bolt.execute(tuple);
- }
-
- private int lastIndex(String url, String text, Metadata metadata, long timeoutInMs)
- throws ExecutionException, InterruptedException, TimeoutException {
- var oldSize = output.getEmitted(Constants.StatusStreamName).size();
- index(url, text, metadata);
- return executorService
- .submit(
- () -> {
- // check that something has been emitted out
- var outputSize = output.getEmitted(Constants.StatusStreamName).size();
- while (outputSize == oldSize) {
- Thread.sleep(100);
- outputSize = output.getEmitted(Constants.StatusStreamName).size();
- }
- return outputSize;
- })
- .get(timeoutInMs, TimeUnit.MILLISECONDS);
- }
-
- @Test
- // https://github.com/DigitalPebble/storm-crawler/issues/832
- public void simultaneousCanonicals()
- throws ExecutionException, InterruptedException, TimeoutException {
- Metadata m1 = new Metadata();
- String url =
- "https://www.obozrevatel.com/ukr/dnipro/city/u-dnipri-ta-oblasti-ogolosili-shtormove-poperedzhennya.htm";
- m1.addValue("canonical", url);
-
- Metadata m2 = new Metadata();
- String url2 =
- "https://www.obozrevatel.com/ukr/dnipro/city/u-dnipri-ta-oblasti-ogolosili-shtormove-poperedzhennya/amp.htm";
- m2.addValue("canonical", url);
-
- index(url, "", m1);
- lastIndex(url2, "", m2, 10_000);
-
- // should be two in status output
- assertEquals(2, output.getEmitted(Constants.StatusStreamName).size());
-
- // and 2 acked
- assertEquals(2, output.getAckedTuples().size());
-
- // TODO check output in ES?
-
- }
-}
diff --git a/external/elasticsearch/src/test/java/org/apache/stormcrawler/elasticsearch/bolt/StatusBoltTest.java b/external/elasticsearch/src/test/java/org/apache/stormcrawler/elasticsearch/bolt/StatusBoltTest.java
deleted file mode 100644
index a89e79710..000000000
--- a/external/elasticsearch/src/test/java/org/apache/stormcrawler/elasticsearch/bolt/StatusBoltTest.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.stormcrawler.elasticsearch.bolt;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.*;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.tuple.Tuple;
-import org.apache.stormcrawler.Metadata;
-import org.apache.stormcrawler.TestOutputCollector;
-import org.apache.stormcrawler.TestUtil;
-import org.apache.stormcrawler.elasticsearch.persistence.StatusUpdaterBolt;
-import org.apache.stormcrawler.persistence.Status;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.indices.CreateIndexRequest;
-import org.elasticsearch.xcontent.XContentType;
-import org.junit.*;
-import org.junit.rules.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.elasticsearch.ElasticsearchContainer;
-
-public class StatusBoltTest {
-
- private ElasticsearchContainer container;
- private StatusUpdaterBolt bolt;
- protected TestOutputCollector output;
-
- protected RestHighLevelClient client;
-
- private static final Logger LOG = LoggerFactory.getLogger(StatusBoltTest.class);
- private static ExecutorService executorService;
-
- @Rule public Timeout globalTimeout = Timeout.seconds(120);
-
- @BeforeClass
- public static void beforeClass() {
- executorService = Executors.newFixedThreadPool(2);
- }
-
- @AfterClass
- public static void afterClass() {
- executorService.shutdown();
- executorService = null;
- }
-
- @Before
- public void setupStatusBolt() throws IOException {
-
- String version = System.getProperty("elasticsearch-version");
- if (version == null) version = "7.17.7";
- LOG.info("Starting docker instance of Elasticsearch {}...", version);
-
- container =
- new ElasticsearchContainer(
- "docker.elastic.co/elasticsearch/elasticsearch:" + version)
- .withPassword("s3cret");
-
- container.start();
-
- bolt = new StatusUpdaterBolt();
-
- // configure the status index
-
- RestClientBuilder builder =
- RestClient.builder(
- new HttpHost(container.getHost(), container.getMappedPort(9200)));
-
- final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
- credentialsProvider.setCredentials(
- AuthScope.ANY, new UsernamePasswordCredentials("elastic", "s3cret"));
-
- builder.setHttpClientConfigCallback(
- clientBuilder -> {
- clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
- return clientBuilder;
- });
-
- client = new RestHighLevelClient(builder);
-
- // TODO
- // https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.13/java-rest-high-put-mapping.html
-
- CreateIndexRequest request = new CreateIndexRequest("status");
-
- URI uriToFile;
- try {
- uriToFile =
- Objects.requireNonNull(
- getClass().getClassLoader().getResource("status.mapping"))
- .toURI();
- } catch (URISyntaxException e) {
- throw new RuntimeException(e);
- }
-
- String mappingSource = Files.readString(Path.of(uriToFile), Charset.defaultCharset());
-
- request.source(mappingSource, XContentType.JSON);
-
- client.indices().create(request, RequestOptions.DEFAULT);
-
- // configure the status updater bolt
-
- Map conf = new HashMap<>();
- conf.put("es.status.routing.fieldname", "metadata.key");
-
- conf.put("es.status.addresses", container.getHttpHostAddress());
-
- conf.put("scheduler.class", "org.apache.stormcrawler.persistence.DefaultScheduler");
-
- conf.put("status.updater.cache.spec", "maximumSize=10000,expireAfterAccess=1h");
-
- conf.put("metadata.persist", "someKey");
-
- conf.put("es.status.compatibility.mode", false);
-
- conf.put("es.status.user", "elastic");
- conf.put("es.status.password", "s3cret");
-
- output = new TestOutputCollector();
-
- bolt.prepare(conf, TestUtil.getMockedTopologyContext(), new OutputCollector(output));
- }
-
- @After
- public void close() {
- LOG.info("Closing updater bolt and ES container");
- bolt.cleanup();
- container.close();
- output = null;
- try {
- client.close();
- } catch (IOException e) {
- }
- }
-
- private Future store(String url, Status status, Metadata metadata) {
- Tuple tuple = mock(Tuple.class);
- when(tuple.getValueByField("status")).thenReturn(status);
- when(tuple.getStringByField("url")).thenReturn(url);
- when(tuple.getValueByField("metadata")).thenReturn(metadata);
- bolt.execute(tuple);
-
- return executorService.submit(
- () -> {
- var outputSize = output.getAckedTuples().size();
- while (outputSize == 0) {
- Thread.sleep(100);
- outputSize = output.getAckedTuples().size();
- }
- return outputSize;
- });
- }
-
- @Test
- // see https://github.com/DigitalPebble/storm-crawler/issues/885
- public void checkListKeyFromES()
- throws IOException, ExecutionException, InterruptedException, TimeoutException {
-
- String url = "https://www.url.net/something";
-
- Metadata md = new Metadata();
-
- md.addValue("someKey", "someValue");
-
- store(url, Status.DISCOVERED, md).get(10, TimeUnit.SECONDS);
-
- assertEquals(1, output.getAckedTuples().size());
-
- // check output in ES?
-
- String id = org.apache.commons.codec.digest.DigestUtils.sha256Hex(url);
-
- GetResponse result = client.get(new GetRequest("status", id), RequestOptions.DEFAULT);
-
- Map sourceAsMap = result.getSourceAsMap();
-
- final String pfield = "metadata.someKey";
- sourceAsMap = (Map) sourceAsMap.get("metadata");
-
- final var pfieldNew = pfield.substring(9);
- Object key = sourceAsMap.get(pfieldNew);
-
- assertTrue(key instanceof java.util.ArrayList);
- }
-}
diff --git a/external/elasticsearch/src/test/resources/status.mapping b/external/elasticsearch/src/test/resources/status.mapping
deleted file mode 100644
index e5b14fe97..000000000
--- a/external/elasticsearch/src/test/resources/status.mapping
+++ /dev/null
@@ -1,39 +0,0 @@
-{
- "settings": {
- "index": {
- "number_of_shards": 10,
- "number_of_replicas": 1,
- "refresh_interval": "5s"
- }
- },
- "mappings": {
- "dynamic_templates": [{
- "metadata": {
- "path_match": "metadata.*",
- "match_mapping_type": "string",
- "mapping": {
- "type": "keyword"
- }
- }
- }],
- "_source": {
- "enabled": true
- },
- "properties": {
- "key": {
- "type": "keyword",
- "index": true
- },
- "nextFetchDate": {
- "type": "date",
- "format": "date_optional_time"
- },
- "status": {
- "type": "keyword"
- },
- "url": {
- "type": "keyword"
- }
- }
- }
-}
diff --git a/pom.xml b/pom.xml
index 28d93538e..7760a709c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -479,7 +479,6 @@ under the License.
core
external
external/aws
- external/elasticsearch
external/langid
external/opensearch
external/solr
@@ -488,7 +487,6 @@ under the License.
external/urlfrontier
external/warc
archetype
- external/elasticsearch/archetype
external/opensearch/archetype