diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py
index 42de21ac918..05744b68441 100644
--- a/src/backend/base/langflow/graph/graph/base.py
+++ b/src/backend/base/langflow/graph/graph/base.py
@@ -1125,41 +1125,51 @@ async def build_vertex(
         self.run_manager.add_to_vertices_being_run(vertex_id)
         try:
             params = ""
-            if vertex.frozen:
+            should_build = False
+            if not vertex.frozen:
+                should_build = True
+            else:
                 # Check the cache for the vertex
                 if get_cache is not None:
                     cached_result = await get_cache(key=vertex.id)
                 else:
                     cached_result = None
                 if isinstance(cached_result, CacheMiss):
-                    await vertex.build(
-                        user_id=user_id, inputs=inputs_dict, fallback_to_env_vars=fallback_to_env_vars, files=files
-                    )
-                    if set_cache is not None:
-                        await set_cache(key=vertex.id, data=vertex)
-                if cached_result and not isinstance(cached_result, CacheMiss):
-                    cached_vertex = cached_result["result"]
-                    # Now set update the vertex with the cached vertex
-                    vertex._built = cached_vertex._built
-                    vertex.result = cached_vertex.result
-                    vertex.results = cached_vertex.results
-                    vertex.artifacts = cached_vertex.artifacts
-                    vertex._built_object = cached_vertex._built_object
-                    vertex._custom_component = cached_vertex._custom_component
-                    if vertex.result is not None:
-                        vertex.result.used_frozen_result = True
+                    should_build = True
                 else:
-                    await vertex.build(
-                        user_id=user_id, inputs=inputs_dict, fallback_to_env_vars=fallback_to_env_vars, files=files
-                    )
-                    if set_cache is not None:
-                        await set_cache(key=vertex.id, data=vertex)
-            else:
+                    try:
+                        cached_vertex_dict = cached_result["result"]
+                        # Now update the vertex with the cached values
+                        vertex._built = cached_vertex_dict["_built"]
+                        vertex.artifacts = cached_vertex_dict["artifacts"]
+                        vertex._built_object = cached_vertex_dict["_built_object"]
+                        vertex._built_result = cached_vertex_dict["_built_result"]
+                        vertex._data = cached_vertex_dict["_data"]
+                        vertex.results = cached_vertex_dict["results"]
+                        try:
+                            vertex._finalize_build()
+                            if vertex.result is not None:
+                                vertex.result.used_frozen_result = True
+                        except Exception:
+                            should_build = True
+                    except KeyError:
+                        should_build = True
+
+            if should_build:
                 await vertex.build(
                     user_id=user_id, inputs=inputs_dict, fallback_to_env_vars=fallback_to_env_vars, files=files
                 )
                 if set_cache is not None:
-                    await set_cache(key=vertex.id, data=vertex)
+                    vertex_dict = {
+                        "_built": vertex._built,
+                        "results": vertex.results,
+                        "artifacts": vertex.artifacts,
+                        "_built_object": vertex._built_object,
+                        "_built_result": vertex._built_result,
+                        "_data": vertex._data,
+                    }
+
+                    await set_cache(key=vertex.id, data=vertex_dict)

             if vertex.result is not None:
                 params = f"{vertex._built_object_repr()}{params}"
diff --git a/src/backend/base/langflow/graph/schema.py b/src/backend/base/langflow/graph/schema.py
index eab0040c6d3..fdabcdaaa64 100644
--- a/src/backend/base/langflow/graph/schema.py
+++ b/src/backend/base/langflow/graph/schema.py
@@ -43,7 +43,7 @@ def validate_model(cls, values):
                 stream_url = StreamURL(location=message["stream_url"])
                 values["outputs"].update({key: OutputValue(message=stream_url, type=message["type"])})
             elif "type" in message:
-                values["outputs"].update({OutputValue(message=message, type=message["type"])})
+                values["outputs"].update({key: OutputValue(message=message, type=message["type"])})
         return values
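A note on the graph/schema.py hunk above: the removed line built a one-element set literal instead of a dict entry, so dict.update() could never store the output under its key. A minimal reproduction sketch, using a plain stand-in class rather than langflow's actual OutputValue model:

# Illustration only: StandInOutputValue is hypothetical, not langflow's OutputValue.
class StandInOutputValue:
    def __init__(self, message, type):
        self.message = message
        self.type = type


outputs: dict = {}
key, message = "chat-output", {"text": "hi", "type": "text"}

try:
    # Old code: {value} is a *set* literal; dict.update() then tries to unpack
    # the lone element as a (key, value) pair and raises TypeError.
    outputs.update({StandInOutputValue(message=message, type=message["type"])})
except TypeError as exc:
    print(exc)

# Fixed code: a dict literal keyed by the output name, which update() merges as intended.
outputs.update({key: StandInOutputValue(message=message, type=message["type"])})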
diff --git a/src/backend/base/langflow/graph/vertex/base.py b/src/backend/base/langflow/graph/vertex/base.py
index 8284736acf3..0b7941734b4 100644
--- a/src/backend/base/langflow/graph/vertex/base.py
+++ b/src/backend/base/langflow/graph/vertex/base.py
@@ -753,7 +753,7 @@ async def build(
             return
         if self.frozen and self._built:
-            return self.get_requester_result(requester)
+            return await self.get_requester_result(requester)
         elif self._built and requester is not None:
             # This means that the vertex has already been built
             # and we are just getting the result for the requester
diff --git a/src/backend/base/langflow/services/cache/disk.py b/src/backend/base/langflow/services/cache/disk.py
new file mode 100644
index 00000000000..dbbd85f1335
--- /dev/null
+++ b/src/backend/base/langflow/services/cache/disk.py
@@ -0,0 +1,98 @@
+import asyncio
+import pickle
+import time
+from typing import Generic, Optional
+
+from diskcache import Cache
+from loguru import logger
+
+from langflow.services.cache.base import AsyncBaseCacheService, AsyncLockType
+from langflow.services.cache.utils import CACHE_MISS
+
+
+class AsyncDiskCache(AsyncBaseCacheService, Generic[AsyncLockType]):  # type: ignore
+    def __init__(self, cache_dir, max_size=None, expiration_time=3600):
+        self.cache = Cache(cache_dir)
+        # Let's clear the cache for now to maintain a similar
+        # behavior as the in-memory cache
+        # Later we should implement endpoints for the frontend to grab
+        # output logs from the cache
+        if len(self.cache) > 0:
+            self.cache.clear()
+        self.lock = asyncio.Lock()
+        self.max_size = max_size
+        self.expiration_time = expiration_time
+
+    async def get(self, key, lock: Optional[asyncio.Lock] = None):
+        if not lock:
+            async with self.lock:
+                return await self._get(key)
+        else:
+            return await self._get(key)
+
+    async def _get(self, key):
+        item = await asyncio.to_thread(self.cache.get, key, default=None)
+        if item:
+            if time.time() - item["time"] < self.expiration_time:
+                await asyncio.to_thread(self.cache.touch, key)  # Refresh the expiry time
+                return pickle.loads(item["value"]) if isinstance(item["value"], bytes) else item["value"]
+            else:
+                logger.info(f"Cache item for key '{key}' has expired and will be deleted.")
+                await self._delete(key)  # Log before deleting the expired item
+        return CACHE_MISS
+
+    async def set(self, key, value, lock: Optional[asyncio.Lock] = None):
+        if not lock:
+            async with self.lock:
+                await self._set(key, value)
+        else:
+            await self._set(key, value)
+
+    async def _set(self, key, value):
+        if self.max_size and len(self.cache) >= self.max_size:
+            await asyncio.to_thread(self.cache.cull)
+        item = {"value": pickle.dumps(value) if not isinstance(value, (str, bytes)) else value, "time": time.time()}
+        await asyncio.to_thread(self.cache.set, key, item)
+
+    async def delete(self, key, lock: Optional[asyncio.Lock] = None):
+        if not lock:
+            async with self.lock:
+                await self._delete(key)
+        else:
+            await self._delete(key)
+
+    async def _delete(self, key):
+        await asyncio.to_thread(self.cache.delete, key)
+
+    async def clear(self, lock: Optional[asyncio.Lock] = None):
+        if not lock:
+            async with self.lock:
+                await self._clear()
+        else:
+            await self._clear()
+
+    async def _clear(self):
+        await asyncio.to_thread(self.cache.clear)
+
+    async def upsert(self, key, value, lock: Optional[asyncio.Lock] = None):
+        if not lock:
+            async with self.lock:
+                await self._upsert(key, value)
+        else:
+            await self._upsert(key, value)
+
+    async def _upsert(self, key, value):
+        # Use the unlocked helpers here: upsert() already holds self.lock, and
+        # asyncio.Lock is not reentrant, so calling self.get()/self.set() would deadlock.
+        existing_value = await self._get(key)
+        if existing_value is not CACHE_MISS and isinstance(existing_value, dict) and isinstance(value, dict):
+            existing_value.update(value)
+            value = existing_value
+        await self._set(key, value)
+
+    def __contains__(self, key):
+        return asyncio.run(asyncio.to_thread(self.cache.__contains__, key))
+
+    async def teardown(self):
+        # Clean up the cache directory
+        self.cache.clear(retry=True)
self.set(key, value) + + def __contains__(self, key): + return asyncio.run(asyncio.to_thread(self.cache.__contains__, key)) + + async def teardown(self): + # Clean up the cache directory + self.cache.clear(retry=True) diff --git a/src/backend/base/langflow/services/cache/factory.py b/src/backend/base/langflow/services/cache/factory.py index 5cc6b12afe0..74364dbfc0e 100644 --- a/src/backend/base/langflow/services/cache/factory.py +++ b/src/backend/base/langflow/services/cache/factory.py @@ -1,5 +1,6 @@ from typing import TYPE_CHECKING +from langflow.services.cache.disk import AsyncDiskCache from langflow.services.cache.service import AsyncInMemoryCache, CacheService, RedisCache, ThreadingInMemoryCache from langflow.services.factory import ServiceFactory from langflow.utils.logger import logger @@ -36,3 +37,8 @@ def create(self, settings_service: "SettingsService"): return ThreadingInMemoryCache(expiration_time=settings_service.settings.cache_expire) elif settings_service.settings.cache_type == "async": return AsyncInMemoryCache(expiration_time=settings_service.settings.cache_expire) + elif settings_service.settings.cache_type == "disk": + return AsyncDiskCache( + cache_dir=settings_service.settings.config_dir, + expiration_time=settings_service.settings.cache_expire, + ) diff --git a/src/backend/base/langflow/services/cache/service.py b/src/backend/base/langflow/services/cache/service.py index 3d4131c239c..021c33f9028 100644 --- a/src/backend/base/langflow/services/cache/service.py +++ b/src/backend/base/langflow/services/cache/service.py @@ -8,9 +8,7 @@ from loguru import logger from langflow.services.cache.base import AsyncBaseCacheService, AsyncLockType, CacheService, LockType -from langflow.services.cache.utils import CacheMiss - -CACHE_MISS = CacheMiss() +from langflow.services.cache.utils import CACHE_MISS class ThreadingInMemoryCache(CacheService, Generic[LockType]): # type: ignore diff --git a/src/backend/base/langflow/services/cache/utils.py b/src/backend/base/langflow/services/cache/utils.py index a89963f5681..c2f3c961124 100644 --- a/src/backend/base/langflow/services/cache/utils.py +++ b/src/backend/base/langflow/services/cache/utils.py @@ -166,3 +166,6 @@ def update_build_status(cache_service, flow_id: str, status: "BuildStatus"): cache_service[flow_id] = cached_flow cached_flow["status"] = status cache_service[flow_id] = cached_flow + + +CACHE_MISS = CacheMiss() diff --git a/src/backend/base/langflow/services/settings/base.py b/src/backend/base/langflow/services/settings/base.py index 658edd57e1c..88c2a64b49f 100644 --- a/src/backend/base/langflow/services/settings/base.py +++ b/src/backend/base/langflow/services/settings/base.py @@ -3,7 +3,7 @@ import os from pathlib import Path from shutil import copy2 -from typing import Any, List, Optional, Tuple, Type +from typing import Any, List, Literal, Optional, Tuple, Type import orjson import yaml @@ -79,7 +79,7 @@ class Settings(BaseSettings): """SQLite pragmas to use when connecting to the database.""" # cache configuration - cache_type: str = "async" + cache_type: Literal["async", "redis", "memory", "disk"] = "async" """The cache type can be 'async' or 'redis'.""" cache_expire: int = 3600 """The cache expire in seconds.""" diff --git a/src/backend/base/poetry.lock b/src/backend/base/poetry.lock index e52b0c5b775..8deb46e0bfa 100644 --- a/src/backend/base/poetry.lock +++ b/src/backend/base/poetry.lock @@ -1208,6 +1208,17 @@ files = [ graph = ["objgraph (>=1.7.2)"] profile = ["gprof2dot (>=2022.7.29)"] +[[package]] +name = 
"diskcache" +version = "5.6.3" +description = "Disk Cache -- Disk and file backed persistent cache." +optional = false +python-versions = ">=3" +files = [ + {file = "diskcache-5.6.3-py3-none-any.whl", hash = "sha256:5e31b2d5fbad117cc363ebaf6b689474db18a1f6438bc82358b024abd4c2ca19"}, + {file = "diskcache-5.6.3.tar.gz", hash = "sha256:2c3a3fa2743d8535d832ec61c2054a1641f41775aa7c556758a109941e33e4fc"}, +] + [[package]] name = "distlib" version = "0.3.8" @@ -7654,4 +7665,4 @@ local = [] [metadata] lock-version = "2.0" python-versions = ">=3.10,<3.13" -content-hash = "747dad35b9e5b1338a989ea6bfd4ac3465ba34f792639aeabda3c1ca9b40c689" +content-hash = "fe6710d7325bc2cceeaa298d94d6f1157cfe1533c2acbabe3ecdca5594d9e007" diff --git a/src/backend/base/pyproject.toml b/src/backend/base/pyproject.toml index d98c673e076..a8de78d9e01 100644 --- a/src/backend/base/pyproject.toml +++ b/src/backend/base/pyproject.toml @@ -79,6 +79,7 @@ filelock = "^3.15.4" grandalf = "^0.8.0" crewai = "^0.36.0" spider-client = "^0.0.27" +diskcache = "^5.6.3" [tool.poetry.extras]