Timeout for onProgress for stream events #410

Open · Vadko opened this issue Mar 2, 2023 · 9 comments · May be fixed by #444

Comments

@Vadko commented Mar 2, 2023

Describe the feature

I've run into an issue with timeoutMs when using the onProgress callback: the response is marked as timed out based on the request overall, rather than on the onProgress callback itself. Is there any way to fix that?
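
For example (usage as in the README; the point is that timeoutMs covers the whole request):

  import { ChatGPTAPI } from 'chatgpt'

  const api = new ChatGPTAPI({ apiKey: process.env.OPENAI_API_KEY })

  // timeoutMs bounds the entire request: even while onProgress is already
  // streaming partial results, the request is aborted once 500ms elapse.
  const res = await api.sendMessage('Write me a poem', {
    timeoutMs: 500,
    onProgress: (partial) => console.log(partial.text)
  })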

@transitive-bullshit (Owner)

Good question.

If I'm understanding correctly, once the timeout happens, onProgress can still be called, and you'd like to prevent that from happening?

@Vadko (Author) commented Mar 2, 2023

Not quite. I meant that I'd like a timeout for the first onProgress iteration.

Currently, with a 500ms timeout, onProgress gets triggered and the request is still cancelled after 500ms.
What I'm asking for is: set a 500ms timeout -> if onProgress is called -> the timeout is cancelled.

Of course, it could be done internally by wrapping onProgress with a timeout, but I'd like to see it exposed as a param, e.g. onProgressTimeout. A rough sketch of the wrapping idea is below.
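
Something like this, assuming the p-timeout package that sendMessage already uses (withOnProgressTimeout and onProgressTimeout are hypothetical names):

  import pTimeout from 'p-timeout'

  // Sketch: resolve a promise on the first onProgress call and race it
  // against a timer, so the timer only matters until streaming starts.
  function withOnProgressTimeout<T>(
    onProgress: (partial: T) => void,
    onProgressTimeout: number, // hypothetical param name
    onTimeout: () => void // e.g. () => abortController.abort()
  ): (partial: T) => void {
    let firstProgress!: () => void
    const firstProgressP = new Promise<void>((resolve) => {
      firstProgress = resolve
    })

    // If the first chunk never arrives in time, abort the request;
    // once firstProgressP has resolved, the timeout can no longer fire.
    pTimeout(firstProgressP, {
      milliseconds: onProgressTimeout,
      message: 'onProgress timed out waiting for the first event'
    }).catch(() => onTimeout())

    return (partial: T) => {
      firstProgress() // effectively cancels the pending timeout
      onProgress(partial)
    }
  }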

@transitive-bullshit (Owner)

Ahhh yep; that's definitely doable.

@Vadko (Author) commented Mar 2, 2023

Thanks! Could you please close this issue once done?

@transitive-bullshit (Owner)

@Vadko PRs are very welcome 😄

@Vadko (Author) commented Mar 3, 2023

Ye, sure, I will :D

@Vadko (Author) commented Mar 3, 2023

@transitive-bullshit is this implementation OK? I tested it and it works fine:

  async sendMessage(
    text: string,
    opts: types.SendMessageBrowserOptions = {}
  ): Promise<types.ChatMessage> {
    if (!!opts.conversationId !== !!opts.parentMessageId) {
      throw new Error(
        'ChatGPTUnofficialProxyAPI.sendMessage: conversationId and parentMessageId must both be set or both be undefined'
      )
    }

    if (opts.conversationId && !isValidUUIDv4(opts.conversationId)) {
      throw new Error(
        'ChatGPTUnofficialProxyAPI.sendMessage: conversationId is not a valid v4 UUID'
      )
    }

    if (opts.parentMessageId && !isValidUUIDv4(opts.parentMessageId)) {
      throw new Error(
        'ChatGPTUnofficialProxyAPI.sendMessage: parentMessageId is not a valid v4 UUID'
      )
    }

    if (opts.messageId && !isValidUUIDv4(opts.messageId)) {
      throw new Error(
        'ChatGPTUnofficialProxyAPI.sendMessage: messageId is not a valid v4 UUID'
      )
    }

    const {
      conversationId,
      parentMessageId = uuidv4(),
      messageId = uuidv4(),
      action = 'next',
      timeoutMs,
      onProgress
    } = opts

    let {abortSignal} = opts

    let abortController: AbortController | null = null
    if (timeoutMs && !abortSignal) {
      abortController = new AbortController()
      abortSignal = abortController.signal
    }

    const body: types.ConversationJSONBody = {
      action,
      messages: [
        {
          id: messageId,
          role: 'user',
          content: {
            content_type: 'text',
            parts: [text]
          }
        }
      ],
      model: this._model,
      parent_message_id: parentMessageId
    }

    if (conversationId) {
      body.conversation_id = conversationId
    }

    const result: types.ChatMessage = {
      role: 'assistant',
      id: uuidv4(),
      parentMessageId: messageId,
      conversationId,
      text: ''
    }

    const responseP = new Promise<types.ChatMessage>((resolve, reject) => {
      const url = this._apiReverseProxyUrl
      const headers = {
        ...this._headers,
        Authorization: `Bearer ${this._accessToken}`,
        Accept: 'text/event-stream',
        'Content-Type': 'application/json'
      }

      if (this._debug) {
        console.log('POST', url, {body, headers})
      }

      const onProgressPromise = new Promise<void>((res, rej) => {
        fetchSSE(
          url,
          {
            method: 'POST',
            headers,
            body: JSON.stringify(body),
            signal: abortSignal,
            onMessage: (data: string) => {
              if (data === '[DONE]') {
                return resolve(result)
              }

              try {
                const convoResponseEvent: types.ConversationResponseEvent =
                  JSON.parse(data)
                if (convoResponseEvent.conversation_id) {
                  result.conversationId = convoResponseEvent.conversation_id
                }

                if (convoResponseEvent.message?.id) {
                  result.id = convoResponseEvent.message.id
                }

                const message = convoResponseEvent.message
                // console.log('event', JSON.stringify(convoResponseEvent, null, 2))

                if (message) {
                  let text = message?.content?.parts?.[0]

                  if (text) {
                    result.text = text

                    if (onProgress) {
                      onProgress(result)
                      res()
                    }
                  }
                }
              } catch (err) {
                // ignore for now; there seem to be some non-json messages
                // console.warn('fetchSSE onMessage unexpected error', err)
              }
            }
          },
          this._fetch
        ).catch((err) => {
          const errMessageL = err.toString().toLowerCase()

          if (
            result.text &&
            (errMessageL === 'error: typeerror: terminated' ||
              errMessageL === 'typeerror: terminated')
          ) {
            // OpenAI sometimes forcefully terminates the socket from their end before
            // the HTTP request has resolved cleanly. In my testing, these cases tend to
            // happen when OpenAI has already sent the last `response`, so we can ignore
            // the `fetch` error in this case.
            return resolve(result)
          } else {
            return reject(err)
          }
        })
      })

      ;(onProgressPromise as any).cancel = () => {
        // abortController may be null (caller-supplied abortSignal, or no
        // timeoutMs), so guard the abort
        abortController?.abort()
      }

      // NOTE: 10000ms is a hardcoded placeholder; in the real implementation
      // this would come from a param like opts.onProgressTimeout
      return pTimeout(onProgressPromise, {
        milliseconds: 10000,
        message: 'Caught onProgress timeout'
      }).catch((e) => console.log(e))
    })

    if (timeoutMs) {
      if (abortController) {
        // This will be called when a timeout occurs in order for us to forcibly
        // ensure that the underlying HTTP request is aborted.
        ;(responseP as any).cancel = () => {
          abortController.abort()
        }
      }

      return pTimeout(responseP, {
        milliseconds: timeoutMs,
        message: 'ChatGPT timed out waiting for response'
      })
    } else {
      return responseP
    }
  }
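
For context, calling it with the proposed option would look something like this (onProgressTimeout is the new param sketched above, not part of the released API):

  import { ChatGPTUnofficialProxyAPI } from 'chatgpt'

  const api = new ChatGPTUnofficialProxyAPI({
    accessToken: process.env.OPENAI_ACCESS_TOKEN
  })

  const res = await api.sendMessage('Hello world', {
    timeoutMs: 2 * 60 * 1000, // overall request timeout (existing behavior)
    onProgressTimeout: 10 * 1000, // proposed: abort if no first event by then
    onProgress: (partial) => console.log(partial.text)
  })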

@Vadko (Author) commented Mar 3, 2023

If it's OK, I'll refactor it and create a PR.

@Vadko (Author) commented Mar 4, 2023

@transitive-bullshit could you please take a look at this snippet and say whether the approach is fine? 🙏

Vadko linked a pull request Mar 6, 2023 that will close this issue