Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

功能请求:希望标准项目可以实现流式响应 #3282

Open
quantstu opened this issue Sep 24, 2023 · 13 comments
Open

功能请求:希望标准项目可以实现流式响应 #3282

quantstu opened this issue Sep 24, 2023 · 13 comments

Comments

@quantstu
Copy link

描述

我们希望在标准项目中实现流式响应的功能,以便满足特定应用场景的需求。流式响应允许服务器持续向客户端发送数据,而不需要客户端不断地进行轮询请求,这在某些实时通信和事件推送应用中非常有用。

背景

最近,我们的项目需要与大型语言模型进行交互,以获取实时生成的文本数据。这种交互需要使用 Server-Sent Events (SSE) 协议,其中 content-type 设置为 text/event-stream。在我们的项目中,我们选择使用 Koa 来实现 SSE,具体实现如下:

// 发送消息
const sendMessage = async (stream) => {
  const data = [
    '现在科学技术的发展速度叫人惊叹',
    '同样在数码相机的技术创新上',
    '随着数码相机越来越普及',
    '数码相机现已成为大家生活中不可缺少的电子产品',
    '而正是因为这样,技术的创新也显得尤为重要',
  ];

  // 循环上面数组: 推送数据、休眠 2 秒
  for (const value of data) {
    stream.write(`data: ${value}\n\n`); // 写入数据(推送数据)
    await new Promise((resolve) => setTimeout(resolve, 2000));
  }

  // 结束流
  stream.end();
};

router.get('/demo', async (ctx) => {
  // 1. 设置响应头
  ctx.set({
    'Connection': 'keep-alive',
    'Cache-Control': 'no-cache',
    'Content-Type': 'text/event-stream', // 表示返回数据是个 stream
  });

  // 2. 创建流、并作为接口数据进行返回
  const stream = new PassThrough();
  ctx.body = stream;
  ctx.status = 200;

  // 3. 推送流数据
  sendMessage(stream, ctx);
});

请求

为了实现流式响应的功能,我们建议在标准项目中提供相应的支持和工具,以便开发人员可以更轻松地创建类似的 SSE 端点。这将使标准项目更加灵活,并能够满足更多实时应用的需求。

预期行为

我们期望标准项目中的流式响应功能包括以下要点:

  1. 能够设置响应头,包括必要的 SSE 头部信息(如 Connection、Cache-Control、Content-Type)。
  2. 能够创建一个可写流对象,作为响应的主体。
  3. 能够轻松地向流中写入数据,以实现数据的持续推送。
  4. 能够在需要时结束流,以通知客户端数据传输的结束。

这些功能将使开发人员能够更方便地实现 SSE 以及其他流式响应协议,从而增强项目的实时通信能力。

相关文档

(以上部分内容由AI协助生成)

@czy88840616
Copy link
Member

czy88840616 commented Sep 25, 2023

直接引用 koa-sse 即可。

@quantstu
Copy link
Author

在Next.js中可以直接将ReadableStream作为一个响应,我想在Midway中实现这样的功能好像不行

Next.js

// app/api/sse/route.ts
export async function GET() {
  const encoder = new TextEncoder()
  const stream = new ReadableStream({
    async start(controller) {
      const data = [
        '渐进式设计',
        '提供从基础到入门再到企业级的升级方案',
        '解决应用维护与拓展性难题',
      ]
      while (data.length) {
        const chunk = `data: ${JSON.stringify({ content: data.shift() })}\n\n`
        console.log(chunk)
        controller.enqueue(encoder.encode(chunk))
        await new Promise(resolve => setTimeout(resolve, 100))
      }
      controller.close()
    },
  })
  return new Response(stream)
}

Response Raw

data: {"content":"渐进式设计"}

data: {"content":"提供从基础到入门再到企业级的升级方案"}

data: {"content":"解决应用维护与拓展性难题"}

Midway

// src/controller/sse.controller.ts
import { Controller, Get, Inject } from '@midwayjs/core'
import type { Context } from '@midwayjs/koa'

@Controller('/api')
export class APIController {
  @Inject()
  ctx: Context

  @Get('/sse')
  async createChatCompletion() {
    const encoder = new TextEncoder()
    const stream = new ReadableStream({
      async start(controller) {
        const data = [
          '渐进式设计',
          '提供从基础到入门再到企业级的升级方案',
          '解决应用维护与拓展性难题',
        ]
        while (data.length) {
          const chunk = `data: ${JSON.stringify({ content: data.shift() })}\n\n`
          console.log(chunk)
          controller.enqueue(encoder.encode(chunk))
          await new Promise(resolve => setTimeout(resolve, 100))
        }
        controller.close()
      },
    })
    return stream
  }
}

Response Raw

{}

Logs

[16:19:48] Node.js server restarted in 1468 ms

data: {"content":"渐进式设计"}


2023-09-25 16:20:02,889 INFO 3312 [-/::1/-/214ms GET /api/sse] Report in "src/middleware/report.middleware.ts", rt = 4ms      
data: {"content":"提供从基础到入门再到企业级的升级方案"}


data: {"content":"解决应用维护与拓展性难题"}


@leemotive
Copy link
Contributor

我们用的 Readable 在 midway 中实现流式响应

const read = new Readable();  // node 内部 stream 模块
read._read = () => {};
read.push(streamData.content);   // 异常调用返回一次数据
read.push(null);  // 结束的时候 push 一个 null

return read;  // 以 read 对象作为响应

@tmhulw
Copy link

tmhulw commented Oct 11, 2023

我们用的 Readable 在 midway 中实现流式响应

const read = new Readable();  // node 内部 stream 模块
read._read = () => {};
read.push(streamData.content);   // 异常调用返回一次数据
read.push(null);  // 结束的时候 push 一个 null

return read;  // 以 read 对象作为响应

大兄弟,贴个完整代码参考下

@czy88840616
Copy link
Member

标准项目一直都是可以流式响应的。

ctx.res.send(xxx) / ctx.res.end() 即可。

@czy88840616
Copy link
Member

这个要具体看sse这个模块里面的逻辑了,你这么引用,所有的请求是都会过它的。

@pionear-layen
Copy link

有完整案例看看吗,大佬们

@WillieChan2015
Copy link

+1,有无完整例子可以参考看看

@czy88840616
Copy link
Member

我刚好写了一个。。。

 @Get('/')
  async home(): Promise<any> {
    this.ctx.status = 200;
    this.ctx.set('Transfer-Encoding', 'chunked');
    for (let i = 0; i < 100; i++) {
      await sleep(100);
      this.ctx.res.write('abc'.repeat(100));
    }
    
    this.ctx.res.end();
  }

@pionear-layen
Copy link

@hjane
Copy link

hjane commented Mar 2, 2024

有一个方法可以实现。
我在尝试中安装了 koa-sse-stream
因为它不是标准的 midwayjs 组件,所以我在configuration.ts中直接配置无效
@configuration({
imports: [
koa
]})

如果用
this.app.use(sse({ maxClients: 5000, pingInterval: 30000 }));
确实会生效,效果非常完美,但是我其他的路由都被影响了。

我绕了一圈找现成可用的方式,最后还是回到这个有过一点成果的尝试上。
这个插件封装的很简洁,示例写的其实很完整
image

参考插件代码,我实现了我要的效果

首先把sse.js 拿出来,当作一个服务文件
image
可以看到我几乎是复制过来的

然后写一个StreamMiddleware中间件 只有以/stream开头的路由会执行这个中间件

import { Middleware, IMiddleware } from '@midwayjs/core';
import { NextFunction, Context } from '@midwayjs/koa';
import { SSETransform } from '../service/sse.service';
// const Stream = require('stream');
const DEFAULT_OPTS = {
  maxClients: 10000,
  pingInterval: 60000,
  closeEvent: 'close',
};
@Middleware()
export class StreamMiddleware implements IMiddleware<Context, NextFunction> {
  resolve() {
    return async (ctx: Context, next: NextFunction) => {
      // 控制器前执行的逻辑
      // const startTime = Date.now();
      // 执行下一个 Web 中间件,最后执行到控制器
      // 这里可以拿到下一个中间件或者控制器的返回值
      if (ctx.res.headersSent) {
        if (!(ctx.sse instanceof SSETransform)) {
          console.error(
            'SSE response header has been send, Unable to create the sse response'
          );
        }
        return await next();
      }
      const sse = new SSETransform(ctx, DEFAULT_OPTS);
      sse.on('close', () => {
        console.log('close');
      });
      ctx.sse = sse;
      await next();
      if (ctx.sse) {
        if (!ctx.body) {
          ctx.body = ctx.sse;
        } else {
          if (!ctx.sse.ended) {
            ctx.sse.send(ctx.body);
          }
          ctx.body = sse;
        }
      }
    };
  }

  match(ctx) {
    return ctx.path.indexOf('/stream/') !== -1;
  }
}

然后在configuration.ts文件中应用中间件
import { StreamMiddleware } from './middleware/stream.middleware';
this.app.useMiddleware([StreamMiddleware, ReportMiddleware]);

最后
image

以上就是我实现的全部流程。

===============

捂脸 不用以上这么麻烦,我没仔细看api 只在路由上加载中间件就可以了
image

@sy05514
Copy link

sy05514 commented Apr 28, 2024

我是这样可以实现
@get('/index')
async index(): Promise {
this.ctx.status = 200;
this.ctx.set('Content-Type', 'text/event-stream');
this.ctx.set('Cache-Control', 'no-cache, no-transform');
this.ctx.set('Connection', 'keep-alive');
this.ctx.set('X-Accel-Buffering', 'no');
this.ctx.res.write('data: {"status":200}' + '\n\n');
// 创建一个不会被解决的 Promise,防止 Koa 自动结束响应
return new Promise((resolve, reject) => {
// 设置一个定时器来定期发送数据到客户端
const intervalId = setInterval(() => {
this.ctx.res.write('data: {"message":"Heartbeat"}\n\n');
}, 60000); // 每60秒发送一次
// 监听连接关闭事件,清理资源
this.ctx.req.on('close', () => {
clearInterval(intervalId);
resolve(1);
});
this.ctx.req.on('error', () => {
clearInterval(intervalId);
reject(new Error('SSE connection error'));
});
});
}

@hjane
Copy link

hjane commented Apr 28, 2024 via email

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

No branches or pull requests

9 participants