This repository has been archived by the owner on Nov 6, 2023. It is now read-only.

add support for opensearch #238

Merged
merged 3 commits into from
Oct 12, 2023
Changes from 1 commit
152 changes: 152 additions & 0 deletions odd_collector/adapters/opensearch/adapter.py
@@ -0,0 +1,152 @@
from typing import Optional
from urllib.parse import urlparse

from funcy import get_in, get_lax
from odd_collector_sdk.domain.adapter import BaseAdapter
from odd_models.models import DataEntity, DataEntityList
from oddrn_generator import ElasticSearchGenerator, Generator

from odd_collector.domain.plugin import OpensearchPlugin

from .client import Client
from .logger import logger
from .mappers.indices import map_index
from .mappers.stream import map_data_stream
from .mappers.template import TemplateEntity, map_template


class Adapter(BaseAdapter):
config: OpensearchPlugin
generator: ElasticSearchGenerator

def __init__(self, config: OpensearchPlugin) -> None:
super().__init__(config)
self.client = Client(config)

def create_generator(self) -> Generator:
return ElasticSearchGenerator(host_settings=urlparse(self.config.host).netloc)

def get_data_entity_list(self) -> DataEntityList:
return DataEntityList(
data_source_oddrn=self.get_data_source_oddrn(),
items=list(self.get_datasets()),
)

def get_datasets(self) -> list[DataEntity]:
logger.debug(
f"Start collecting datasets from Elasticsearch at {self.config.host} with port {self.config.port}"
)

indices = self.client.get_indices("*")
templates = self.client.get_index_template("*")

mappings = self.client.get_mapping()
data_streams = self.client.get_data_streams()

indices = [
index for index in indices if not index["index"].startswith(".internal")
]
logger.success(f"Got {len(indices)} indices")

index_by_names = {index["index"]: index for index in indices}
templates_by_names = {
tmpl["name"]: tmpl for tmpl in templates if not tmpl["name"].startswith(".")
}
streams_by_names = {stream["name"]: stream for stream in data_streams}
mappings_by_names = dict(mappings.items())

indices_entities: dict[str, DataEntity] = {}
for index_name, index in index_by_names.items():
indices_entities[index_name] = map_index(
index=index,
generator=self.generator,
properties=get_in(
mappings_by_names,
[index_name, "mappings", "properties"],
default={},
),
)

# map templates
template_entities: dict[str, TemplateEntity] = {}
for tmpl_name, tmpl in templates_by_names.items():
data_entity = map_template(tmpl, self.generator)
pattern = tmpl["index_template"]["index_patterns"]

# Here we are trying to get all indices that match the pattern
# to show that current template works with index
# But if we can't get them, we just skip
try:
for index_name in self.client.get_indices(index=pattern, h="index"):
if index_entity := indices_entities.get(index_name["index"]):
data_entity.add_output(index_entity)
except Exception as e:
logger.warning(e)
continue

template_entities[tmpl_name] = data_entity

# map data streams
stream_entities = {}
for stream_name, stream in streams_by_names.items():
stream_data_entity = map_data_stream(stream, self.generator)
stream_entities[stream_name] = stream_data_entity

if template_entity := template_entities.get(stream["template"]):
template_entity.add_input(stream_data_entity)

return [
*indices_entities.values(),
*stream_entities.values(),
*template_entities.values(),
]

# TODO: implement mapping rollover policies
# NOTE: not called from get_datasets yet; `self.client.ilm` below assumes an ILM API
# that the Client wrapper does not expose.
def _get_rollover_policy(self, stream_data: dict) -> Optional[dict]:
try:
backing_indices = [
index_info["index_name"] for index_info in stream_data["indices"]
]
for index in backing_indices:
index_settings = self.client.get_index_settings(index)
lifecycle_policy = get_lax(
index_settings, [index, "settings", "index", "lifecycle"]
)

if lifecycle_policy:
logger.debug(
f"Index {index} has Lifecycle Policy {lifecycle_policy['name']}"
)
lifecycle_policy_data = self.client.ilm.get_lifecycle(
name=lifecycle_policy["name"]
)

logger.debug(f"Lifecycle policy metadata {lifecycle_policy_data}")

rollover = get_lax(
lifecycle_policy_data,
[
lifecycle_policy["name"],
"policy",
"phases",
"hot",
"actions",
"rollover",
],
)

if rollover is not None:
max_size = rollover.get("max_size")
max_age = rollover.get("max_age")
else:
max_size = None
max_age = None

lifecycle_metadata = {"max_age": max_age, "max_size": max_size}
return lifecycle_metadata
else:
logger.debug(f"No lifecycle policy exists for this index {index}.")
return None
except KeyError:
logger.debug(f"Incorrect fields. Got fields: {stream_data}")
return None
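
For reviewers who want to try the adapter outside the collector runtime, a minimal smoke-test sketch follows. The `OpensearchPlugin` field names and required base fields (`type`, `name`) are assumptions inferred from `client.py` in this PR and from how other odd_collector plugins are configured, not something this diff defines.

from pydantic import SecretStr

from odd_collector.adapters.opensearch.adapter import Adapter
from odd_collector.domain.plugin import OpensearchPlugin

# Hypothetical local config; every value below is a placeholder.
config = OpensearchPlugin(
    type="opensearch",
    name="local_opensearch",
    host="http://localhost",
    port=9200,
    username="admin",
    password=SecretStr("admin"),
    use_ssl=False,
    verify_certs=False,
    ca_certs=None,
)

adapter = Adapter(config)
entities = adapter.get_data_entity_list()
print(f"Collected {len(entities.items)} entities from {entities.data_source_oddrn}")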
45 changes: 45 additions & 0 deletions odd_collector/adapters/opensearch/client.py
@@ -0,0 +1,45 @@
from typing import Optional

from opensearchpy import OpenSearch

from odd_collector.domain.plugin import OpensearchPlugin


class Client:
def __init__(self, config: OpensearchPlugin):
username = config.username
password = config.password.get_secret_value() if config.password else None

if username and password:
self._os = OpenSearch(
hosts=[f"{config.host}:{config.port}"],
basic_auth=(username, password),
verify_certs=config.verify_certs,
ca_certs=config.ca_certs,
use_ssl=config.use_ssl,
)
else:
# No credentials configured: connect without authentication
self._os = OpenSearch(
hosts=[f"{config.host}:{config.port}"],
verify_certs=config.verify_certs,
ca_certs=config.ca_certs,
use_ssl=config.use_ssl,
)
assert self._os.ping()

def get_indices(self, index: Optional[str] = None, h=None) -> list:
return self._os.cat.indices(format="json", index=index, h=h)

def get_mapping(self, index_name: Optional[str] = None) -> dict:
return self._os.indices.get_mapping(index=index_name)

def get_index_settings(self, index_name: str) -> dict:
return self._os.indices.get_settings(index=index_name)

def get_data_streams(self, name: Optional[str] = None) -> dict:
response = self._os.indices.get_data_stream(name=name)
return response["data_streams"]

def get_index_template(self, template_name: str) -> list[dict]:
return self._os.indices.get_index_template(name=template_name).get(
"index_templates"
)
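
To make the wrapper's return shapes easier to review, here is a short exploratory sketch; it reuses the hypothetical `config` from the adapter example above, and the commented shapes only show the keys that `adapter.py` actually reads.

from odd_collector.adapters.opensearch.client import Client

client = Client(config)

indices = client.get_indices("*")
# e.g. [{"index": "logs-2023.10.01", "health": "green", "docs.count": "42", ...}]

templates = client.get_index_template("*")
# e.g. [{"name": "logs-template", "index_template": {"index_patterns": ["logs-*"], ...}}]

streams = client.get_data_streams()
# e.g. [{"name": "logs", "template": "logs-template",
#        "indices": [{"index_name": ".ds-logs-000001"}], ...}]

mappings = client.get_mapping()
# e.g. {"logs-2023.10.01": {"mappings": {"properties": {"message": {"type": "text"}}}}}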
3 changes: 3 additions & 0 deletions odd_collector/adapters/opensearch/logger.py
@@ -0,0 +1,3 @@
from odd_collector_sdk.logger import logger

logger = logger
83 changes: 83 additions & 0 deletions odd_collector/adapters/opensearch/mappers/fields.py
@@ -0,0 +1,83 @@
from typing import Any, Dict

from odd_models.models import DataSetField, DataSetFieldType, Type
from oddrn_generator import ElasticSearchGenerator

# Type mapping carried over from the Elasticsearch adapter.
# As of Elasticsearch 7.x, the supported field types are listed here:
# https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html#
TYPES_ELASTIC_TO_ODD = {
"blob": Type.TYPE_STRING,
"boolean": Type.TYPE_BOOLEAN,
"constant_keyword": Type.TYPE_STRING,
"date": Type.TYPE_DATETIME,
"date_nanos": Type.TYPE_INTEGER,
"double": Type.TYPE_NUMBER,
"float": Type.TYPE_NUMBER,
"geo_point": Type.TYPE_MAP,
"flattened": Type.TYPE_MAP,
"half_float": Type.TYPE_NUMBER,
"integer": Type.TYPE_INTEGER,
"ip": Type.TYPE_STRING,
"keyword": Type.TYPE_STRING,
"long": Type.TYPE_INTEGER,
"nested": Type.TYPE_LIST,
"object": Type.TYPE_MAP,
"text": Type.TYPE_STRING,
"wildcard": Type.TYPE_STRING,
}


def is_logical(type_property: str) -> bool:
return type_property == "boolean"


def __get_field_type(props: Dict[str, Any]) -> str:
"""
Sample mapping for field types
{'@timestamp' : {'type' : "alias","path" : "timestamp"},
'timestamp" : {"type" : "date"},
'bool_var': {'type': 'boolean'},
'data_stream': {'properties': {'dataset': {'type': 'constant_keyword'},
'namespace': {'type': 'constant_keyword'},
'type': {'type': 'constant_keyword',
'value': 'logs'}}},
'event1': {'properties': {'dataset': {'ignore_above': 1024, 'type': 'keyword'}}},
'event2': {'properties': {'dataset': {'ignore_above': 1024, 'type': 'constant_keyword'}}},
'event3': {'properties': {'dataset': {'ignore_above': 1024, 'type': 'wildcard'}}},
'host': {'type': 'object'},
'int_field': {'type': 'long'},
'float_field': {'type': 'float'},
"""
if "type" in props:
return props["type"]
elif "properties" in props:
return "object"
else:
return "unknown"


def map_field(
field_name: str,
field_metadata: dict,
oddrn_generator: ElasticSearchGenerator,
path: str,
) -> DataSetField:
data_type: str = __get_field_type(field_metadata)
oddrn_path: str = oddrn_generator.get_oddrn_by_path(path, field_name)
field_type = TYPES_ELASTIC_TO_ODD.get(data_type, Type.TYPE_UNKNOWN)

dsf: DataSetField = DataSetField(
oddrn=oddrn_path,
name=field_name,
metadata=[],
type=DataSetFieldType(
type=field_type,
logical_type=data_type,
is_nullable=True,
),
default_value=None,
description=None,
owner=None,
)

return dsf
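
A small, self-contained illustration of how a single mapping property is translated; the generator setup mirrors what `indices.py` below does before calling `map_field`, and the concrete field values are made up.

from odd_models.models import Type
from oddrn_generator import ElasticSearchGenerator

from odd_collector.adapters.opensearch.mappers.fields import map_field

generator = ElasticSearchGenerator(host_settings="localhost:9200")
generator.set_oddrn_paths(indices="logs-2023.10.01")

# A keyword property is expected to map to TYPE_STRING, keeping the raw type as logical_type.
field = map_field(
    field_name="event.dataset",
    field_metadata={"type": "keyword", "ignore_above": 1024},
    oddrn_generator=generator,
    path="indices_fields",
)
assert field.type.type == Type.TYPE_STRING
assert field.type.logical_type == "keyword"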
30 changes: 30 additions & 0 deletions odd_collector/adapters/opensearch/mappers/indices.py
@@ -0,0 +1,30 @@
from odd_models.models import DataEntity, DataEntityType, DataSet
from oddrn_generator import ElasticSearchGenerator

from .fields import map_field
from .metadata import extract_index_metadata


def map_index(
index: dict,
properties: dict,
generator: ElasticSearchGenerator,
) -> DataEntity:
generator.set_oddrn_paths(indices=index["index"])
index_oddrn = generator.get_oddrn_by_path("indices")

# field type with `@` prefix defines alias for another field in same index
field_list = [
map_field(name, value, generator, "indices_fields")
for name, value in properties.items()
if not name.startswith("@")
]

return DataEntity(
oddrn=index_oddrn,
name=index["index"],
owner=None,
type=DataEntityType.TABLE,
metadata=[extract_index_metadata(index)],
dataset=DataSet(parent_oddrn=None, rows_number=0, field_list=field_list),
)
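
A sketch of the inputs `map_index` expects: one row from the cat indices API plus the `properties` subtree of that index's mapping. Only the keys used by the mapper are shown; the values are illustrative.

from oddrn_generator import ElasticSearchGenerator

from odd_collector.adapters.opensearch.mappers.indices import map_index

sample_index = {"index": "logs-2023.10.01", "health": "green", "docs.count": "42"}
sample_properties = {
    "timestamp": {"type": "date"},
    "message": {"type": "text"},
    "@timestamp": {"type": "alias", "path": "timestamp"},  # skipped: name starts with "@"
}

generator = ElasticSearchGenerator(host_settings="localhost:9200")
entity = map_index(index=sample_index, properties=sample_properties, generator=generator)
assert len(entity.dataset.field_list) == 2  # the alias field is filtered out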
44 changes: 44 additions & 0 deletions odd_collector/adapters/opensearch/mappers/metadata.py
@@ -0,0 +1,44 @@
import json
from dataclasses import dataclass
from typing import Any

from funcy import walk_values
from odd_collector_sdk.utils.metadata import (
DefinitionType,
HasMetadata,
MetadataExtension,
extract_metadata,
)

from ..logger import logger


@dataclass
class MetadataWrapper(HasMetadata):
odd_metadata: dict[str, Any]


def extract_index_metadata(data: dict[str, Any]) -> MetadataExtension:
meta_wrapper = MetadataWrapper(odd_metadata=data)
return extract_metadata("elasticsearch", meta_wrapper, DefinitionType.DATASET)


def extract_template_metadata(data: dict[str, Any]) -> MetadataExtension:
metadata = data

try:
metadata = walk_values(json.dumps, metadata)
except Exception as e:
logger.warning(f"Can't convert template metadata to json. {str(e)}")
logger.debug(f"Template metadata: {data!r}")

meta_wrapper = MetadataWrapper(odd_metadata=metadata)

return extract_metadata("elasticsearch", meta_wrapper, DefinitionType.DATASET)


def extract_data_stream_metadata(data: dict[str, Any]) -> MetadataExtension:
meta_wrapper = MetadataWrapper(odd_metadata=data)
return extract_metadata(
"elasticsearch", meta_wrapper, DefinitionType.DATASET, flatten=True
)
22 changes: 22 additions & 0 deletions odd_collector/adapters/opensearch/mappers/stream.py
@@ -0,0 +1,22 @@
from odd_models.models import DataEntity, DataEntityType, DataSet
from oddrn_generator import ElasticSearchGenerator

from .metadata import extract_data_stream_metadata


def map_data_stream(
stream_data: dict,
generator: ElasticSearchGenerator,
) -> DataEntity:
generator.set_oddrn_paths(streams=stream_data["name"])
stream_oddrn = generator.get_oddrn_by_path("streams")

return DataEntity(
oddrn=stream_oddrn,
name=stream_data["name"],
owner=None,
# TODO: Change to stream type
type=DataEntityType.FILE,
metadata=[extract_data_stream_metadata(stream_data)],
dataset=DataSet(parent_oddrn=None, rows_number=0, field_list=[]),
)
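
Finally, a trimmed example of the data stream document this mapper receives, limited to the keys read in this PR (`name`, `template`, `indices`); the values are placeholders.

from oddrn_generator import ElasticSearchGenerator

from odd_collector.adapters.opensearch.mappers.stream import map_data_stream

sample_stream = {
    "name": "logs-app",
    "template": "logs-template",
    "indices": [{"index_name": ".ds-logs-app-000001"}],
}

generator = ElasticSearchGenerator(host_settings="localhost:9200")
entity = map_data_stream(sample_stream, generator)
assert entity.name == "logs-app"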