Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref: Add ruff rules TRY3xx #4098

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/backend/base/langflow/api/v1/api_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ def delete_api_key_route(
):
try:
delete_api_key(db, api_key_id)
return {"detail": "API Key deleted"}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e)) from e
return {"detail": "API Key deleted"}


@router.post("/store")
Expand Down Expand Up @@ -88,10 +88,11 @@ def save_store_api_key(
domain=auth_settings.COOKIE_DOMAIN,
)

return {"detail": "API Key saved"}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e)) from e

return {"detail": "API Key saved"}


@router.delete("/store")
def delete_store_api_key(
Expand All @@ -101,6 +102,7 @@ def delete_store_api_key(
try:
current_user.store_api_key = None
db.commit()
return {"detail": "API Key deleted"}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e)) from e

return {"detail": "API Key deleted"}
157 changes: 89 additions & 68 deletions src/backend/base/langflow/api/v1/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ async def build_graph_and_get_order() -> tuple[list[str], list[str], Graph]:
playgroundSuccess=True,
),
)
return first_layer, vertices_to_run, graph
except Exception as exc:
background_tasks.add_task(
telemetry_service.log_package_playground,
Expand All @@ -205,6 +204,8 @@ async def build_graph_and_get_order() -> tuple[list[str], list[str], Graph]:
logger.exception("Error checking build status")
raise HTTPException(status_code=500, detail=str(exc)) from exc

return first_layer, vertices_to_run, graph

async def _build_vertex(vertex_id: str, graph: Graph, event_manager: EventManager) -> VertexBuildResponse:
flow_id_str = str(flow_id)

Expand Down Expand Up @@ -302,7 +303,6 @@ async def _build_vertex(vertex_id: str, graph: Graph, event_manager: EventManage
componentErrorMessage=error_message,
),
)
return build_response
except Exception as exc:
background_tasks.add_task(
telemetry_service.log_package_component,
Expand All @@ -317,6 +317,8 @@ async def _build_vertex(vertex_id: str, graph: Graph, event_manager: EventManage
message = parse_exception(exc)
raise HTTPException(status_code=500, detail=message) from exc

return build_response

async def build_vertices(
vertex_id: str,
graph: Graph,
Expand Down Expand Up @@ -588,7 +590,6 @@ async def build_vertex(
componentErrorMessage=error_message,
),
)
return build_response
except Exception as exc:
background_tasks.add_task(
telemetry_service.log_package_component,
Expand All @@ -603,6 +604,90 @@ async def build_vertex(
message = parse_exception(exc)
raise HTTPException(status_code=500, detail=message) from exc

return build_response


async def _stream_vertex(flow_id: str, vertex_id: str, chat_service: ChatService):
graph = None
try:
try:
cache = await chat_service.get_cache(flow_id)
except Exception as exc: # noqa: BLE001
logger.exception("Error building Component")
yield str(StreamData(event="error", data={"error": str(exc)}))
return

if not cache:
# If there's no cache
msg = f"No cache found for {flow_id}."
logger.error(msg)
yield str(StreamData(event="error", data={"error": msg}))
return
else:
graph = cache.get("result")

try:
vertex: InterfaceVertex = graph.get_vertex(vertex_id)
except Exception as exc: # noqa: BLE001
logger.exception("Error building Component")
yield str(StreamData(event="error", data={"error": str(exc)}))
return

if not hasattr(vertex, "stream"):
msg = f"Vertex {vertex_id} does not support streaming"
logger.error(msg)
yield str(StreamData(event="error", data={"error": msg}))
return

if isinstance(vertex._built_result, str) and vertex._built_result:
stream_data = StreamData(
event="message",
data={"message": f"Streaming vertex {vertex_id}"},
)
yield str(stream_data)
stream_data = StreamData(
event="message",
data={"chunk": vertex._built_result},
)
yield str(stream_data)

elif not vertex.frozen or not vertex._built:
logger.debug(f"Streaming vertex {vertex_id}")
stream_data = StreamData(
event="message",
data={"message": f"Streaming vertex {vertex_id}"},
)
yield str(stream_data)
try:
async for chunk in vertex.stream():
stream_data = StreamData(
event="message",
data={"chunk": chunk},
)
yield str(stream_data)
except Exception as exc: # noqa: BLE001
logger.exception("Error building Component")
exc_message = parse_exception(exc)
if exc_message == "The message must be an iterator or an async iterator.":
exc_message = "This stream has already been closed."
yield str(StreamData(event="error", data={"error": exc_message}))
elif vertex.result is not None:
stream_data = StreamData(
event="message",
data={"chunk": vertex._built_result},
)
yield str(stream_data)
else:
msg = f"No result found for vertex {vertex_id}"
logger.error(msg)
yield str(StreamData(event="error", data={"error": msg}))
return
finally:
logger.debug("Closing stream")
if graph:
await chat_service.set_cache(flow_id, graph)
yield str(StreamData(event="close", data={"message": "Stream closed"}))


@router.get("/build/{flow_id}/{vertex_id}/stream", response_class=StreamingResponse)
async def build_vertex_stream(
Expand Down Expand Up @@ -638,70 +723,6 @@ async def build_vertex_stream(
HTTPException: If an error occurs while building the vertex.
"""
try:
flow_id_str = str(flow_id)

async def stream_vertex():
graph = None
try:
cache = await chat_service.get_cache(flow_id_str)
if not cache:
# If there's no cache
msg = f"No cache found for {flow_id_str}."
raise ValueError(msg)
else:
graph = cache.get("result")

vertex: InterfaceVertex = graph.get_vertex(vertex_id)
if not hasattr(vertex, "stream"):
msg = f"Vertex {vertex_id} does not support streaming"
raise ValueError(msg)
if isinstance(vertex._built_result, str) and vertex._built_result:
stream_data = StreamData(
event="message",
data={"message": f"Streaming vertex {vertex_id}"},
)
yield str(stream_data)
stream_data = StreamData(
event="message",
data={"chunk": vertex._built_result},
)
yield str(stream_data)

elif not vertex.frozen or not vertex._built:
logger.debug(f"Streaming vertex {vertex_id}")
stream_data = StreamData(
event="message",
data={"message": f"Streaming vertex {vertex_id}"},
)
yield str(stream_data)
async for chunk in vertex.stream():
stream_data = StreamData(
event="message",
data={"chunk": chunk},
)
yield str(stream_data)
elif vertex.result is not None:
stream_data = StreamData(
event="message",
data={"chunk": vertex._built_result},
)
yield str(stream_data)
else:
msg = f"No result found for vertex {vertex_id}"
raise ValueError(msg)

except Exception as exc: # noqa: BLE001
logger.exception("Error building Component")
exc_message = parse_exception(exc)
if exc_message == "The message must be an iterator or an async iterator.":
exc_message = "This stream has already been closed."
yield str(StreamData(event="error", data={"error": exc_message}))
finally:
logger.debug("Closing stream")
if graph:
await chat_service.set_cache(flow_id_str, graph)
yield str(StreamData(event="close", data={"message": "Stream closed"}))

return StreamingResponse(stream_vertex(), media_type="text/event-stream")
return StreamingResponse(_stream_vertex(str(flow_id), vertex_id, chat_service), media_type="text/event-stream")
ogabrielluiz marked this conversation as resolved.
Show resolved Hide resolved
except Exception as exc:
raise HTTPException(status_code=500, detail="Error building Component") from exc
Loading
Loading