From 18edc9e030ea3a9e43a966dd27d24a718e4e4462 Mon Sep 17 00:00:00 2001 From: Emil Valeev Date: Mon, 4 Dec 2023 03:08:44 +0600 Subject: [PATCH 1/3] feat(process): config func, process history --- agency.go | 4 - examples/custom_operation/main.go | 2 +- examples/logging/main.go | 4 +- examples/process_history/main.go | 59 ++++++++++++++ .../main.go | 14 ++-- examples/rag_vector_database/main.go | 2 +- examples/speech_to_text/main.go | 4 +- examples/speech_to_text_to_image/main.go | 2 +- examples/translate_text/main.go | 4 +- process.go | 81 +++++++++++++++---- 10 files changed, 143 insertions(+), 33 deletions(-) create mode 100644 examples/process_history/main.go rename examples/{speech_to_text_multi_model => process_history_with_interceptor}/main.go (74%) diff --git a/agency.go b/agency.go index 20cc351..2a56143 100644 --- a/agency.go +++ b/agency.go @@ -22,10 +22,6 @@ type OperationConfig struct { Messages []Message } -func (p *Operation) Config() *OperationConfig { - return p.config -} - // NewOperation allows to create an operation from a function. 
func NewOperation(handler OperationHandler) *Operation { return &Operation{ diff --git a/examples/custom_operation/main.go b/examples/custom_operation/main.go index 4369a43..5ffb7e8 100644 --- a/examples/custom_operation/main.go +++ b/examples/custom_operation/main.go @@ -11,7 +11,7 @@ import ( func main() { increment := agency.NewOperation(incrementFunc) - msg, err := agency.NewProcess( + msg, err := agency.ProcessFromOperations( increment, increment, increment, ).Execute(context.Background(), agency.UserMessage("0")) diff --git a/examples/logging/main.go b/examples/logging/main.go index 57e35f1..5c53fbf 100644 --- a/examples/logging/main.go +++ b/examples/logging/main.go @@ -15,7 +15,7 @@ func main() { factory := openai.New(openai.Params{Key: os.Getenv("OPENAI_API_KEY")}) params := openai.TextToTextParams{Model: "gpt-3.5-turbo"} - _, err := agency.NewProcess( + _, err := agency.ProcessFromOperations( factory.TextToText(params).SetPrompt("explain what that means"), factory.TextToText(params).SetPrompt("translate to russian"), factory.TextToText(params).SetPrompt("replace all spaces with '_'"), @@ -31,6 +31,6 @@ func main() { } } -func Logger(input, output agency.Message, cfg *agency.OperationConfig) { +func Logger(input, output agency.Message, cfg *agency.OperationConfig, _ uint) { fmt.Printf("in: %v\nprompt: %v\nout: %v\n\n", input, cfg.Prompt, output) } diff --git a/examples/process_history/main.go b/examples/process_history/main.go new file mode 100644 index 0000000..55deb06 --- /dev/null +++ b/examples/process_history/main.go @@ -0,0 +1,59 @@ +package main + +import ( + "context" + "fmt" + + _ "github.com/joho/godotenv/autoload" + + "github.com/neurocult/agency" + "github.com/neurocult/agency/providers/openai" +) + +func main() { + provider := openai.New(openai.Params{Key: "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"}) + params := openai.TextToTextParams{ + Model: "gpt-3.5-turbo", + Temperature: openai.Temperature(0), + MaxTokens: 100, + } + + result, 
err := agency.NewProcess( + agency.ProcessStep{ + Operation: provider. + TextToText(params). + SetPrompt("Increase the number by adding 1 to it. Answer only in numbers, without text"), + }, + agency.ProcessStep{ + Operation: provider. + TextToText(params). + SetPrompt("Double the number. Answer only in numbers, without text"), + }, + agency.ProcessStep{ + ConfigFunc: func(history agency.ProcessHistory, cfg *agency.OperationConfig) error { + firstStepResult, _ := history.Get(0) + cfg.Prompt = fmt.Sprintf("Add %s", firstStepResult) + return nil + }, + Operation: provider.TextToText(params), + }, + ).Execute( + context.Background(), + agency.UserMessage("5"), + func(in, out agency.Message, cfg *agency.OperationConfig, stepIndex uint) { + fmt.Printf("---\n\nSTEP %d executed\n\nINPUT: %v\n\nCONFIG: %v\n\nOUTPUT: %v\n\n", stepIndex, in, cfg, out) + }, + ) + + if err != nil { + panic(err) + } + + fmt.Println(result) +} + +// InjectHistory allows to pass history between operations by injecting it into the config. +func InjectHistory(history agency.ProcessHistory, cfg *agency.OperationConfig) error { + cfg.Messages = history.All() + return nil +} diff --git a/examples/speech_to_text_multi_model/main.go b/examples/process_history_with_interceptor/main.go similarity index 74% rename from examples/speech_to_text_multi_model/main.go rename to examples/process_history_with_interceptor/main.go index 92cec8d..f0662c0 100644 --- a/examples/speech_to_text_multi_model/main.go +++ b/examples/process_history_with_interceptor/main.go @@ -15,21 +15,23 @@ import ( type Saver []agency.Message -func (s *Saver) Save(input, output agency.Message, _ *agency.OperationConfig) { +// This is how we can retrieve process history by hand with the interceptor, without using the history itself. +// But we can't (or it's hard to do) pass history between steps this way. For that we can use config func. 
+func (s *Saver) Save(input, output agency.Message, _ *agency.OperationConfig, _ uint) { *s = append(*s, output) } func main() { - factory := openai.New(openai.Params{Key: os.Getenv("OPENAI_API_KEY")}) + provider := openai.New(openai.Params{Key: os.Getenv("OPENAI_API_KEY")}) // step 1 - hear := factory. + hear := provider. SpeechToText(openai.SpeechToTextParams{ Model: goopenai.Whisper1, }) // step2 - translate := factory. + translate := provider. TextToText(openai.TextToTextParams{ Model: "gpt-3.5-turbo", Temperature: openai.Temperature(0.5), @@ -37,7 +39,7 @@ func main() { SetPrompt("translate to russian") // step 3 - uppercase := factory. + uppercase := provider. TextToText(openai.TextToTextParams{ Model: "gpt-3.5-turbo", Temperature: openai.Temperature(1), @@ -54,7 +56,7 @@ func main() { ctx := context.Background() speechMsg := agency.Message{Content: sound} - _, err = agency.NewProcess( + _, err = agency.ProcessFromOperations( hear, translate, uppercase, diff --git a/examples/rag_vector_database/main.go b/examples/rag_vector_database/main.go index e4f112c..c72e859 100644 --- a/examples/rag_vector_database/main.go +++ b/examples/rag_vector_database/main.go @@ -32,7 +32,7 @@ func main() { Model: "tts-1", ResponseFormat: "mp3", Speed: 1, Voice: "onyx", }) - result, err := agency.NewProcess( + result, err := agency.ProcessFromOperations( retrieve, summarize, voice, diff --git a/examples/speech_to_text/main.go b/examples/speech_to_text/main.go index de2922b..4a4d3b0 100644 --- a/examples/speech_to_text/main.go +++ b/examples/speech_to_text/main.go @@ -15,14 +15,14 @@ import ( ) func main() { - factory := openai.New(openai.Params{Key: os.Getenv("OPENAI_API_KEY")}) + provider := openai.New(openai.Params{Key: os.Getenv("OPENAI_API_KEY")}) data, err := os.ReadFile("speech.mp3") if err != nil { panic(err) } - result, err := factory.SpeechToText(openai.SpeechToTextParams{ + result, err := provider.SpeechToText(openai.SpeechToTextParams{ Model: goopenai.Whisper1, 
}).Execute( context.Background(), diff --git a/examples/speech_to_text_to_image/main.go b/examples/speech_to_text_to_image/main.go index 420429e..5293fa1 100644 --- a/examples/speech_to_text_to_image/main.go +++ b/examples/speech_to_text_to_image/main.go @@ -22,7 +22,7 @@ func main() { panic(err) } - msg, err := agency.NewProcess( + msg, err := agency.ProcessFromOperations( factory.SpeechToText(openai.SpeechToTextParams{Model: goopenai.Whisper1}), factory.TextToImage(openai.TextToImageParams{ Model: goopenai.CreateImageModelDallE2, diff --git a/examples/translate_text/main.go b/examples/translate_text/main.go index 90a9f6c..09226a3 100644 --- a/examples/translate_text/main.go +++ b/examples/translate_text/main.go @@ -13,9 +13,9 @@ import ( ) func main() { - factory := openai.New(openai.Params{Key: os.Getenv("OPENAI_API_KEY")}) + provider := openai.New(openai.Params{Key: os.Getenv("OPENAI_API_KEY")}) - result, err := factory. + result, err := provider. TextToText(openai.TextToTextParams{Model: goopenai.GPT3Dot5Turbo}). SetPrompt("You are a helpful assistant that translates English to French"). Execute(context.Background(), agency.UserMessage("I love programming.")) diff --git a/process.go b/process.go index fecee17..1b0e7a4 100644 --- a/process.go +++ b/process.go @@ -2,35 +2,88 @@ package agency import ( "context" + "errors" + "fmt" ) -// Process is a chain of operations that can be executed in sequence. +// Process is a chain of steps (each wrapping an operation) that are executed in sequence. type Process struct { - operations []*Operation + steps []ProcessStep } -func NewProcess(operations ...*Operation) *Process { - return &Process{ - operations: operations, +// ProcessStep is an object that can be chained with other steps forming the process. +type ProcessStep struct { + // Operation that the current step depends on. + // Its execution is deferred until the process reaches the corresponding step. 
+ Operation *Operation + // ConfigFunc allows modifying the config based on results from the previous steps. + // Its execution is deferred until the process reaches the corresponding step. + ConfigFunc func(ProcessHistory, *OperationConfig) error +} + +// NewProcess creates a new process from the given steps. If you don't need history use ProcessFromOperations instead. +func NewProcess(steps ...ProcessStep) *Process { + return &Process{steps: steps} +} + +// ProcessFromOperations creates a process from operations. +// It's handy when all you need is to chain some operations together and you don't need access to the history. +func ProcessFromOperations(operations ...*Operation) *Process { + steps := make([]ProcessStep, 0, len(operations)) + for _, operation := range operations { + steps = append(steps, ProcessStep{Operation: operation, ConfigFunc: nil}) + } + return &Process{steps: steps} +} + +// ProcessInterceptor is a function that is called by Process after one step finished but before next one is started. +type ProcessInterceptor func(in Message, out Message, cfg *OperationConfig, stepIndex uint) + +// ProcessHistory stores results of the previous steps of the process. It's a process's execution context. +type ProcessHistory interface { + Get(stepIndex uint) (Message, error) // Get takes the zero-based index of the step whose result we want to get + All() []Message // All returns the whole history of the previously processed steps +} + +// processHistory implements the ProcessHistory interface via a simple slice of messages +type processHistory []Message + +// Get is a panic-free way to get a message by index of the step. Indexes start at zero. Index must be < steps count +func (p processHistory) Get(stepIndex uint) (Message, error) { + i := int(stepIndex) + if i >= len(p) { + return Message{}, errors.New("step index must less than the number of steps") } + return p[i], nil +} + +// All simply returns p as it is. 
+func (p processHistory) All() []Message { + return p } -// Interceptor is a function that is called by Process after one operation finished but before next one is started. -type Interceptor func(in Message, out Message, cfg *OperationConfig) +// Execute loops over process steps and sequentially executes them by passing output of one step as an input to another. +// If interceptors are provided, they are called on each step. So for N steps and M interceptors there's N x M executions. +func (p *Process) Execute(ctx context.Context, input Message, interceptors ...ProcessInterceptor) (Message, error) { + history := make(processHistory, 0, len(p.steps)) + + for i, step := range p.steps { + if step.ConfigFunc != nil { + if err := step.ConfigFunc(history, step.Operation.config); err != nil { + return Message{}, fmt.Errorf("config func on step %d: %w", i, err) + } + } -// Execute iterates over Process's operations and sequentially executes them. -// After first operation is executed it uses its output as an input to the second one and so on until the whole chain is finished. -// It also executes all given interceptors, if they are provided, so for every N operations and M interceptors it's N x M executions. 
-func (p *Process) Execute(ctx context.Context, input Message, interceptors ...Interceptor) (Message, error) { - for _, operation := range p.operations { - output, err := operation.Execute(ctx, input) + output, err := step.Operation.Execute(ctx, input) if err != nil { return Message{}, err } + history = append(history, output) + // FIXME while these are called AFTER operation and not before it's impossible to modify configuration for _, interceptor := range interceptors { - interceptor(input, output, operation.Config()) + interceptor(input, output, step.Operation.config, uint(i)) } input = output From ffe722affd2a8b4d22fa48658257d8ec7d4ef8ab Mon Sep 17 00:00:00 2001 From: Emil Valeev Date: Mon, 4 Dec 2023 13:40:34 +0600 Subject: [PATCH 2/3] feat(process): return history --- agency.go | 4 +- examples/custom_operation/main.go | 2 +- examples/logging/main.go | 4 +- examples/process_history/main.go | 78 ++++++++++--------- .../process_history_with_interceptor/main.go | 71 ----------------- examples/process_with_config_func/main.go | 57 ++++++++++++++ examples/rag_vector_database/main.go | 2 +- examples/speech_to_text_to_image/main.go | 2 +- process.go | 14 ++-- 9 files changed, 111 insertions(+), 123 deletions(-) delete mode 100644 examples/process_history_with_interceptor/main.go create mode 100644 examples/process_with_config_func/main.go diff --git a/agency.go b/agency.go index 2a56143..eb1f83b 100644 --- a/agency.go +++ b/agency.go @@ -7,8 +7,8 @@ import ( // Operation is basic building block. type Operation struct { - handler OperationHandler - config *OperationConfig + handler OperationHandler // handler must never be nil + config *OperationConfig // config is a pointer because it must be possible to modify it, but it must never be nil } // OperationHandler is a function that implements logic. 
diff --git a/examples/custom_operation/main.go b/examples/custom_operation/main.go index 5ffb7e8..346209e 100644 --- a/examples/custom_operation/main.go +++ b/examples/custom_operation/main.go @@ -11,7 +11,7 @@ import ( func main() { increment := agency.NewOperation(incrementFunc) - msg, err := agency.ProcessFromOperations( + msg, _, err := agency.ProcessFromOperations( increment, increment, increment, ).Execute(context.Background(), agency.UserMessage("0")) diff --git a/examples/logging/main.go b/examples/logging/main.go index 5c53fbf..ab27338 100644 --- a/examples/logging/main.go +++ b/examples/logging/main.go @@ -15,7 +15,7 @@ func main() { factory := openai.New(openai.Params{Key: os.Getenv("OPENAI_API_KEY")}) params := openai.TextToTextParams{Model: "gpt-3.5-turbo"} - _, err := agency.ProcessFromOperations( + _, _, err := agency.ProcessFromOperations( factory.TextToText(params).SetPrompt("explain what that means"), factory.TextToText(params).SetPrompt("translate to russian"), factory.TextToText(params).SetPrompt("replace all spaces with '_'"), @@ -31,6 +31,6 @@ func main() { } } -func Logger(input, output agency.Message, cfg *agency.OperationConfig, _ uint) { +func Logger(input, output agency.Message, cfg agency.OperationConfig, _ uint) { fmt.Printf("in: %v\nprompt: %v\nout: %v\n\n", input, cfg.Prompt, output) } diff --git a/examples/process_history/main.go b/examples/process_history/main.go index 55deb06..b56160a 100644 --- a/examples/process_history/main.go +++ b/examples/process_history/main.go @@ -1,59 +1,61 @@ +// To make this example work make sure you have speech.ogg file in the root of directory package main import ( "context" "fmt" + "os" _ "github.com/joho/godotenv/autoload" + goopenai "github.com/sashabaranov/go-openai" "github.com/neurocult/agency" "github.com/neurocult/agency/providers/openai" ) func main() { - provider := openai.New(openai.Params{Key: "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"}) - params := openai.TextToTextParams{ - 
Model: "gpt-3.5-turbo", - Temperature: openai.Temperature(0), - MaxTokens: 100, + provider := openai.New(openai.Params{Key: os.Getenv("OPENAI_API_KEY")}) + + // step 1 + hear := provider. + SpeechToText(openai.SpeechToTextParams{ + Model: goopenai.Whisper1, + }) + + // step2 + translate := provider. + TextToText(openai.TextToTextParams{ + Model: "gpt-3.5-turbo", + Temperature: openai.Temperature(0.5), + }). + SetPrompt("translate to russian") + + // step 3 + uppercase := provider. + TextToText(openai.TextToTextParams{ + Model: "gpt-3.5-turbo", + Temperature: openai.Temperature(1), + }). + SetPrompt("uppercase every letter of the text") + + sound, err := os.ReadFile("speech.mp3") + if err != nil { + panic(err) } - result, err := agency.NewProcess( - agency.ProcessStep{ - Operation: provider. - TextToText(params). - SetPrompt("Increase the number by adding 1 to it. Answer only in numbers, without text"), - }, - agency.ProcessStep{ - Operation: provider. - TextToText(params). - SetPrompt("Double the number. Answer only in numbers, without text"), - }, - agency.ProcessStep{ - ConfigFunc: func(history agency.ProcessHistory, cfg *agency.OperationConfig) error { - firstStepResult, _ := history.Get(0) - cfg.Prompt = fmt.Sprintf("Add %s", firstStepResult) - return nil - }, - Operation: provider.TextToText(params), - }, - ).Execute( - context.Background(), - agency.UserMessage("5"), - func(in, out agency.Message, cfg *agency.OperationConfig, stepIndex uint) { - fmt.Printf("---\n\nSTEP %d executed\n\nINPUT: %v\n\nCONFIG: %v\n\nOUTPUT: %v\n\n", stepIndex, in, cfg, out) - }, - ) + ctx := context.Background() + speechMsg := agency.Message{Content: sound} + _, history, err := agency.ProcessFromOperations( + hear, + translate, + uppercase, + ).Execute(ctx, speechMsg) if err != nil { panic(err) } - fmt.Println(result) -} - -// InjectHistory allows to pass history between operations by injecting it into the config. 
-func InjectHistory(history agency.ProcessHistory, cfg *agency.OperationConfig) error { - cfg.Messages = history.All() - return nil + for _, msg := range history.All() { + fmt.Println(msg.String()) + } } diff --git a/examples/process_history_with_interceptor/main.go b/examples/process_history_with_interceptor/main.go deleted file mode 100644 index f0662c0..0000000 --- a/examples/process_history_with_interceptor/main.go +++ /dev/null @@ -1,71 +0,0 @@ -// To make this example work make sure you have speech.ogg file in the root of directory -package main - -import ( - "context" - "fmt" - "os" - - _ "github.com/joho/godotenv/autoload" - goopenai "github.com/sashabaranov/go-openai" - - "github.com/neurocult/agency" - "github.com/neurocult/agency/providers/openai" -) - -type Saver []agency.Message - -// This is how we can retrieve process history by hand with the interceptor, without using the history itself. -// But we can't (or it's hard to do) pass history between steps this way. For that we can use config func. -func (s *Saver) Save(input, output agency.Message, _ *agency.OperationConfig, _ uint) { - *s = append(*s, output) -} - -func main() { - provider := openai.New(openai.Params{Key: os.Getenv("OPENAI_API_KEY")}) - - // step 1 - hear := provider. - SpeechToText(openai.SpeechToTextParams{ - Model: goopenai.Whisper1, - }) - - // step2 - translate := provider. - TextToText(openai.TextToTextParams{ - Model: "gpt-3.5-turbo", - Temperature: openai.Temperature(0.5), - }). - SetPrompt("translate to russian") - - // step 3 - uppercase := provider. - TextToText(openai.TextToTextParams{ - Model: "gpt-3.5-turbo", - Temperature: openai.Temperature(1), - }). 
- SetPrompt("uppercase every letter of the text") - - saver := Saver{} - - sound, err := os.ReadFile("speech.mp3") - if err != nil { - panic(err) - } - - ctx := context.Background() - speechMsg := agency.Message{Content: sound} - - _, err = agency.ProcessFromOperations( - hear, - translate, - uppercase, - ).Execute(ctx, speechMsg, saver.Save) - if err != nil { - panic(err) - } - - for _, msg := range saver { - fmt.Println(msg.String()) - } -} diff --git a/examples/process_with_config_func/main.go b/examples/process_with_config_func/main.go new file mode 100644 index 0000000..1bed285 --- /dev/null +++ b/examples/process_with_config_func/main.go @@ -0,0 +1,57 @@ +package main + +import ( + "context" + "fmt" + "os" + + _ "github.com/joho/godotenv/autoload" + + "github.com/neurocult/agency" + "github.com/neurocult/agency/providers/openai" +) + +// In this example we demonstrate how we can use config func to build a process where 3 step uses the result of the 1 step +func main() { + provider := openai.New(openai.Params{Key: os.Getenv("OPENAI_API_KEY")}) + params := openai.TextToTextParams{ + Model: "gpt-3.5-turbo", + Temperature: openai.Temperature(0), + MaxTokens: 100, + } + + result, _, err := agency.NewProcess( + agency.ProcessStep{ + Operation: provider. + TextToText(params). + SetPrompt("Increase the number by adding 1 to it. Answer only in numbers, without text"), + }, + agency.ProcessStep{ + Operation: provider. + TextToText(params). + SetPrompt("Double the number. 
Answer only in numbers, without text"), + }, + agency.ProcessStep{ + ConfigFunc: func(history agency.ProcessHistory, cfg *agency.OperationConfig) error { + firstStepResult, _ := history.Get(0) // we ignore error because it's obvious first step exist at the time third executed + cfg.Prompt = fmt.Sprintf("Add %s", firstStepResult) // we override the prompt with the result of the first step + return nil + }, + Operation: provider.TextToText(params), // Note that we don't use SetPrompt because we already set prompt in config func + }, + ).Execute( + context.Background(), + agency.UserMessage("5"), + logStep, + ) + + if err != nil { + panic(err) + } + + fmt.Println(result) +} + +func logStep(in, out agency.Message, cfg agency.OperationConfig, stepIndex uint) { + fmt.Printf("---\n\nSTEP %d executed\n\nINPUT: %v\n\nCONFIG: %v\n\nOUTPUT: %v\n\n", stepIndex, in, cfg, out) +} diff --git a/examples/rag_vector_database/main.go b/examples/rag_vector_database/main.go index c72e859..a48ff10 100644 --- a/examples/rag_vector_database/main.go +++ b/examples/rag_vector_database/main.go @@ -32,7 +32,7 @@ func main() { Model: "tts-1", ResponseFormat: "mp3", Speed: 1, Voice: "onyx", }) - result, err := agency.ProcessFromOperations( + result, _, err := agency.ProcessFromOperations( retrieve, summarize, voice, diff --git a/examples/speech_to_text_to_image/main.go b/examples/speech_to_text_to_image/main.go index 5293fa1..994ccf1 100644 --- a/examples/speech_to_text_to_image/main.go +++ b/examples/speech_to_text_to_image/main.go @@ -22,7 +22,7 @@ func main() { panic(err) } - msg, err := agency.ProcessFromOperations( + msg, _, err := agency.ProcessFromOperations( factory.SpeechToText(openai.SpeechToTextParams{Model: goopenai.Whisper1}), factory.TextToImage(openai.TextToImageParams{ Model: goopenai.CreateImageModelDallE2, diff --git a/process.go b/process.go index 1b0e7a4..b9a8558 100644 --- a/process.go +++ b/process.go @@ -37,7 +37,8 @@ func ProcessFromOperations(operations ...*Operation) 
*Process { } // ProcessInterceptor is a function that is called by Process after one step finished but before next one is started. -type ProcessInterceptor func(in Message, out Message, cfg *OperationConfig, stepIndex uint) +// Note that there's no way to modify these arguments because they relates to an operation that is already executed. +type ProcessInterceptor func(in Message, out Message, cfg OperationConfig, stepIndex uint) // ProcessHistory stores results of the previous steps of the process. It's a process's execution context. type ProcessHistory interface { @@ -64,30 +65,29 @@ func (p processHistory) All() []Message { // Execute loops over process steps and sequentially executes them by passing output of one step as an input to another. // If interceptors are provided, they are called on each step. So for N steps and M interceptors there's N x M executions. -func (p *Process) Execute(ctx context.Context, input Message, interceptors ...ProcessInterceptor) (Message, error) { +func (p *Process) Execute(ctx context.Context, input Message, interceptors ...ProcessInterceptor) (Message, ProcessHistory, error) { history := make(processHistory, 0, len(p.steps)) for i, step := range p.steps { if step.ConfigFunc != nil { if err := step.ConfigFunc(history, step.Operation.config); err != nil { - return Message{}, fmt.Errorf("config func on step %d: %w", i, err) + return Message{}, nil, fmt.Errorf("config func on step %d: %w", i, err) } } output, err := step.Operation.Execute(ctx, input) if err != nil { - return Message{}, err + return Message{}, nil, fmt.Errorf("operation execute: %w", err) } history = append(history, output) - // FIXME while these are called AFTER operation and not before it's impossible to modify configuration for _, interceptor := range interceptors { - interceptor(input, output, step.Operation.config, uint(i)) + interceptor(input, output, *step.Operation.config, uint(i)) } input = output } - return input, nil + return input, history, nil } From 
332d953c226349ff3ea83ca90cb02af1043d9113 Mon Sep 17 00:00:00 2001 From: Emil Valeev Date: Mon, 4 Dec 2023 15:32:12 +0600 Subject: [PATCH 3/3] wip(examples): agent swarm --- examples/agent_swarm/main.go | 85 ++++++++++++++++++++++++++++++++++++ messages.go | 2 +- process.go | 10 ++--- 3 files changed, 91 insertions(+), 6 deletions(-) create mode 100644 examples/agent_swarm/main.go diff --git a/examples/agent_swarm/main.go b/examples/agent_swarm/main.go new file mode 100644 index 0000000..6cc0423 --- /dev/null +++ b/examples/agent_swarm/main.go @@ -0,0 +1,85 @@ +package main + +import ( + "context" + "fmt" + "os" + + _ "github.com/joho/godotenv/autoload" + + "github.com/neurocult/agency" + "github.com/neurocult/agency/providers/openai" +) + +func main() { + provider := openai.New(openai.Params{Key: os.Getenv("OPENAI_API_KEY")}) + params := openai.TextToTextParams{ + Model: "gpt-3.5-turbo", + Temperature: openai.Temperature(0), + MaxTokens: 100, + } + + var ( + msg agency.Message = agency.UserMessage("Flying cars") // this is what we start with + chatHistory []agency.Message // we gonna use this to accumulate history between iterations + ) + + // FIXME currently this example does not work + // maybe we need to be able to set initial history for the process to implement this easily + // current approach cannot be working because on iteration 2 steps 2 and 3 are missing the history of the previous iterations + // also could be related to the fact that history lacks initial message + for i := 0; i < 3; i++ { + output, curHistory, err := agency.NewProcess( // on each iteration we create new process with its own execution context + // writer + agency.ProcessStep{ + ConfigFunc: injectHistory, + Operation: provider. + TextToText(params). + SetPrompt("Create slogan for a given context. A short but catchy phrase"). 
+ SetMessages(chatHistory), // start with the history from the previous iteration + }, + // critic + agency.ProcessStep{ + ConfigFunc: injectHistory, // use history from the previous iteration plus previous step + Operation: provider. + TextToText(params). + SetPrompt("Criticize the given slogan. Find its weaknesses and suggest improvements"), + }, + // censor + agency.ProcessStep{ + ConfigFunc: injectHistory, // use history from the previous iteration plus two previous steps + Operation: provider. + TextToText(params). + SetPrompt( + "You are a safe guard. Text must not contain expectations about future. If you see anything happy, point to it so it can be removed.", + ), + }, + ).Execute( + context.Background(), + msg, + logStep, + ) + + if err != nil { + panic(err) + } + + chatHistory = curHistory.All() + chatHistory = chatHistory[0 : len(chatHistory)-1] // remove last output to avoid duplication, we will have it as the input + msg = output + } + + fmt.Printf("RESULT: %v\n\n", msg) +} + +// injectHistory uses the history passed in by the process +// and injects it into the configuration of the operation so the operation handler has access to the history. +func injectHistory(history agency.ProcessHistory, cfg *agency.OperationConfig) error { + cfg.Messages = append(cfg.Messages, history.All()...) + return nil +} + +// logStep simply prints data related to each step execution. It is used as a process observer. +func logStep(in, out agency.Message, cfg agency.OperationConfig, stepIndex uint) { + fmt.Printf("---\n\nSTEP %d executed\n\nINPUT: %v\n\nCONFIG: %v\n\nOUTPUT: %v\n\n", stepIndex, in, cfg, out) +} diff --git a/messages.go b/messages.go index f178aa3..b7c4e32 100644 --- a/messages.go +++ b/messages.go @@ -3,7 +3,7 @@ package agency import "fmt" type Message struct { - Role Role + Role Role // TODO refine the model. 
Role only has something to do with the chat Content []byte } diff --git a/process.go b/process.go index b9a8558..165ea6e 100644 --- a/process.go +++ b/process.go @@ -36,9 +36,9 @@ func ProcessFromOperations(operations ...*Operation) *Process { return &Process{steps: steps} } -// ProcessInterceptor is a function that is called by Process after one step finished but before next one is started. +// ProcessObserver is a function that is called by Process after one step finished but before next one is started. // Note that there's no way to modify these arguments because they relates to an operation that is already executed. -type ProcessInterceptor func(in Message, out Message, cfg OperationConfig, stepIndex uint) +type ProcessObserver func(in Message, out Message, cfg OperationConfig, stepIndex uint) // ProcessHistory stores results of the previous steps of the process. It's a process's execution context. type ProcessHistory interface { @@ -65,7 +65,7 @@ func (p processHistory) All() []Message { // Execute loops over process steps and sequentially executes them by passing output of one step as an input to another. // If interceptors are provided, they are called on each step. So for N steps and M interceptors there's N x M executions. 
-func (p *Process) Execute(ctx context.Context, input Message, interceptors ...ProcessInterceptor) (Message, ProcessHistory, error) { +func (p *Process) Execute(ctx context.Context, input Message, observers ...ProcessObserver) (Message, ProcessHistory, error) { history := make(processHistory, 0, len(p.steps)) for i, step := range p.steps { @@ -80,9 +80,9 @@ func (p *Process) Execute(ctx context.Context, input Message, interceptors ...Pr return Message{}, nil, fmt.Errorf("operation execute: %w", err) } - history = append(history, output) + history = append(history, output) // TODO we miss the original input - for _, interceptor := range interceptors { + for _, interceptor := range observers { interceptor(input, output, *step.Operation.config, uint(i)) }