Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added replace mode #15

Merged
merged 1 commit into from
Jan 22, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

## Configuration

- **mode**: "insert" or "replace". See "Modes" below. (string, optional, default is "insert")
- **nodes**: list of nodes. nodes are pairs of host and port (list, required)
- **cluster_name**: name of the cluster (string, default is "elasticsearch")
- **index**: index name (string, required)
Expand All @@ -18,11 +19,38 @@
- **bulk_size**: Sets when to flush a new bulk request based on the size of actions currently added. (long, default is 5242880)
- **concurrent_requests**: concurrent_requests (int, default is 5)

### Modes

#### insert:

default.
This mode writes data to an existing index.

#### replace:

1. Create new temporary index
2. Insert data into the new index
3. Replace the alias with the new index. If the alias doesn't exist, the plugin will create a new alias.
4. Delete the existing (old) index if it exists

An index with the same name as the alias must not exist.

```yaml
out:
type: elasticsearch
mode: replace
nodes:
- {host: localhost, port: 9300}
index: <alias name> # plugin generates index name like <index>_%Y%m%d-%H%M%S
index_type: <index type>
```
## Example
```yaml
out:
type: elasticsearch
mode: insert
nodes:
- {host: localhost, port: 9300}
index: <index name>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
package org.embulk.output.elasticsearch;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.net.InetAddress;
import java.net.UnknownHostException;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
Expand All @@ -13,17 +21,24 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.InvalidAliasNameException;

import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigException;
import org.embulk.config.ConfigSource;
import org.embulk.config.Task;
import org.embulk.config.TaskReport;
Expand All @@ -36,12 +51,14 @@
import org.embulk.spi.PageReader;
import org.embulk.spi.Schema;
import org.embulk.spi.TransactionalPageOutput;
import org.embulk.spi.time.Timestamp;
import org.embulk.spi.type.Types;
import org.slf4j.Logger;

import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

public class ElasticsearchOutputPlugin
implements OutputPlugin
Expand All @@ -60,6 +77,10 @@ public interface NodeAddressTask
public interface PluginTask
extends Task
{
@Config("mode")
@ConfigDefault("\"insert\"")
public Mode getMode();

@Config("nodes")
public List<NodeAddressTask> getNodes();

Expand All @@ -69,6 +90,12 @@ public interface PluginTask

@Config("index")
public String getIndex();
public void setIndex(String indexName);

@Config("alias")
@ConfigDefault("null")
public Optional<String> getAlias();
public void setAlias(Optional<String> aliasName);

@Config("index_type")
public String getType();
Expand Down Expand Up @@ -106,9 +133,15 @@ public ConfigDiff transaction(ConfigSource config, Schema schema,

// confirm that a client can be initialized
try (Client client = createClient(task)) {
}

try {
log.info(String.format("Executing plugin with '%s' mode.", task.getMode()));
if (task.getMode().equals(Mode.REPLACE)) {
task.setAlias(Optional.of(task.getIndex()));
task.setIndex(generateNewIndexName(task.getIndex()));
if (isExistsIndex(task.getAlias().orNull(), client) && !isAlias(task.getAlias().orNull(), client)) {
throw new ConfigException(String.format("Invalid alias name [%s], an index exists with the same name as the alias", task.getAlias().orNull()));
}
}
log.info(String.format("Inserting data into index[%s]", task.getIndex()));
control.run(task.dump());
} catch (Exception e) {
throw Throwables.propagate(e);
Expand All @@ -131,7 +164,16 @@ public ConfigDiff resume(TaskSource taskSource,
public void cleanup(TaskSource taskSource,
Schema schema, int processorCount,
List<TaskReport> successTaskReports)
{ }
{
final PluginTask task = taskSource.loadTask(PluginTask.class);
try (Client client = createClient(task)) {
reAssignAlias(task.getAlias().orNull(), task.getIndex(), client);
} catch (IndexNotFoundException | InvalidAliasNameException e) {
throw new ConfigException(e);
} catch (Exception e) {
throw Throwables.propagate(e);
}
}

private Client createClient(final PluginTask task)
{
Expand Down Expand Up @@ -414,4 +456,91 @@ public TaskReport commit()
}

}

/**
 * Output mode of the plugin.
 *
 * The JSON/config representation of each constant is its lower-case name
 * ({@code "insert"}, {@code "replace"}).
 */
public enum Mode
{
    INSERT,
    REPLACE;

    /**
     * Returns the lower-case constant name; also used as the JSON value.
     */
    @JsonValue
    @Override
    public String toString()
    {
        return name().toLowerCase(Locale.ENGLISH);
    }

    /**
     * Parses a configured mode string.
     *
     * @param value the configured mode, expected to be {@code "insert"} or {@code "replace"}
     * @return the matching constant
     * @throws ConfigException if {@code value} is null or not a supported mode
     */
    @JsonCreator
    public static Mode fromString(String value)
    {
        // A switch on a null String throws NullPointerException; report it as a
        // configuration problem like any other unsupported value.
        if (value != null) {
            switch (value) {
                case "insert":
                    return INSERT;
                case "replace":
                    return REPLACE;
                default:
                    break;
            }
        }
        throw new ConfigException(String.format("Unknown mode '%s'. Supported modes are insert, replace", value));
    }
}

/**
 * Points {@code aliasName} at {@code newIndexName}.
 *
 * If no other index currently carries the alias, the alias is simply added to
 * the new index. Otherwise the alias is removed from the other indices and
 * added to the new index in a single aliases request, and those other indices
 * are then deleted.
 *
 * The new index itself is never treated as an "old" index: if the alias already
 * points at it (for example when this step is executed a second time), it must
 * not be un-aliased and deleted.
 *
 * @param aliasName    alias to (re)assign
 * @param newIndexName index that should carry the alias afterwards
 * @param client       Elasticsearch client used for all requests
 */
private void reAssignAlias(String aliasName, String newIndexName, Client client)
        throws IndexNotFoundException, InvalidAliasNameException
{
    // Collect every index that holds the alias today, except the new index.
    List<String> oldIndices = new ArrayList<>();
    if (isExistsAlias(aliasName, client)) {
        for (String index : getIndexByAlias(aliasName, client)) {
            if (!index.equals(newIndexName)) {
                oldIndices.add(index);
            }
        }
    }

    if (oldIndices.isEmpty()) {
        client.admin().indices().prepareAliases()
                .addAlias(newIndexName, aliasName)
                .execute().actionGet();
        log.info(String.format("Assigned alias[%s] to index[%s]", aliasName, newIndexName));
        return;
    }

    // Remove and add in one request so the alias swap happens together.
    client.admin().indices().prepareAliases()
            .removeAlias(oldIndices.toArray(new String[oldIndices.size()]), aliasName)
            .addAlias(newIndexName, aliasName)
            .execute().actionGet();
    log.info(String.format("Reassigned alias[%s] from index%s to index[%s]", aliasName, oldIndices, newIndexName));

    for (String index : oldIndices) {
        deleteIndex(index, client);
    }
}

/**
 * Deletes the given index and logs the deletion.
 *
 * @param indexName name of the index to delete
 * @param client    Elasticsearch client used to issue the request
 */
private void deleteIndex(String indexName, Client client)
{
    DeleteIndexRequest request = new DeleteIndexRequest(indexName);
    client.admin()
            .indices()
            .delete(request)
            .actionGet();
    log.info(String.format("Deleted Index [%s]", indexName));
}

/**
 * Looks up the aliases matching {@code aliasName} and returns the keys of the
 * resulting map, which the callers use as the names of the indices carrying
 * the alias.
 *
 * @param aliasName alias to look up
 * @param client    Elasticsearch client used to issue the request
 * @return a new mutable list of the map keys; empty when nothing matched
 */
private List<String> getIndexByAlias(String aliasName, Client client)
{
    GetAliasesRequest request = new GetAliasesRequest(aliasName);
    ImmutableOpenMap<String, List<AliasMetaData>> aliasesByIndex =
            client.admin().indices().getAliases(request).actionGet().getAliases();

    List<String> indexNames = new ArrayList<>();
    for (ObjectObjectCursor<String, List<AliasMetaData>> entry : aliasesByIndex) {
        indexNames.add(entry.key);
    }
    return indexNames;
}

/**
 * Fetches the cluster state and reports whether its metadata has the alias.
 *
 * @param aliasName alias name to check
 * @param client    Elasticsearch client used to fetch the cluster state
 * @return the result of {@code hasAlias(aliasName)} on the cluster metadata
 */
private boolean isExistsAlias(String aliasName, Client client)
{
    ClusterStateRequest stateRequest = new ClusterStateRequest();
    return client.admin().cluster()
            .state(stateRequest)
            .actionGet()
            .getState()
            .getMetaData()
            .hasAlias(aliasName);
}

/**
 * Fetches the cluster state and reports whether its metadata has the index.
 *
 * @param indexName index name to check
 * @param client    Elasticsearch client used to fetch the cluster state
 * @return the result of {@code hasIndex(indexName)} on the cluster metadata
 */
private boolean isExistsIndex(String indexName, Client client)
{
    ClusterStateRequest stateRequest = new ClusterStateRequest();
    return client.admin().cluster()
            .state(stateRequest)
            .actionGet()
            .getState()
            .getMetaData()
            .hasIndex(indexName);
}

/**
 * Looks the name up in the cluster metadata's alias-and-index lookup and
 * reports whether the entry found there is an alias.
 *
 * @param aliasName name to look up
 * @param client    Elasticsearch client used to fetch the cluster state
 * @return {@code true} only if an entry exists and it is an alias
 */
private boolean isAlias(String aliasName, Client client)
{
    AliasOrIndex entry = client.admin().cluster()
            .state(new ClusterStateRequest())
            .actionGet()
            .getState()
            .getMetaData()
            .getAliasAndIndexLookup()
            .get(aliasName);
    if (entry == null) {
        return false;
    }
    return entry.isAlias();
}

/**
 * Builds the name of the temporary index used in replace mode by appending a
 * timestamp suffix of the form {@code _yyyyMMdd-HHmmss} to {@code indexName}.
 *
 * The timestamp is the transaction time from {@code Exec.getTransactionTime()},
 * formatted in the JVM default time zone.
 *
 * @param indexName base name (the alias name in replace mode)
 * @return {@code indexName} followed by the formatted transaction time
 */
public String generateNewIndexName(String indexName)
{
    Timestamp time = Exec.getTransactionTime();
    // Pin the locale so the suffix does not depend on the JVM default locale
    // (some locales format numbers with non-ASCII digits).
    SimpleDateFormat suffixFormat = new SimpleDateFormat("_yyyyMMdd-HHmmss", Locale.ENGLISH);
    return indexName + suffixFormat.format(new Date(time.toEpochMilli()));
}
}