Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: Remove serialized manifest from tracing requests for non-llm runs #26270

Merged
merged 3 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
546 changes: 14 additions & 532 deletions libs/community/tests/unit_tests/load/__snapshots__/test_dump.ambr

Large diffs are not rendered by default.

22 changes: 11 additions & 11 deletions libs/core/langchain_core/callbacks/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1354,15 +1354,15 @@ def on_chat_model_start(

def on_chain_start(
self,
serialized: Dict[str, Any],
serialized: Optional[Dict[str, Any]],
inputs: Union[Dict[str, Any], Any],
run_id: Optional[UUID] = None,
**kwargs: Any,
) -> CallbackManagerForChainRun:
"""Run when chain starts running.

Args:
serialized (Dict[str, Any]): The serialized chain.
serialized (Optional[Dict[str, Any]]): The serialized chain.
inputs (Union[Dict[str, Any], Any]): The inputs to the chain.
run_id (UUID, optional): The ID of the run. Defaults to None.
**kwargs (Any): Additional keyword arguments.
Expand Down Expand Up @@ -1398,7 +1398,7 @@ def on_chain_start(

def on_tool_start(
self,
serialized: Dict[str, Any],
serialized: Optional[Dict[str, Any]],
input_str: str,
run_id: Optional[UUID] = None,
parent_run_id: Optional[UUID] = None,
Expand Down Expand Up @@ -1453,7 +1453,7 @@ def on_tool_start(

def on_retriever_start(
self,
serialized: Dict[str, Any],
serialized: Optional[Dict[str, Any]],
query: str,
run_id: Optional[UUID] = None,
parent_run_id: Optional[UUID] = None,
Expand All @@ -1462,7 +1462,7 @@ def on_retriever_start(
"""Run when the retriever starts running.

Args:
serialized (Dict[str, Any]): The serialized retriever.
serialized (Optional[Dict[str, Any]]): The serialized retriever.
query (str): The query.
run_id (UUID, optional): The ID of the run. Defaults to None.
parent_run_id (UUID, optional): The ID of the parent run. Defaults to None.
Expand Down Expand Up @@ -1840,15 +1840,15 @@ async def on_chat_model_start(

async def on_chain_start(
self,
serialized: Dict[str, Any],
serialized: Optional[Dict[str, Any]],
inputs: Union[Dict[str, Any], Any],
run_id: Optional[UUID] = None,
**kwargs: Any,
) -> AsyncCallbackManagerForChainRun:
"""Async run when chain starts running.

Args:
serialized (Dict[str, Any]): The serialized chain.
serialized (Optional[Dict[str, Any]]): The serialized chain.
inputs (Union[Dict[str, Any], Any]): The inputs to the chain.
run_id (UUID, optional): The ID of the run. Defaults to None.
**kwargs (Any): Additional keyword arguments.
Expand Down Expand Up @@ -1886,7 +1886,7 @@ async def on_chain_start(

async def on_tool_start(
self,
serialized: Dict[str, Any],
serialized: Optional[Dict[str, Any]],
input_str: str,
run_id: Optional[UUID] = None,
parent_run_id: Optional[UUID] = None,
Expand All @@ -1895,7 +1895,7 @@ async def on_tool_start(
"""Run when the tool starts running.

Args:
serialized (Dict[str, Any]): The serialized tool.
serialized (Optional[Dict[str, Any]]): The serialized tool.
input_str (str): The input to the tool.
run_id (UUID, optional): The ID of the run. Defaults to None.
parent_run_id (UUID, optional): The ID of the parent run.
Expand Down Expand Up @@ -1975,7 +1975,7 @@ async def on_custom_event(

async def on_retriever_start(
self,
serialized: Dict[str, Any],
serialized: Optional[Dict[str, Any]],
query: str,
run_id: Optional[UUID] = None,
parent_run_id: Optional[UUID] = None,
Expand All @@ -1984,7 +1984,7 @@ async def on_retriever_start(
"""Run when the retriever starts running.

Args:
serialized (Dict[str, Any]): The serialized retriever.
serialized (Optional[Dict[str, Any]]): The serialized retriever.
query (str): The query.
run_id (UUID, optional): The ID of the run. Defaults to None.
parent_run_id (UUID, optional): The ID of the parent run. Defaults to None.
Expand Down
3 changes: 3 additions & 0 deletions libs/core/langchain_core/prompts/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import yaml

from langchain_core.load import dumpd
from langchain_core.output_parsers.base import BaseOutputParser
from langchain_core.prompt_values import (
ChatPromptValueConcrete,
Expand Down Expand Up @@ -188,6 +189,7 @@ def invoke(
input,
config,
run_type="prompt",
serialized=dumpd(self),
)

async def ainvoke(
Expand All @@ -212,6 +214,7 @@ async def ainvoke(
input,
config,
run_type="prompt",
serialized=dumpd(self),
)

@abstractmethod
Expand Down
9 changes: 4 additions & 5 deletions libs/core/langchain_core/retrievers.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

from langchain_core._api import deprecated
from langchain_core.documents import Document
from langchain_core.load.dump import dumpd
from langchain_core.runnables import (
Runnable,
RunnableConfig,
Expand Down Expand Up @@ -235,9 +234,9 @@ def invoke(
local_metadata=self.metadata,
)
run_manager = callback_manager.on_retriever_start(
dumpd(self),
None,
input,
name=config.get("run_name"),
name=config.get("run_name") or self.get_name(),
run_id=kwargs.pop("run_id", None),
)
try:
Expand Down Expand Up @@ -298,9 +297,9 @@ async def ainvoke(
local_metadata=self.metadata,
)
run_manager = await callback_manager.on_retriever_start(
dumpd(self),
None,
input,
name=config.get("run_name"),
name=config.get("run_name") or self.get_name(),
run_id=kwargs.pop("run_id", None),
)
try:
Expand Down
28 changes: 14 additions & 14 deletions libs/core/langchain_core/runnables/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
from typing_extensions import Literal, get_args

from langchain_core._api import beta_decorator
from langchain_core.load.dump import dumpd
from langchain_core.load.serializable import (
Serializable,
SerializedConstructor,
Expand Down Expand Up @@ -1763,14 +1762,15 @@ def _call_with_config(
input: Input,
config: Optional[RunnableConfig],
run_type: Optional[str] = None,
serialized: Optional[Dict[str, Any]] = None,
**kwargs: Optional[Any],
) -> Output:
"""Helper method to transform an Input value to an Output value,
with callbacks. Use this method to implement invoke() in subclasses."""
config = ensure_config(config)
callback_manager = get_callback_manager_for_config(config)
run_manager = callback_manager.on_chain_start(
dumpd(self),
serialized,
input,
run_type=run_type,
name=config.get("run_name") or self.get_name(),
Expand Down Expand Up @@ -1811,14 +1811,15 @@ async def _acall_with_config(
input: Input,
config: Optional[RunnableConfig],
run_type: Optional[str] = None,
serialized: Optional[Dict[str, Any]] = None,
**kwargs: Optional[Any],
) -> Output:
"""Helper method to transform an Input value to an Output value,
with callbacks. Use this method to implement ainvoke() in subclasses."""
config = ensure_config(config)
callback_manager = get_async_callback_manager_for_config(config)
run_manager = await callback_manager.on_chain_start(
dumpd(self),
serialized,
input,
run_type=run_type,
name=config.get("run_name") or self.get_name(),
Expand Down Expand Up @@ -1871,7 +1872,7 @@ def _batch_with_config(
callback_managers = [get_callback_manager_for_config(c) for c in configs]
run_managers = [
callback_manager.on_chain_start(
dumpd(self),
None,
input,
run_type=run_type,
name=config.get("run_name") or self.get_name(),
Expand Down Expand Up @@ -1944,7 +1945,7 @@ async def _abatch_with_config(
run_managers: List[AsyncCallbackManagerForChainRun] = await asyncio.gather(
*(
callback_manager.on_chain_start(
dumpd(self),
None,
input,
run_type=run_type,
name=config.get("run_name") or self.get_name(),
Expand Down Expand Up @@ -2023,7 +2024,7 @@ def _transform_stream_with_config(
config = ensure_config(config)
callback_manager = get_callback_manager_for_config(config)
run_manager = callback_manager.on_chain_start(
dumpd(self),
None,
{"input": ""},
run_type=run_type,
name=config.get("run_name") or self.get_name(),
Expand Down Expand Up @@ -2123,7 +2124,7 @@ async def _atransform_stream_with_config(
config = ensure_config(config)
callback_manager = get_async_callback_manager_for_config(config)
run_manager = await callback_manager.on_chain_start(
dumpd(self),
None,
{"input": ""},
run_type=run_type,
name=config.get("run_name") or self.get_name(),
Expand Down Expand Up @@ -2325,7 +2326,6 @@ def to_json(self) -> Union[SerializedConstructor, SerializedNotImplemented]:
dumped = super().to_json()
try:
dumped["name"] = self.get_name()
dumped["graph"] = self.get_graph().to_json()
except Exception:
pass
return dumped
Expand Down Expand Up @@ -2857,7 +2857,7 @@ def invoke(
callback_manager = get_callback_manager_for_config(config)
# start the root run
run_manager = callback_manager.on_chain_start(
dumpd(self),
None,
input,
name=config.get("run_name") or self.get_name(),
run_id=config.pop("run_id", None),
Expand Down Expand Up @@ -2897,7 +2897,7 @@ async def ainvoke(
callback_manager = get_async_callback_manager_for_config(config)
# start the root run
run_manager = await callback_manager.on_chain_start(
dumpd(self),
None,
input,
name=config.get("run_name") or self.get_name(),
run_id=config.pop("run_id", None),
Expand Down Expand Up @@ -2962,7 +2962,7 @@ def batch(
# start the root runs, one per input
run_managers = [
cm.on_chain_start(
dumpd(self),
None,
input,
name=config.get("run_name") or self.get_name(),
run_id=config.pop("run_id", None),
Expand Down Expand Up @@ -3089,7 +3089,7 @@ async def abatch(
run_managers: List[AsyncCallbackManagerForChainRun] = await asyncio.gather(
*(
cm.on_chain_start(
dumpd(self),
None,
input,
name=config.get("run_name") or self.get_name(),
run_id=config.pop("run_id", None),
Expand Down Expand Up @@ -3544,7 +3544,7 @@ def invoke(
)
# start the root run
run_manager = callback_manager.on_chain_start(
dumpd(self),
None,
input,
name=config.get("run_name") or self.get_name(),
run_id=config.pop("run_id", None),
Expand Down Expand Up @@ -3596,7 +3596,7 @@ async def ainvoke(
callback_manager = get_async_callback_manager_for_config(config)
# start the root run
run_manager = await callback_manager.on_chain_start(
dumpd(self),
None,
input,
name=config.get("run_name") or self.get_name(),
run_id=config.pop("run_id", None),
Expand Down
21 changes: 10 additions & 11 deletions libs/core/langchain_core/runnables/branch.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
cast,
)

from langchain_core.load.dump import dumpd
from langchain_core.pydantic_v1 import BaseModel
from langchain_core.runnables.base import (
Runnable,
Expand Down Expand Up @@ -207,9 +206,9 @@ def invoke(
config = ensure_config(config)
callback_manager = get_callback_manager_for_config(config)
run_manager = callback_manager.on_chain_start(
dumpd(self),
None,
input,
name=config.get("run_name"),
name=config.get("run_name") or self.get_name(),
run_id=config.pop("run_id", None),
)

Expand Down Expand Up @@ -246,7 +245,7 @@ def invoke(
except BaseException as e:
run_manager.on_chain_error(e)
raise
run_manager.on_chain_end(dumpd(output))
run_manager.on_chain_end(output)
return output

async def ainvoke(
Expand All @@ -256,9 +255,9 @@ async def ainvoke(
config = ensure_config(config)
callback_manager = get_async_callback_manager_for_config(config)
run_manager = await callback_manager.on_chain_start(
dumpd(self),
None,
input,
name=config.get("run_name"),
name=config.get("run_name") or self.get_name(),
run_id=config.pop("run_id", None),
)
try:
Expand Down Expand Up @@ -294,7 +293,7 @@ async def ainvoke(
except BaseException as e:
await run_manager.on_chain_error(e)
raise
await run_manager.on_chain_end(dumpd(output))
await run_manager.on_chain_end(output)
return output

def stream(
Expand All @@ -320,9 +319,9 @@ def stream(
config = ensure_config(config)
callback_manager = get_callback_manager_for_config(config)
run_manager = callback_manager.on_chain_start(
dumpd(self),
None,
input,
name=config.get("run_name"),
name=config.get("run_name") or self.get_name(),
run_id=config.pop("run_id", None),
)
final_output: Optional[Output] = None
Expand Down Expand Up @@ -407,9 +406,9 @@ async def astream(
config = ensure_config(config)
callback_manager = get_async_callback_manager_for_config(config)
run_manager = await callback_manager.on_chain_start(
dumpd(self),
None,
input,
name=config.get("run_name"),
name=config.get("run_name") or self.get_name(),
run_id=config.pop("run_id", None),
)
final_output: Optional[Output] = None
Expand Down
Loading
Loading