Skip to content
This repository has been archived by the owner on Oct 19, 2023. It is now read-only.

Commit

Permalink
Merge pull request #134 from jina-ai/feat-job
Browse files Browse the repository at this point in the history
feat: job
  • Loading branch information
deepankarm committed Aug 4, 2023
2 parents 94be5f2 + f86c584 commit 9182fe7
Show file tree
Hide file tree
Showing 18 changed files with 585 additions and 52 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ langchain-serve currently wraps following apps as a service to be deployed on Ji
- ⚡️ Serverless, autoscaling apps that scales automatically with your traffic.
- 🗝️ Secure handling of secrets and environment variables.
- 📁 Persistent storage (EFS) mounted on your app for your data.
- ⏱️ Trigger one-time jobs to run asynchronously, allowing for non-blocking execution.
- 📊 Builtin logging, monitoring, and traces for your APIs.
- 🤖 No need to change your code to manage APIs, or manage dockerfiles, or worry about infrastructure!

Expand Down Expand Up @@ -436,6 +437,10 @@ lc-serve deploy jcloud main --secrets secrets.env

</details>

## ⏱️ Trigger one-time jobs to run asynchronously

Here's a [step-by-step guide](examples/job/README.md) to trigger one-time jobs to run asynchronously using `@job` decorator.

## 💻 `lc-serve` CLI

`lc-serve` is a simple CLI that helps you to deploy your agents on Jina AI Cloud (JCloud)
Expand Down
121 changes: 121 additions & 0 deletions examples/job/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# ⏱️ Trigger one-time jobs to run asynchronously using `@job` decorator

`langchain-serve` allows you to easily wrap your function to be scheduled asynchronously using the `@serving` decorator. By incorporating this feature, you can effortlessly trigger one-time executions, enhancing the flexibility of your workflow beyond the standard serving APIs.

Let's take a simple example that uses `RetrievalQA` chain to do a question answering based on the file provided.

### 👉 Step 1: Prepare the job function with `@job` decorator

```python
# app.py
import os

import requests
from langchain.chains import RetrievalQA
from langchain.chat_models import ChatOpenAI
from langchain.document_loaders import TextLoader
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import FAISS

from lcserve import job


@job(timeout=100, backofflimit=3)
def my_job(doc_name: str, question: str):
print("Starting the job ...")

url = f"https://raw.githubusercontent.com/langchain-ai/langchain/master/docs/extras/modules/{doc_name}"
response = requests.get(url)
data = response.text
with open("doc.txt", "w") as text_file:
text_file.write(data)
print("Download text complete !!")

embeddings = OpenAIEmbeddings()
loader = TextLoader("doc.txt", encoding="utf8")
text_splitter = CharacterTextSplitter()
docs = text_splitter.split_documents(loader.load())
faiss_index = FAISS.from_documents(docs, embedding=embeddings)
faiss_index.save_local(
folder_path=os.path.dirname(os.path.abspath(__file__)), index_name="index"
)
print("Index complete !!")

llm = ChatOpenAI(temperature=0)
qa_chain = RetrievalQA.from_chain_type(llm, retriever=faiss_index.as_retriever())
result = qa_chain({"query": question})

print(f"\nQuestion: {question}\nAnswer: {result['result']}]\n")
```

In the code, you'll notice the function is adorned with the @job decorator, which accepts two parameters. The first, `timeout`, allows you to set the time limit for the job execution. The second, `backofflimit`, specifies the number of retry attempts allowed before the job is considered as failed.

---

### 👉 Step 2: Create a `requirements.txt` file in your app directory to ensure all necessary dependencies are installed

```text
# requirements.txt
openai
faiss-cpu
```

---

### 👉 Step 3: Run `lc-serve deploy jcloud app` to deploy the app to Jina AI Cloud

We require you deploy the app either through [REST APIs using @serving decorator](../../#-rest-apis-using-serving-decorator) or [Bring your own FastAPI app](../../#-bring-your-own-fastapi-app) first before running any jobs. This step is essential, as the job functionality relies on the existence of the app entity.

You may notice that there is no `OPENAI_API_KEY` explicitly defined in the code. This omission is intentional, and the key won't be necessary if you pass the `secrets.env` file during app deployment. The job will automatically re-use the same set of secrets during its execution.

```bash
lc-serve deploy jcloud app --secret secrets.env
```

```text
# secrets.env
OPENAI_API_KEY=sk-xxx
```

---

### 👉 Step 4: Run `lc-serve job create` to create jobs

After the app deployment is finished (deployed as `langchain-1bde192651`), let's create a job.

```bash
lcserve job create langchain-1bde192651 my_job --params doc_name state_of_the_union.txt --params question 'What did the president say about Ketanji Brown Jackson?'
```

Alternatively you can also create job using the REST API provided.

```bash
curl -X 'POST' \
'https://langchain-1bde192651.wolf.jina.ai/my_job' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-H 'Authorization: Bearer xxx' \
-d '{
"doc_name": "state_of_the_union.txt",
"question": "What did the president say about Ketanji Brown Jackson?"
}'

```
Where the token used after `Authorization: Bearer` can be found at `~/.jina/config.json`.

You can also list all the jobs triggered or get the details for one given job via CLI.

```bash
lcserve job list langchain-1bde192651
```

```bash
lcserve job get my-job-7787b langchain-1bde192651
```

## 👀 What's next?

- [Learn more about Langchain](https://python.langchain.com/docs/)
- [Learn more about langchain-serve](https://github.com/jina-ai/langchain-serve)
- Have questions? [Join our Discord community](https://discord.jina.ai/)
45 changes: 45 additions & 0 deletions examples/job/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import os

import requests
from langchain.chains import RetrievalQA
from langchain.chat_models import ChatOpenAI
from langchain.document_loaders import TextLoader
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import FAISS

from lcserve import job


@job(timeout=100, backofflimit=3)
def my_job(doc_name: str, question: str):
print("Starting the job ...")

url = f"https://raw.githubusercontent.com/langchain-ai/langchain/master/docs/extras/modules/{doc_name}"
response = requests.get(url)
data = response.text
with open("doc.txt", "w") as text_file:
text_file.write(data)
print("Download text complete !!")

embeddings = OpenAIEmbeddings()
loader = TextLoader("doc.txt", encoding="utf8")
text_splitter = CharacterTextSplitter()
docs = text_splitter.split_documents(loader.load())
faiss_index = FAISS.from_documents(docs, embedding=embeddings)
faiss_index.save_local(
folder_path=os.path.dirname(os.path.abspath(__file__)), index_name="index"
)
print("Index complete !!")

llm = ChatOpenAI(temperature=0)
qa_chain = RetrievalQA.from_chain_type(llm, retriever=faiss_index.as_retriever())
result = qa_chain({"query": question})

print(f"\nQuestion: {question}\nAnswer: {result['result']}]\n")


if __name__ == "__main__":
my_job(
"paul_graham_essay.txt", "Why Paul flew up to Oregon to visit her mom regularly"
)
2 changes: 2 additions & 0 deletions examples/job/requirement.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
openai
faiss-cpu
1 change: 1 addition & 0 deletions examples/job/secrets.env
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
OPENAI_API_KEY=sk-xxx
2 changes: 1 addition & 1 deletion lcserve/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def _ignore_warnings():

_ignore_warnings()

from .backend import download_df, serving, slackbot, upload_df
from .backend import download_df, job, serving, slackbot, upload_df
from .backend.slackbot import SlackBot
from .backend.slackbot.memory import MemoryMode, get_memory

Expand Down
80 changes: 75 additions & 5 deletions lcserve/__main__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import json
import os
import sys
from typing import List

import click
import requests
from hubble import Auth
from jcloud.constants import Phase
from jina import Flow

Expand All @@ -23,12 +26,15 @@
get_app_status_on_jcloud,
get_flow_dict,
get_flow_yaml,
get_job_on_jcloud,
get_module_dir,
get_uri,
list_apps_on_jcloud,
list_jobs_on_jcloud,
load_local_df,
patch_secret_on_jcloud,
pause_app_on_jcloud,
remove_app_on_jcloud,
resume_app_on_jcloud,
syncify,
update_requirements,
)
Expand Down Expand Up @@ -72,7 +78,6 @@ async def serve_on_jcloud(
public: bool = False,
lcserve_app: bool = False,
) -> str:
from .backend.playground.utils.helper import get_random_tag
from .flow import push_app_to_hubble

module_dir, is_websocket = get_module_dir(
Expand All @@ -89,7 +94,6 @@ async def serve_on_jcloud(
gateway_id = push_app_to_hubble(
module_dir=module_dir,
requirements=requirements,
tag=get_random_tag(),
version=version,
platform=platform,
verbose=verbose,
Expand Down Expand Up @@ -527,9 +531,9 @@ def push(
verbose=verbose,
public=public,
)
id, tag = gateway_id.split(':')

click.echo(
f'Pushed to Hubble. Use {click.style(get_uri(id, tag), fg="green")} to deploy.'
f'Pushed to Hubble. Use {click.style(f"jinaai+docker://{gateway_id}", fg="green")} to deploy.'
)


Expand Down Expand Up @@ -1021,6 +1025,22 @@ async def status(app_id):
await get_app_status_on_jcloud(app_id)


@serve.command(help='Pause a serving app.')
@click.argument('app-id')
@click.help_option('-h', '--help')
@syncify
async def pause(app_id):
await pause_app_on_jcloud(app_id)


@serve.command(help='Resume a paused app.')
@click.argument('app-id')
@click.help_option('-h', '--help')
@syncify
async def resume(app_id):
await resume_app_on_jcloud(app_id)


@serve.command(help='Remove an app.')
@click.argument('app-id')
@click.help_option('-h', '--help')
Expand All @@ -1029,6 +1049,56 @@ async def remove(app_id):
await remove_app_on_jcloud(app_id)


@click.group(help="Job related operations.")
def job():
pass


@job.command('list', help='List all jobs for an app.')
@click.argument('flow-id')
@click.help_option('-h', '--help')
@syncify
async def list_jobs(flow_id):
await list_jobs_on_jcloud(flow_id)


@job.command('get', help='Get job details.')
@click.argument('job-name')
@click.argument('flow-id')
@click.help_option('-h', '--help')
@syncify
async def get_job(job_name, flow_id):
await get_job_on_jcloud(job_name, flow_id)


@job.command('create', help='Create a job.')
@click.argument('flow-id')
@click.argument('job-name')
@click.option('--params', type=(str, str), multiple=True)
def create_job(flow_id, job_name, params):
from rich import print

token = Auth.get_auth_token()
if not token:
print('You are not logged in, please login using [b]jcloud login[/b] first.')

response = requests.post(
f"https://{flow_id}.wolf.jina.ai/{job_name}",
data=json.dumps(dict(params)),
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
},
)
if response.status_code == 200:
print(f'Job [red]{response.json()["job_id"]}[/red] was created!')
else:
print(f"Job create request failed!")


serve.add_command(job)


@serve.group(help='Play with predefined apps on JCloud.')
@click.help_option('-h', '--help')
def playground():
Expand Down
2 changes: 1 addition & 1 deletion lcserve/backend/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .agentexecutor import ChainExecutor, LangchainAgentExecutor
from .decorators import serving, slackbot
from .decorators import job, serving, slackbot
from .gateway import LangchainFastAPIGateway, PlaygroundGateway, ServingGateway
from .utils import download_df, upload_df
34 changes: 34 additions & 0 deletions lcserve/backend/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import json

import click


@click.command()
@click.option(
'--module',
type=str,
required=True,
)
@click.option(
'--name',
type=str,
required=True,
)
@click.option('--params', type=str)
def cli(module, name, params):
import importlib

from utils import fix_sys_path

fix_sys_path()

inputs = {}
if params:
inputs = json.loads(params)

mod = importlib.import_module(module)
getattr(mod, name)(**dict(inputs))


if __name__ == '__main__':
cli()

0 comments on commit 9182fe7

Please sign in to comment.