Skip to content

Commit

Permalink
fix: serialize array output and logs (langflow-ai#3040)
Browse files Browse the repository at this point in the history
* refactor: improve recursive serialization function

Refactor the `recursive_serialize_or_str` function in the `schema.py` file to improve its readability and maintainability. The function now uses a try-except block to handle exceptions and returns a string representation of the object if an exception occurs. This ensures that the function always returns a string, preventing any unexpected errors. Additionally, the function now includes additional checks for different object types, such as dictionaries, lists, and instances of `BaseModel`. These checks ensure that the function correctly serializes complex objects and avoids any potential issues. Overall, this refactoring improves the code quality and reliability of the `recursive_serialize_or_str` function.

* feat(artifact.py): add support for recursive serialization of items in ARRAY artifact type to ensure consistent data handling

* feat(schema.py): add support for serializing arrays in build_output_logs function to handle LogType.ARRAY case

(cherry picked from commit 4e9367b)
  • Loading branch information
ogabrielluiz authored and nicoloboschi committed Jul 30, 2024
1 parent 42db1d2 commit 7212a06
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 22 deletions.
22 changes: 1 addition & 21 deletions src/backend/base/langflow/custom/custom_component/component.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import inspect
from typing import Any, AsyncIterator, Callable, ClassVar, Generator, Iterator, List, Optional, Union
from typing import Any, Callable, ClassVar, List, Optional, Union
from uuid import UUID

import yaml
Expand All @@ -15,26 +15,6 @@
from .custom_component import CustomComponent


def recursive_serialize_or_str(obj):
try:
if isinstance(obj, dict):
return {k: recursive_serialize_or_str(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [recursive_serialize_or_str(v) for v in obj]
elif isinstance(obj, BaseModel):
return {k: recursive_serialize_or_str(v) for k, v in obj.model_dump().items()}
elif isinstance(obj, (AsyncIterator, Generator, Iterator)):
# contain memory addresses
# without consuming the iterator
# return list(obj) consumes the iterator
# return f"{obj}" this generates '<generator object BaseChatModel.stream at 0x33e9ec770>'
# it is not useful
return "Unconsumed Stream"
return str(obj)
except Exception:
return str(obj)


class Component(CustomComponent):
inputs: List[InputTypes] = []
outputs: List[Output] = []
Expand Down
11 changes: 11 additions & 0 deletions src/backend/base/langflow/schema/artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from langflow.schema import Data
from langflow.schema.message import Message
from langflow.schema.schema import recursive_serialize_or_str


class ArtifactType(str, Enum):
Expand Down Expand Up @@ -52,6 +53,16 @@ def get_artifact_type(value, build_result=None) -> str:
def post_process_raw(raw, artifact_type: str):
if artifact_type == ArtifactType.STREAM.value:
raw = ""
elif artifact_type == ArtifactType.ARRAY.value:
_raw = []
for item in raw:
if hasattr(item, "dict"):
_raw.append(recursive_serialize_or_str(item))
elif hasattr(item, "model_dump"):
_raw.append(recursive_serialize_or_str(item))
else:
_raw.append(str(item))
raw = _raw
elif artifact_type == ArtifactType.UNKNOWN.value and raw is not None:
if isinstance(raw, (BaseModel, dict)):
try:
Expand Down
37 changes: 36 additions & 1 deletion src/backend/base/langflow/schema/schema.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from enum import Enum
from typing import Generator, Literal, Union
from typing import AsyncIterator, Generator, Iterator, Literal, Union

from pydantic import BaseModel
from typing_extensions import TypedDict
Expand Down Expand Up @@ -104,7 +104,42 @@ def build_output_logs(vertex, result) -> dict:

case LogType.UNKNOWN:
message = ""

case LogType.ARRAY:
message = [recursive_serialize_or_str(item) for item in message]
name = output.get("name", f"output_{index}")
outputs |= {name: OutputValue(message=message, type=_type).model_dump()}

return outputs


def recursive_serialize_or_str(obj):
try:
if isinstance(obj, dict):
return {k: recursive_serialize_or_str(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [recursive_serialize_or_str(v) for v in obj]
elif isinstance(obj, BaseModel):
if hasattr(obj, "model_dump"):
obj_dict = obj.model_dump()
elif hasattr(obj, "dict"):
obj_dict = obj.dict() # type: ignore
return {k: recursive_serialize_or_str(v) for k, v in obj_dict.items()}

elif isinstance(obj, (AsyncIterator, Generator, Iterator)):
# contain memory addresses
# without consuming the iterator
# return list(obj) consumes the iterator
# return f"{obj}" this generates '<generator object BaseChatModel.stream at 0x33e9ec770>'
# it is not useful
return "Unconsumed Stream"
elif hasattr(obj, "dict"):
return {k: recursive_serialize_or_str(v) for k, v in obj.dict().items()}
elif hasattr(obj, "model_dump"):
return {k: recursive_serialize_or_str(v) for k, v in obj.model_dump().items()}
elif issubclass(obj, BaseModel):
# This a type BaseModel and not an instance of it
return repr(obj)
return str(obj)
except Exception:
return str(obj)

0 comments on commit 7212a06

Please sign in to comment.