Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Improve caching logic and add disk caching option #3246

Merged
merged 11 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 36 additions & 26 deletions src/backend/base/langflow/graph/graph/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1125,43 +1125,53 @@ async def build_vertex(
self.run_manager.add_to_vertices_being_run(vertex_id)
try:
params = ""
if vertex.frozen:
should_build = False
if not vertex.frozen:
should_build = True
else:
# Check the cache for the vertex
if get_cache is not None:
cached_result = await get_cache(key=vertex.id)
else:
cached_result = None
if isinstance(cached_result, CacheMiss):
await vertex.build(
user_id=user_id, inputs=inputs_dict, fallback_to_env_vars=fallback_to_env_vars, files=files
)
if set_cache is not None:
await set_cache(key=vertex.id, data=vertex)
if cached_result and not isinstance(cached_result, CacheMiss):
cached_vertex = cached_result["result"]
# Now set update the vertex with the cached vertex
vertex._built = cached_vertex._built
vertex.result = cached_vertex.result
vertex.results = cached_vertex.results
vertex.artifacts = cached_vertex.artifacts
vertex._built_object = cached_vertex._built_object
vertex._custom_component = cached_vertex._custom_component
if vertex.result is not None:
vertex.result.used_frozen_result = True
should_build = True
else:
await vertex.build(
user_id=user_id, inputs=inputs_dict, fallback_to_env_vars=fallback_to_env_vars, files=files
)
if set_cache is not None:
await set_cache(key=vertex.id, data=vertex)
else:
try:
cached_vertex_dict = cached_result["result"]
# Now set update the vertex with the cached vertex
vertex._built = cached_vertex_dict["_built"]
vertex.artifacts = cached_vertex_dict["artifacts"]
vertex._built_object = cached_vertex_dict["_built_object"]
vertex._built_result = cached_vertex_dict["_built_result"]
vertex._data = cached_vertex_dict["_data"]
vertex.results = cached_vertex_dict["results"]
try:
vertex._finalize_build()
if vertex.result is not None:
vertex.result.used_frozen_result = True
except Exception:
should_build = True
except KeyError:
should_build = True

if should_build:
await vertex.build(
user_id=user_id, inputs=inputs_dict, fallback_to_env_vars=fallback_to_env_vars, files=files
)
if set_cache is not None:
await set_cache(key=vertex.id, data=vertex)

if vertex.result is not None:
vertex_dict = {
"_built": vertex._built,
"results": vertex.results,
"artifacts": vertex.artifacts,
"_built_object": vertex._built_object,
"_built_result": vertex._built_result,
"_data": vertex._data,
}

await set_cache(key=vertex.id, data=vertex_dict)

if vertex.results is not None:
params = f"{vertex._built_object_repr()}{params}"
valid = True
result_dict = vertex.result
Expand Down
2 changes: 1 addition & 1 deletion src/backend/base/langflow/graph/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def validate_model(cls, values):
stream_url = StreamURL(location=message["stream_url"])
values["outputs"].update({key: OutputValue(message=stream_url, type=message["type"])})
elif "type" in message:
values["outputs"].update({OutputValue(message=message, type=message["type"])})
values["outputs"].update({key: OutputValue(message=message, type=message["type"])})
return values


Expand Down
2 changes: 1 addition & 1 deletion src/backend/base/langflow/graph/vertex/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ async def build(
return

if self.frozen and self._built:
return self.get_requester_result(requester)
return await self.get_requester_result(requester)
elif self._built and requester is not None:
# This means that the vertex has already been built
# and we are just getting the result for the requester
Expand Down
90 changes: 90 additions & 0 deletions src/backend/base/langflow/services/cache/disk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import asyncio
import pickle
import time
from typing import Generic, Optional

from diskcache import Cache
from loguru import logger

from langflow.services.cache.base import AsyncBaseCacheService, AsyncLockType
from langflow.services.cache.utils import CACHE_MISS


class AsyncDiskCache(AsyncBaseCacheService, Generic[AsyncLockType]):  # type: ignore
    """Async cache service backed by diskcache's on-disk ``Cache``.

    Values are pickled before storage (unless already ``str``/``bytes``) and
    wrapped with an insertion timestamp; entries older than
    ``expiration_time`` seconds are treated as misses and purged lazily on
    read. All blocking diskcache calls run in a worker thread via
    ``asyncio.to_thread`` so the event loop is never blocked. A single
    ``asyncio.Lock`` serializes operations unless the caller passes its own
    lock (in which case the caller is assumed to already hold it).
    """

    def __init__(self, cache_dir, max_size=None, expiration_time=3600):
        # diskcache does its own file-level locking; self.lock only guards
        # this service's read-modify-write sequences (e.g. upsert).
        self.cache = Cache(cache_dir)
        self.lock = asyncio.Lock()
        # Maximum number of entries before cull() is triggered; None = unbounded.
        self.max_size = max_size
        # Logical time-to-live for entries, in seconds.
        self.expiration_time = expiration_time

    async def get(self, key, lock: Optional[asyncio.Lock] = None):
        """Return the value stored under ``key``, or ``CACHE_MISS``."""
        if lock is None:
            async with self.lock:
                return await self._get(key)
        return await self._get(key)

    async def _get(self, key):
        # Unlocked helper: callers must hold (or deliberately skip) the lock.
        item = await asyncio.to_thread(self.cache.get, key, default=None)
        if item is not None:
            if time.time() - item["time"] < self.expiration_time:
                # Refresh diskcache's own expiry window on access.
                await asyncio.to_thread(self.cache.touch, key)
                value = item["value"]
                # str values are stored as-is; everything else was pickled.
                return pickle.loads(value) if isinstance(value, bytes) else value
            logger.info(f"Cache item for key '{key}' has expired and will be deleted.")
            await self._delete(key)
        return CACHE_MISS

    async def set(self, key, value, lock: Optional[asyncio.Lock] = None):
        """Store ``value`` under ``key``, culling first if over ``max_size``."""
        if lock is None:
            async with self.lock:
                await self._set(key, value)
        else:
            await self._set(key, value)

    async def _set(self, key, value):
        if self.max_size and len(self.cache) >= self.max_size:
            # cull() evicts expired / least-recently-used items to free space.
            await asyncio.to_thread(self.cache.cull)
        item = {
            "value": pickle.dumps(value) if not isinstance(value, (str, bytes)) else value,
            "time": time.time(),
        }
        await asyncio.to_thread(self.cache.set, key, item)

    async def delete(self, key, lock: Optional[asyncio.Lock] = None):
        """Remove ``key`` from the cache (no-op if absent)."""
        if lock is None:
            async with self.lock:
                await self._delete(key)
        else:
            await self._delete(key)

    async def _delete(self, key):
        await asyncio.to_thread(self.cache.delete, key)

    async def clear(self, lock: Optional[asyncio.Lock] = None):
        """Remove every entry from the cache."""
        if lock is None:
            async with self.lock:
                await self._clear()
        else:
            await self._clear()

    async def _clear(self):
        await asyncio.to_thread(self.cache.clear)

    async def upsert(self, key, value, lock: Optional[asyncio.Lock] = None):
        """Merge ``value`` into an existing dict entry, else set it outright."""
        if lock is None:
            async with self.lock:
                await self._upsert(key, value)
        else:
            await self._upsert(key, value)

    async def _upsert(self, key, value):
        # BUG FIX: the original called self.get()/self.set() here, which try to
        # re-acquire self.lock already held by upsert(). asyncio.Lock is not
        # reentrant, so every plain upsert() call deadlocked. Use the unlocked
        # helpers instead — the caller already holds the lock.
        existing_value = await self._get(key)
        if existing_value is not CACHE_MISS and isinstance(existing_value, dict) and isinstance(value, dict):
            existing_value.update(value)
            value = existing_value
        await self._set(key, value)

    def __contains__(self, key):
        # BUG FIX: the original used asyncio.run(asyncio.to_thread(...)), which
        # raises RuntimeError when called from a running event loop — the normal
        # situation for this async service. diskcache's __contains__ is a plain
        # synchronous call, so invoke it directly.
        return key in self.cache

    async def teardown(self):
        # Clear the on-disk cache in a worker thread (the original blocked the
        # event loop with a synchronous clear); retry on transient FS timeouts.
        await asyncio.to_thread(self.cache.clear, retry=True)
6 changes: 6 additions & 0 deletions src/backend/base/langflow/services/cache/factory.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import TYPE_CHECKING

from langflow.services.cache.disk import AsyncDiskCache
from langflow.services.cache.service import AsyncInMemoryCache, CacheService, RedisCache, ThreadingInMemoryCache
from langflow.services.factory import ServiceFactory
from langflow.utils.logger import logger
Expand Down Expand Up @@ -36,3 +37,8 @@ def create(self, settings_service: "SettingsService"):
return ThreadingInMemoryCache(expiration_time=settings_service.settings.cache_expire)
elif settings_service.settings.cache_type == "async":
return AsyncInMemoryCache(expiration_time=settings_service.settings.cache_expire)
elif settings_service.settings.cache_type == "disk":
return AsyncDiskCache(
cache_dir=settings_service.settings.config_dir,
expiration_time=settings_service.settings.cache_expire,
)
4 changes: 1 addition & 3 deletions src/backend/base/langflow/services/cache/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@
from loguru import logger

from langflow.services.cache.base import AsyncBaseCacheService, AsyncLockType, CacheService, LockType
from langflow.services.cache.utils import CacheMiss

CACHE_MISS = CacheMiss()
from langflow.services.cache.utils import CACHE_MISS


class ThreadingInMemoryCache(CacheService, Generic[LockType]): # type: ignore
Expand Down
3 changes: 3 additions & 0 deletions src/backend/base/langflow/services/cache/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,6 @@ def update_build_status(cache_service, flow_id: str, status: "BuildStatus"):
cache_service[flow_id] = cached_flow
cached_flow["status"] = status
cache_service[flow_id] = cached_flow


CACHE_MISS = CacheMiss()
4 changes: 2 additions & 2 deletions src/backend/base/langflow/services/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
from pathlib import Path
from shutil import copy2
from typing import Any, List, Optional, Tuple, Type
from typing import Any, List, Literal, Optional, Tuple, Type

import orjson
import yaml
Expand Down Expand Up @@ -79,7 +79,7 @@ class Settings(BaseSettings):
"""SQLite pragmas to use when connecting to the database."""

# cache configuration
cache_type: str = "async"
cache_type: Literal["async", "redis", "memory", "disk"] = "async"
"""The cache type can be 'async' or 'redis'."""
cache_expire: int = 3600
"""The cache expire in seconds."""
Expand Down
13 changes: 12 additions & 1 deletion src/backend/base/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/backend/base/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ filelock = "^3.15.4"
grandalf = "^0.8.0"
crewai = "^0.36.0"
spider-client = "^0.0.27"
diskcache = "^5.6.3"


[tool.poetry.extras]
Expand Down
Loading