Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Enable Support for Querying UDF through Logical Type #441

Closed
wants to merge 33 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
4b00f5a
Initial Commit: add support for resolving UDF using type
adarsh2397 Oct 20, 2022
f529015
fix: a typo in operators.py
adarsh2397 Oct 21, 2022
be3ed5e
changes: remove udf_types enum and use existing type from catalog
adarsh2397 Oct 21, 2022
90a486f
fix: formatting of python files
adarsh2397 Oct 21, 2022
fabf059
checkpoint commit: modified binder and catalog manager
adarsh2397 Nov 11, 2022
e75c8c5
(add): shift resolving UDF by logcal type to planner
adarsh2397 Nov 11, 2022
4c09345
add: Convert Logical to Physical UDF in LogicalGetToSeqScan
adarsh2397 Nov 14, 2022
a2a4b27
fix: added condition to check if _function is None before fetching udf
adarsh2397 Nov 15, 2022
0802dbd
change: util to check if FunctionExpression in expression_utils
adarsh2397 Nov 17, 2022
73adfc2
change: using expressionUtils to check and cast to AbstractExpression
adarsh2397 Nov 17, 2022
1d8784d
change: add `function_type` parameter to FunctionExpression
adarsh2397 Nov 17, 2022
f561b2b
change: check if `function_type` is set before fetching UDF by type
adarsh2397 Nov 17, 2022
cfda845
change: ran formatter.py
adarsh2397 Nov 17, 2022
5b69688
fix: formatting issues
adarsh2397 Nov 17, 2022
b536921
fix: small formatting errors
adarsh2397 Nov 17, 2022
73b6a7a
fix: formatting errors in rules.py
adarsh2397 Nov 17, 2022
92afa85
fix: add check if target_list is None
adarsh2397 Nov 17, 2022
aee96dc
add: utility function to resolve alias in expression_utils.py
adarsh2397 Dec 8, 2022
d5ca9df
add: resolve alias only if UDF has been resolved in statement binder
adarsh2397 Dec 8, 2022
1b2380c
fix: formatting
adarsh2397 Dec 8, 2022
c502cd8
add: simple test case for logical udf select query
adarsh2397 Dec 8, 2022
ae4b05b
add: testcase for get_udf_by_type
adarsh2397 Dec 8, 2022
6184b02
remove: unused functions
adarsh2397 Dec 8, 2022
6e4f115
add: func_expr setter in LogicalFunctionScan
adarsh2397 Dec 8, 2022
87948ff
add: testcase for lateral join with logical udf
adarsh2397 Dec 8, 2022
b1112bd
remove: unnecessary if condition
adarsh2397 Dec 8, 2022
671a3eb
remove: unnecessary method check_udf_type_exists
adarsh2397 Dec 9, 2022
4b3e7db
minor bugfix
adarsh2397 Dec 9, 2022
5a9c7b5
remove: unnecessary method
adarsh2397 Dec 9, 2022
aaa91ad
add: TODO in create_udf_exector.py
adarsh2397 Dec 9, 2022
214510c
fix: formatting changes
adarsh2397 Dec 9, 2022
7a6997b
bugfix: replace .one() with .first() for get_udf_type
adarsh2397 Dec 9, 2022
95348bb
remove: unnecessary try except
adarsh2397 Dec 9, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 46 additions & 37 deletions eva/binder/statement_binder.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
from eva.catalog.catalog_manager import CatalogManager
from eva.configuration.configuration_manager import ConfigurationManager
from eva.expression.abstract_expression import AbstractExpression
from eva.expression.expression_utils import extract_alias_from_function_expression
from eva.expression.function_expression import FunctionExpression
from eva.expression.tuple_value_expression import TupleValueExpression
from eva.parser.alias import Alias
from eva.parser.create_mat_view_statement import CreateMaterializedViewStatement
from eva.parser.drop_statement import DropTableStatement
from eva.parser.load_statement import LoadDataStatement
Expand Down Expand Up @@ -227,26 +227,44 @@ def _bind_func_expr(self, node: FunctionExpression):
for child in node.children:
self.bind(child)

# First, check if it refers to any specific model in the catalog
udf_obj = self._catalog.get_udf_by_name(node.name)
if udf_obj is None:
err_msg = (
f"UDF with name {node.name} does not exist in the catalog. "
"Please create the UDF using CREATE UDF command."
)
logger.error(err_msg)
raise BinderError(err_msg)

try:
node.function = path_to_class(udf_obj.impl_file_path, udf_obj.name)()
except Exception as e:
err_msg = (
f"{str(e)}. Please verify that the UDF class name in the"
"implementation file matches the UDF name."
)
logger.error(err_msg)
raise BinderError(err_msg)
# If not, we would want to check if it refers to a type of a model instead
if self._catalog.get_udf_by_type(node.name) is None:
err_msg = (
f"UDF with name {node.name} does not exist in the catalog. "
"Please create the UDF using CREATE UDF command."
)
logger.error(err_msg)
raise BinderError(err_msg)
else:
# nothing much to do here since it does not exist
node.function = None
node.function_type = node.name
else:
try:
node.function = path_to_class(udf_obj.impl_file_path, udf_obj.name)()
except Exception as e:
err_msg = (
f"{str(e)}. Please verify that the UDF class name in the"
"implementation file matches the UDF name."
)
logger.error(err_msg)
raise BinderError(err_msg)

output_objs = self._catalog.get_udf_outputs(udf_obj)
# we need to populate output_objs even with logical UDF type
# we'll use the type name to get any udf_output that matches this type
output_objs = None
if udf_obj is None:
# TODO: Change this method later
# 1. Get any UDF ID that matches the logical type
# 2. Use this ID to get the output_objs
# (Assumption: All UDFs with the same logical type will have same outputs)
udf_temp_obj = self._catalog.get_udf_by_type(node.name)
output_objs = self._catalog.get_udf_outputs(udf_temp_obj)
else:
output_objs = self._catalog.get_udf_outputs(udf_obj)
if node.output:
for obj in output_objs:
if obj.name.lower() == node.output:
Expand All @@ -260,23 +278,14 @@ def _bind_func_expr(self, node: FunctionExpression):
node.output_objs = output_objs
node.projection_columns = [obj.name.lower() for obj in output_objs]

default_alias_name = node.name.lower()
default_output_col_aliases = [str(obj.name.lower()) for obj in node.output_objs]
if not node.alias:
node.alias = Alias(default_alias_name, default_output_col_aliases)
else:
if not len(node.alias.col_names):
node.alias = Alias(node.alias.alias_name, default_output_col_aliases)
else:
output_aliases = [
str(col_name.lower()) for col_name in node.alias.col_names
]
node.alias = Alias(node.alias.alias_name, output_aliases)
# resolve Alias only if the UDF has been resolved
if node.function is not None:
node.alias = extract_alias_from_function_expression(node)

if len(node.alias.col_names) != len(node.output_objs):
err_msg = (
f"Expected {len(node.output_objs)} output columns for "
f"{node.alias.alias_name}, got {len(node.alias.col_names)}."
)
logger.error(err_msg)
raise BinderError(err_msg)
if len(node.alias.col_names) != len(node.output_objs):
err_msg = (
f"Expected {len(node.output_objs)} output columns for "
f"{node.alias.alias_name}, got {len(node.alias.col_names)}."
)
logger.error(err_msg)
raise BinderError(err_msg)
12 changes: 12 additions & 0 deletions eva/catalog/catalog_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,18 @@ def get_udf_by_name(self, name: str) -> UdfMetadata:
"""
return self._udf_service.udf_by_name(name)

def get_udf_by_type(self, type: str) -> UdfMetadata:
"""
Get the UDF information based on type.

Arguments:
type (str): type of the UDF

Returns:
UdfMetadata object
"""
return self._udf_service.udf_by_type(type)

def get_udf_inputs(self, udf_obj: UdfMetadata) -> List[UdfIO]:
if not isinstance(udf_obj, UdfMetadata):
raise ValueError(
Expand Down
10 changes: 10 additions & 0 deletions eva/catalog/services/udf_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ def udf_by_name(self, name: str):
except NoResultFound:
return None

def udf_by_type(self, name: str):
"""return the first udf entry that matches the type provided.
None if no such entry found.

Arguments:
name (str): name to be searched
"""

return self.model.query.filter(self.model._type == name).first()

def udf_by_id(self, id: int):
"""return the udf entry that matches the id provided.
None if no such entry found.
Expand Down
7 changes: 7 additions & 0 deletions eva/executor/create_udf_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ def exec(self):
msg = f"UDF {self.node.name} already exists."
logger.error(msg)
raise RuntimeError(msg)

# TODO: enable this once all existing UDFs have a common type, input and output standard
# check catalog if a UDF of the same type already exists
if catalog_manager.get_udf_by_type(self.node.udf_type):
# check if the inputs and outputs of this type match with existing UDF
pass

io_list = []
io_list.extend(self.node.inputs)
io_list.extend(self.node.outputs)
Expand Down
42 changes: 42 additions & 0 deletions eva/expression/expression_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
from eva.expression.abstract_expression import AbstractExpression, ExpressionType
from eva.expression.comparison_expression import ComparisonExpression
from eva.expression.constant_value_expression import ConstantValueExpression
from eva.expression.function_expression import FunctionExpression
from eva.expression.logical_expression import LogicalExpression
from eva.expression.tuple_value_expression import TupleValueExpression
from eva.parser.alias import Alias


def expression_tree_to_conjunction_list(expression_tree):
Expand Down Expand Up @@ -275,3 +277,43 @@ def _has_simple_expressions(expr):
]

return _has_simple_expressions(predicate) and contains_single_column(predicate)


def extract_alias_from_function_expression(expr: FunctionExpression) -> Alias:
"""Returns the Alias property for the Function Expression based on its name and output.

Args:
expr (FunctionExpression): Function Expression to process

Returns:
Alias: the alias property to set to the Function Expression
"""
alias: Alias = None

default_alias_name = expr.name.lower()
default_output_col_aliases = [str(obj.name.lower()) for obj in expr.output_objs]
if not expr.alias:
alias = Alias(default_alias_name, default_output_col_aliases)
else:
if not len(expr.alias.col_names):
alias = Alias(expr.alias.alias_name, default_output_col_aliases)
else:
output_aliases = [
str(col_name.lower()) for col_name in expr.alias.col_names
]
alias = Alias(expr.alias.alias_name, output_aliases)

return alias


def is_function_expression(expr: AbstractExpression) -> bool:
"""Checks if the expr is of type ExpressionType.FUNCTION_EXPRESSION

Args:
expr (AbstractExpression): expression to check

Returns:
bool: True, if it is a FunctionExpression, else False
"""

return expr.etype == ExpressionType.FUNCTION_EXPRESSION
14 changes: 14 additions & 0 deletions eva/expression/function_expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@ def __init__(
name: str,
output: str = None,
alias: Alias = None,
func_type: str = None,
**kwargs
):

super().__init__(ExpressionType.FUNCTION_EXPRESSION, **kwargs)
self._context = Context()
self._name = name
self._function_type = func_type
self._function = func
self._output = output
self.alias = alias
Expand All @@ -73,10 +75,22 @@ def output(self):
def function(self):
return self._function

@property
def function_type(self):
return self._function_type

@name.setter
def name(self, func_name: str):
self._name = func_name

@function.setter
def function(self, func: Callable):
self._function = func

@function_type.setter
def function_type(self, func_type: str):
self._function_type = func_type

def evaluate(self, batch: Batch, **kwargs) -> Batch:
new_batch = batch
child_batches = [child.evaluate(batch, **kwargs) for child in self.children]
Expand Down
6 changes: 5 additions & 1 deletion eva/optimizer/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ class LogicalCreateUDF(Operator):
This file should be placed in the UDF directory and
the path provided should be relative to the UDF dir.
udf_type: str
udf type. it ca be object detection, classification etc.
udf type. it can be object detection, classification etc.
"""

def __init__(
Expand Down Expand Up @@ -863,6 +863,10 @@ def func_expr(self):
def do_unnest(self):
return self._do_unnest

@func_expr.setter
def func_expr(self, expr):
self._func_expr = expr

def __eq__(self, other):
is_subtree_equal = super().__eq__(other)
if not isinstance(other, LogicalFunctionScan):
Expand Down
43 changes: 42 additions & 1 deletion eva/optimizer/rules/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@
from enum import Flag, IntEnum, auto
from typing import TYPE_CHECKING

from eva.expression.expression_utils import conjuction_list_to_expression_tree
from eva.catalog.catalog_manager import CatalogManager
from eva.expression.expression_utils import (
conjuction_list_to_expression_tree,
extract_alias_from_function_expression,
is_function_expression,
)
from eva.expression.function_expression import FunctionExpression
from eva.optimizer.optimizer_utils import (
extract_equi_join_keys,
extract_pushdown_predicate,
Expand All @@ -31,6 +37,7 @@
from eva.planner.predicate_plan import PredicatePlan
from eva.planner.project_plan import ProjectPlan
from eva.planner.show_info_plan import ShowInfoPlan
from eva.utils.generic_utils import path_to_class

if TYPE_CHECKING:
from eva.optimizer.optimizer_context import OptimizerContext
Expand Down Expand Up @@ -673,6 +680,7 @@ def apply(self, before: LogicalUpload, context: OptimizerContext):
class LogicalGetToSeqScan(Rule):
def __init__(self):
pattern = Pattern(OperatorType.LOGICALGET)
self.catalog = CatalogManager()
super().__init__(RuleType.LOGICAL_GET_TO_SEQSCAN, pattern)

def promise(self):
Expand All @@ -692,6 +700,27 @@ def apply(self, before: LogicalGet, context: OptimizerContext):
)
if config_batch_mem_size:
batch_mem_size = config_batch_mem_size

if before.target_list is not None:
for idx, target in enumerate(before.target_list):
if is_function_expression(target):
func_expr: FunctionExpression = target
if (
func_expr.function is None
and func_expr.function_type is not None
):
# TODO: Replace 'get_udf_by_type' with a cost-based selection method
udf_obj = self.catalog.get_udf_by_type(func_expr.function_type)
func_expr.function = path_to_class(
udf_obj.impl_file_path, udf_obj.name
)()
func_expr.name = udf_obj.name
func_expr.alias = extract_alias_from_function_expression(
func_expr
) # resolve the alias

before.target_list[idx] = func_expr

after = SeqScanPlan(None, before.target_list, before.alias)
after.append_child(
StoragePlan(
Expand Down Expand Up @@ -803,6 +832,7 @@ def apply(self, before: LogicalLimit, context: OptimizerContext):
class LogicalFunctionScanToPhysical(Rule):
def __init__(self):
pattern = Pattern(OperatorType.LOGICALFUNCTIONSCAN)
self.catalog = CatalogManager()
super().__init__(RuleType.LOGICAL_FUNCTION_SCAN_TO_PHYSICAL, pattern)

def promise(self):
Expand All @@ -812,6 +842,17 @@ def check(self, before: Operator, context: OptimizerContext):
return True

def apply(self, before: LogicalFunctionScan, context: OptimizerContext):
func_expr: FunctionExpression = before.func_expr
if func_expr.function is None and func_expr.function_type is not None:
# TODO: Replace 'get_udf_by_type' with a cost-based selection method
udf_obj = self.catalog.get_udf_by_type(func_expr.function_type)
func_expr.function = path_to_class(udf_obj.impl_file_path, udf_obj.name)()
func_expr.name = udf_obj.name
func_expr.alias = extract_alias_from_function_expression(
func_expr
) # resolve the alias as well
before.func_expr = func_expr

after = FunctionScanPlan(before.func_expr, before.do_unnest)
return after

Expand Down
2 changes: 1 addition & 1 deletion eva/udfs/udf_bootstrap_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
INPUT (Frame_Array NDARRAY UINT8(3, ANYDIM, ANYDIM))
OUTPUT (labels NDARRAY STR(ANYDIM), bboxes NDARRAY FLOAT32(ANYDIM, 4),
scores NDARRAY FLOAT32(ANYDIM))
TYPE Classification
TYPE ObjectDetection
IMPL '{}/udfs/fastrcnn_object_detector.py';
""".format(
EVA_INSTALLATION_DIR
Expand Down
7 changes: 7 additions & 0 deletions test/catalog/test_catalog_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,13 @@ def test_get_udf_by_name(self, udf_mock):
udf_mock.return_value.udf_by_name.assert_called_with("name")
self.assertEqual(actual, udf_mock.return_value.udf_by_name.return_value)

@mock.patch("eva.catalog.catalog_manager.UdfService")
def test_get_udf_by_type(self, udf_mock):
catalog = CatalogManager()
actual = catalog.get_udf_by_type("type")
udf_mock.return_value.udf_by_type.assert_called_with("type")
self.assertEqual(actual, udf_mock.return_value.udf_by_type.return_value)

@mock.patch("eva.catalog.catalog_manager.UdfService")
def test_drop_udf(self, udf_mock):
CatalogManager().drop_udf("name")
Expand Down
8 changes: 8 additions & 0 deletions test/integration_tests/test_select_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,14 @@ def test_lateral_join(self):
self.assertEqual(list(actual_batch.columns), ["myvideo.id", "T.a"])
self.assertEqual(len(actual_batch), 5)

@pytest.mark.torchtest
def test_lateral_join_with_logical_udf(self):
select_query = """SELECT id, a FROM MyVideo JOIN LATERAL
ObjectDetection(data) AS T(a,b,c) WHERE id < 5;"""
actual_batch = execute_query_fetch_all(select_query)
self.assertEqual(list(actual_batch.columns), ["myvideo.id", "T.a"])
self.assertEqual(len(actual_batch), 5)

@pytest.mark.torchtest
def test_lateral_join_with_multiple_projects(self):
select_query = """SELECT id, T.labels FROM MyVideo JOIN LATERAL
Expand Down
Loading