Skip to content

Commit

Permalink
Merge pull request #15 from muga/add-replace-mode
Browse files Browse the repository at this point in the history
Added replace mode
  • Loading branch information
sakama committed Jan 22, 2016
2 parents abc269a + fdea5c4 commit bfcd9a6
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 6 deletions.
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

## Configuration

- **mode**: "insert" or "replace". See "Modes" below. (string, optional, default is "insert")
- **nodes**: list of nodes. nodes are pairs of host and port (list, required)
- **cluster_name**: name of the cluster (string, default is "elasticsearch")
- **index**: index name (string, required)
Expand All @@ -18,11 +19,38 @@
- **bulk_size**: Sets when to flush a new bulk request based on the size of actions currently added. (long, default is 5242880)
- **concurrent_requests**: concurrent_requests (int, default is 5)

### Modes

#### insert:

default.
This mode writes data to an existing index.

#### replace:

1. Create a new temporary index.
2. Insert data into the new index.
3. Point the alias at the new index. If the alias doesn't exist, the plugin creates it.
4. Delete the existing (old) index, if any.

An index with the same name as the alias must not already exist.

```yaml
out:
type: elasticsearch
mode: replace
nodes:
- {host: localhost, port: 9300}
index: <alias name> # plugin generates index name like <index>_%Y%m%d-%H%M%S
index_type: <index type>
```
## Example
```yaml
out:
type: elasticsearch
mode: insert
nodes:
- {host: localhost, port: 9300}
index: <index name>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
package org.embulk.output.elasticsearch;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.net.InetAddress;
import java.net.UnknownHostException;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
Expand All @@ -13,17 +21,24 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.InvalidAliasNameException;

import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigException;
import org.embulk.config.ConfigSource;
import org.embulk.config.Task;
import org.embulk.config.TaskReport;
Expand All @@ -36,12 +51,14 @@
import org.embulk.spi.PageReader;
import org.embulk.spi.Schema;
import org.embulk.spi.TransactionalPageOutput;
import org.embulk.spi.time.Timestamp;
import org.embulk.spi.type.Types;
import org.slf4j.Logger;

import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

public class ElasticsearchOutputPlugin
implements OutputPlugin
Expand All @@ -60,6 +77,10 @@ public interface NodeAddressTask
public interface PluginTask
extends Task
{
@Config("mode")
@ConfigDefault("\"insert\"")
public Mode getMode();

@Config("nodes")
public List<NodeAddressTask> getNodes();

Expand All @@ -69,6 +90,12 @@ public interface PluginTask

@Config("index")
public String getIndex();
public void setIndex(String indexName);

@Config("alias")
@ConfigDefault("null")
public Optional<String> getAlias();
public void setAlias(Optional<String> aliasName);

@Config("index_type")
public String getType();
Expand Down Expand Up @@ -106,9 +133,15 @@ public ConfigDiff transaction(ConfigSource config, Schema schema,

// confirm that a client can be initialized
try (Client client = createClient(task)) {
}

try {
log.info(String.format("Executing plugin with '%s' mode.", task.getMode()));
if (task.getMode().equals(Mode.REPLACE)) {
task.setAlias(Optional.of(task.getIndex()));
task.setIndex(generateNewIndexName(task.getIndex()));
if (isExistsIndex(task.getAlias().orNull(), client) && !isAlias(task.getAlias().orNull(), client)) {
throw new ConfigException(String.format("Invalid alias name [%s], an index exists with the same name as the alias", task.getAlias().orNull()));
}
}
log.info(String.format("Inserting data into index[%s]", task.getIndex()));
control.run(task.dump());
} catch (Exception e) {
throw Throwables.propagate(e);
Expand All @@ -131,7 +164,16 @@ public ConfigDiff resume(TaskSource taskSource,
public void cleanup(TaskSource taskSource,
Schema schema, int processorCount,
List<TaskReport> successTaskReports)
{ }
{
final PluginTask task = taskSource.loadTask(PluginTask.class);
try (Client client = createClient(task)) {
reAssignAlias(task.getAlias().orNull(), task.getIndex(), client);
} catch (IndexNotFoundException | InvalidAliasNameException e) {
throw new ConfigException(e);
} catch (Exception e) {
throw Throwables.propagate(e);
}
}

private Client createClient(final PluginTask task)
{
Expand Down Expand Up @@ -414,4 +456,91 @@ public TaskReport commit()
}

}

/**
 * Write strategy of the plugin, selected by the {@code mode} config option.
 *
 * <p>The JSON/config representation of each constant is its lower-cased name
 * ({@code "insert"}, {@code "replace"}).
 */
public enum Mode
{
    INSERT,
    REPLACE;

    /**
     * Returns the config representation of this mode: the constant name in lower case.
     */
    @JsonValue
    @Override
    public String toString()
    {
        return name().toLowerCase(Locale.ENGLISH);
    }

    /**
     * Parses the config representation produced by {@link #toString()}.
     *
     * @param value mode name as written in the configuration
     * @return the matching mode
     * @throws ConfigException if {@code value} is null or not a supported mode
     */
    @JsonCreator
    public static Mode fromString(String value)
    {
        // Comparing against toString() keeps parsing in sync with serialization and,
        // unlike a switch on the string, treats null as an unknown mode rather than an NPE.
        for (Mode mode : values()) {
            if (mode.toString().equals(value)) {
                return mode;
            }
        }
        throw new ConfigException(String.format("Unknown mode '%s'. Supported modes are insert, replace", value));
    }
}

/**
 * Makes {@code aliasName} point at {@code newIndexName} and deletes the indices
 * the alias referred to before.
 *
 * <p>If the alias does not exist yet, or already refers only to
 * {@code newIndexName}, the alias is simply added and no index is deleted.
 * Otherwise the alias is removed from the previous indices and added to the new
 * one in a single aliases request, after which the previous indices are deleted.
 *
 * @param aliasName alias to (re)assign
 * @param newIndexName index the alias must point at afterwards
 * @param client Elasticsearch client used for all admin calls
 * @throws IndexNotFoundException declared for callers; raised by the Elasticsearch client
 * @throws InvalidAliasNameException declared for callers; raised by the Elasticsearch client
 */
private void reAssignAlias(String aliasName, String newIndexName, Client client)
        throws IndexNotFoundException, InvalidAliasNameException
{
    List<String> oldIndices = new ArrayList<>();
    if (isExistsAlias(aliasName, client)) {
        oldIndices.addAll(getIndexByAlias(aliasName, client));
        // The new index must never be counted as "old": if the alias already points
        // at it (e.g. this step is executed again), deleting it would drop the data
        // that was just written.
        oldIndices.remove(newIndexName);
    }

    if (oldIndices.isEmpty()) {
        client.admin().indices().prepareAliases()
                .addAlias(newIndexName, aliasName)
                .execute().actionGet();
        log.info(String.format("Assigned alias[%s] to index[%s]", aliasName, newIndexName));
        return;
    }

    // Remove and add in one request so the alias is switched in a single step.
    client.admin().indices().prepareAliases()
            .removeAlias(oldIndices.toArray(new String[oldIndices.size()]), aliasName)
            .addAlias(newIndexName, aliasName)
            .execute().actionGet();
    log.info(String.format("Reassigned alias[%s] from index%s to index[%s]", aliasName, oldIndices, newIndexName));
    for (String index : oldIndices) {
        deleteIndex(index, client);
    }
}

/**
 * Deletes the index {@code indexName} and logs the deletion.
 *
 * @param indexName name of the index to delete
 * @param client Elasticsearch client used to issue the request
 */
private void deleteIndex(String indexName, Client client)
{
    final DeleteIndexRequest request = new DeleteIndexRequest(indexName);
    client.admin()
            .indices()
            .delete(request)
            .actionGet();

    final String message = String.format("Deleted Index [%s]", indexName);
    log.info(message);
}

/**
 * Collects the keys of the get-aliases response for {@code aliasName}.
 *
 * @param aliasName alias to look up
 * @param client Elasticsearch client used to issue the request
 * @return a new mutable list holding every key of the response's alias map
 */
private List<String> getIndexByAlias(String aliasName, Client client)
{
    final GetAliasesRequest request = new GetAliasesRequest(aliasName);
    final ImmutableOpenMap<String, List<AliasMetaData>> aliasesByIndex =
            client.admin().indices().getAliases(request).actionGet().getAliases();

    final List<String> indexNames = new ArrayList<>();
    for (ObjectObjectCursor<String, List<AliasMetaData>> entry : aliasesByIndex) {
        indexNames.add(entry.key);
    }
    return indexNames;
}

/**
 * Checks the current cluster state metadata for an alias named {@code aliasName}.
 *
 * @param aliasName alias name to look for
 * @param client Elasticsearch client used to fetch the cluster state
 * @return the result of {@code hasAlias(aliasName)} on the cluster metadata
 */
private boolean isExistsAlias(String aliasName, Client client)
{
    final ClusterStateRequest stateRequest = new ClusterStateRequest();
    final boolean aliasKnown = client.admin()
            .cluster()
            .state(stateRequest)
            .actionGet()
            .getState()
            .getMetaData()
            .hasAlias(aliasName);
    return aliasKnown;
}

/**
 * Checks the current cluster state metadata for an index named {@code indexName}.
 *
 * @param indexName index name to look for
 * @param client Elasticsearch client used to fetch the cluster state
 * @return the result of {@code hasIndex(indexName)} on the cluster metadata
 */
private boolean isExistsIndex(String indexName, Client client)
{
    final ClusterStateRequest stateRequest = new ClusterStateRequest();
    final boolean indexKnown = client.admin()
            .cluster()
            .state(stateRequest)
            .actionGet()
            .getState()
            .getMetaData()
            .hasIndex(indexName);
    return indexKnown;
}

/**
 * Tells whether {@code aliasName} resolves to an alias in the cluster's
 * alias-and-index lookup.
 *
 * @param aliasName name to resolve
 * @param client Elasticsearch client used to fetch the cluster state
 * @return {@code true} only when the lookup has an entry for the name and that
 *         entry reports itself as an alias; {@code false} when there is no entry
 */
private boolean isAlias(String aliasName, Client client)
{
    final ClusterStateRequest stateRequest = new ClusterStateRequest();
    final AliasOrIndex entry = client.admin()
            .cluster()
            .state(stateRequest)
            .actionGet()
            .getState()
            .getMetaData()
            .getAliasAndIndexLookup()
            .get(aliasName);

    if (entry == null) {
        return false;
    }
    return entry.isAlias();
}

/**
 * Builds an index name by appending the transaction time, formatted as
 * {@code _yyyyMMdd-HHmmss}, to {@code indexName}.
 *
 * @param indexName base name to extend
 * @return {@code indexName} followed by the formatted transaction time
 */
public String generateNewIndexName(String indexName)
{
    final long transactionMillis = Exec.getTransactionTime().toEpochMilli();
    final SimpleDateFormat suffixFormat = new SimpleDateFormat("_yyyyMMdd-HHmmss");
    final String suffix = suffixFormat.format(new Date(transactionMillis));
    return indexName + suffix;
}
}

0 comments on commit bfcd9a6

Please sign in to comment.