Skip to content
This repository has been archived by the owner on Oct 19, 2023. It is now read-only.

Commit

Permalink
Merge pull request #127 from jina-ai/feat-metrics-slack
Browse files Browse the repository at this point in the history
  • Loading branch information
zac-li committed Jul 12, 2023
2 parents f622954 + ddecd4c commit c178524
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 10 deletions.
6 changes: 4 additions & 2 deletions lcserve/apps/slackbot/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import langchain
from langchain.agents import AgentExecutor, ConversationalAgent
from langchain.callbacks.manager import CallbackManager
from langchain.memory import ChatMessageHistory
from langchain.prompts import PromptTemplate
from langchain.tools import Tool
Expand All @@ -16,13 +17,14 @@ def update_cache(path):
langchain.llm_cache = SQLiteCache(database_path=os.path.join(path, "llm_cache.db"))


@slackbot
@slackbot(openai_tracing=True)
def agent(
message: str,
prompt: PromptTemplate,
history: ChatMessageHistory,
tools: List[Tool],
reply: Callable,
tracing_handler,
workspace: str,
**kwargs,
):
Expand All @@ -33,7 +35,7 @@ def agent(
memory = get_memory(history)
agent = ConversationalAgent(
llm_chain=LLMChain(
llm=ChatOpenAI(temperature=0, verbose=True),
llm=ChatOpenAI(temperature=0, verbose=True, callbacks=[tracing_handler]),
prompt=prompt,
),
allowed_tools=[tool.name for tool in tools],
Expand Down
28 changes: 23 additions & 5 deletions lcserve/backend/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,9 @@ def __init__(
self._init_fastapi_app()
self._configure_cors()
self._register_healthz()
self._register_modules()
# _setup_metrics needs to be invoked before _register_modules since slack requires tracking metrics
self._setup_metrics()
self._register_modules()
self._setup_logging()

@property
Expand Down Expand Up @@ -360,8 +361,8 @@ def _setup_metrics(self):
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

if not self.meter_provider:
self.http_duration_counter = None
self.ws_duration_counter = None
self.duration_counter = None
self.request_counter = None
return

FastAPIInstrumentor.instrument_app(
Expand Down Expand Up @@ -606,7 +607,7 @@ def _register_slackbot(
func: Callable,
dirname: str,
commands: Dict[str, Callable] = None,
openai_tracing: bool = False, # TODO: add openai_tracing to slackbot
openai_tracing: bool = False,
**kwargs,
):
from fastapi import Request
Expand All @@ -615,7 +616,22 @@ def _register_slackbot(
from .slackbot import SlackBot

self.logger.info(f'Registering slackbot: {func.__name__}')
bot = SlackBot(workspace=self.workspace)

if openai_tracing:
tracing_handler = OpenAITracingCallbackHandler(
tracer=self.tracer, parent_span=get_current_span()
)
else:
tracing_handler = TracingCallbackHandler(
tracer=self.tracer, parent_span=get_current_span()
)

bot = SlackBot(
workspace=self.workspace,
duration_counter=self.duration_counter,
request_counter=self.request_counter,
tracing_handler=tracing_handler,
)

@self.app.post("/slack/events")
async def endpoint(req: Request):
Expand Down Expand Up @@ -1182,6 +1198,7 @@ def __init__(
'/dry_run',
'/metrics',
'/favicon.ico',
'/slack/events',
]

async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
Expand Down Expand Up @@ -1224,6 +1241,7 @@ def __init__(self, app: ASGIApp, logger: JinaLogger):
'/dry_run',
'/metrics',
'/favicon.ico',
'/slack/events',
]

async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
Expand Down
8 changes: 7 additions & 1 deletion lcserve/backend/langchain_helper.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import copy
import json
import logging
from dataclasses import dataclass
Expand Down Expand Up @@ -166,7 +167,12 @@ def on_chain_start(
"chain", context=context, end_on_exit=False
) as span:
span.set_attribute("otel.operation.name", operation)
span.add_event("inputs", {"data": json.dumps(inputs)})

# If the event is from slack bot, we need to pop chat_history as it's not serializable,
# nor do we need the chat_history anyway for tracing.
copied_inputs = copy.deepcopy(inputs)
copied_inputs.pop('chat_history', None)
span.add_event("inputs", {"data": json.dumps(copied_inputs)})
self._register_span(run_id, span)
except Exception:
self.logger.error("Error in tracing callback handler", exc_info=True)
Expand Down
53 changes: 51 additions & 2 deletions lcserve/backend/slackbot/slackbot.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
import json
import os
import time
from functools import lru_cache
from typing import Any, Callable, Dict, Generator, List, Tuple, Union
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Generator,
List,
Optional,
Tuple,
Union,
)
from urllib.parse import urlparse

from jina.logging.logger import JinaLogger
Expand All @@ -16,13 +27,25 @@
from slack_sdk.errors import SlackApiError
from tenacity import retry, stop_after_attempt, wait_exponential

if TYPE_CHECKING:
from opentelemetry.sdk.metrics import Counter

from ..langchain_helper import OpenAICallbackHandler, TracingCallbackHandler


PROGRESS_MESSAGE = "Processing..."


class SlackBot:
_logger = JinaLogger('SlackBot')

def __init__(self, workspace: str):
def __init__(
self,
workspace: str,
tracing_handler: Union['OpenAICallbackHandler', 'TracingCallbackHandler'],
request_counter: Optional['Counter'] = None,
duration_counter: Optional['Counter'] = None,
):
from langchain.output_parsers import PydanticOutputParser
from slack_bolt import App
from slack_bolt.adapter.fastapi import SlackRequestHandler
Expand All @@ -34,6 +57,9 @@ def __init__(self, workspace: str):

self.slack_app = App()
self.workspace = workspace
self.request_counter = request_counter
self.duration_counter = duration_counter
self.tracing_handler = tracing_handler
self.handler = SlackRequestHandler(self.slack_app)
self._parser = PydanticOutputParser(pydantic_object=TextOrBlock)

Expand Down Expand Up @@ -354,7 +380,23 @@ def get_agent_prompt() -> PromptTemplate:
suffix=SlackBot.get_agent_prompt_suffix(),
)

def metrics_decorator(self, func):
def wrapper_timer(*args, **kwargs):
start_time = time.perf_counter()
result = func(*args, **kwargs)
end_time = time.perf_counter()
elapsed_time = end_time - start_time
if self.request_counter:
self.request_counter.add(1)
if self.duration_counter:
self.duration_counter.add(elapsed_time)
return result

return wrapper_timer

def app_mention(self, func):
func = self.metrics_decorator(func)

@self.slack_app.event('app_mention')
def wrapper(client: WebClient, body, context):
_event: Dict = body["event"]
Expand Down Expand Up @@ -390,6 +432,7 @@ def wrapper(client: WebClient, body, context):
thread_ts=_thread_ts,
parser=self._parser,
),
tracing_handler=self.tracing_handler,
workspace=self.workspace,
user=_user,
context=context,
Expand All @@ -398,6 +441,8 @@ def wrapper(client: WebClient, body, context):
return wrapper

def message(self, func):
func = self.metrics_decorator(func)

@self.slack_app.event('message')
def wrapper(client, body, context):
_event: Dict = body["event"]
Expand Down Expand Up @@ -425,6 +470,7 @@ def wrapper(client, body, context):
thread_ts=_thread_ts,
parser=self._parser,
),
tracing_handler=self.tracing_handler,
workspace=self.workspace,
user=_channel,
context=context,
Expand All @@ -434,6 +480,8 @@ def wrapper(client, body, context):

def command(self, command: str):
def decorator(command_func):
command_func = self.metrics_decorator(command_func)

@self.slack_app.command(command)
def wrapper(ack, client, body, context):
ack()
Expand All @@ -454,6 +502,7 @@ def wrapper(ack, client, body, context):
user_id=_user,
command=command,
),
tracing_handler=self.tracing_handler,
user=_channel,
context=context,
)
Expand Down

0 comments on commit c178524

Please sign in to comment.