RAGFLOW使用flask转发的openai接口
- 互联网
- 2025-08-25 08:57:01

flask转发openai标准接口 背景
搭建RAGFLOW 的过程中,遇到一个比较严重的问题,公司部署的大模型代理需要获取token,且token存在有效期5分钟,在RAGFLOW中不能直接用,所以希望通过flask项目转发请求。
方案比较好的是,RAGFLOW 可以配置OpenAI-API-Compatible
初始方案是计划准备两个接口,第一个接口/v1/chat/completions
@api_llm_proxy_bp.route("/v1/chat/completions", methods=['POST']) async def chat_completions(): payload = request.json logger.debug("chat_completions:{}", payload) res = await CustommizedAsyncOpenAI().chat pletions.create(**payload) json_result = res.to_json() logger.debug("chat_completions response:{}", json_result) return json_resultCustommizedAsyncOpenAI 是openai._client.AsyncOpenAI的子类,主要作用是封装指定公司大模型的url以及实时获取token放到header里面。
class CustommizedAsyncOpenAI(AsyncOpenAI): def __init__(self): base_url = get_config().get("llm_config").get("model_host") super().__init__(base_url=base_url, api_key="fake key") async def _prepare_options(self, options: FinalRequestOptions) -> FinalRequestOptions: options = await super()._prepare_options(options) options.headers = { 'Authorization': f'Bearer {TokenHolder().get_token()}' } return options另一个接口是向量化接口:/v1/embeddings
@api_llm_proxy_bp.route("/v1/embeddings", methods=['POST']) async def embeddings(): payload = request.json logger.debug("embeddings:{}", payload) res = await CustommizedEmbeddingsOpenAI().embeddings.create(**payload) json_result = res.to_json() logger.debug("embeddings response:{}", json_result) return json_resultCustommizedEmbeddingsOpenAI也是自己写的openai._client.AsyncOpenAI的子类,区别是只对应了公司向量模型的url。
在RAGFLOW 中分别配置chat和embeddings,成功添加模型。
STREAM转发的问题与解决在RAGFLOW中添加本地文件并解析没有问题,但是后续在使用知识库聊天的时候发现不支持stream请求,导致无法聊天。资料查了半天,对比RAGFLOW处理stream请求的代码,加上了flask 转发open ai的steam标准响应结构:
@api_llm_proxy_bp.route("/v1/chat/completions", methods=['POST']) async def chat_completions(): payload = request.json logger.debug("chat_completions:{}", payload) if payload.get("stream") is True: res = CustommizedOpenAI().chat pletions.create(**payload) def stream_response(): for chunk in res: logger.info("chat_completions response:{}", chunk.to_json()) yield f"data:{json.dumps(chunk.to_dict(), ensure_ascii=False)}" + "\n\n" return Response(stream_response(), mimetype="text/event-stream") else: res = await CustommizedAsyncOpenAI().chat pletions.create(**payload) json_result = res.to_json() logger.debug("chat_completions response:{}", json_result) return json_result上面的重点是两部分:
第一部分,yield 后面的格式: yield f"data:{json.dumps(chunk.to_dict(), ensure_ascii=False)}" + "\n\n" 第二部分,mimetype Response(stream_response(), mimetype="text/event-stream")使用的CustommizedOpenAI,是自己实现的openai._client.OpenAI的子类,没有用异步是因为前面异步一直没有成功,现在看原因是data的格式问题,因为有些资料提到flask 异步stream 做的不好,所以先当时暂时使用非异步的方案。
stream 转发的验证前面因为stream 转发一直没有处理很好,在RAGFLOW中一直没有成功,问题很难排查,原因在于有两层转发,一层是自己的flask项目中的stream转发,另一层是RAGFLOW中也对openai api stream响应做了二次处理,然后再以stream方式响应。
后面反应过来,可以抛开RAGFLOW,验证自己的stream转发是否成功,直接用openai接口去调用自己的接口:
from openai import OpenAI def test_steam(): client = OpenAI(api_key="fake key", base_url="http://localhost:5002/agent/api/llm_proxy/v1") messages = [ {"role": "user", "content": "你是谁"}, ] res = client.chat pletions.create(messages=messages, model="gpt-4o-240806-deploy-gs", stream=True) for chunk in res: print(chunk.to_json())如果能连续打印如下的结构,说明stream 转发没有问题:
{ "id": "chatcmpl-B324oTT5jWz3xeACiotosOKzyUk8J", "choices": [ { "delta": { "content": "忙" }, "finish_reason": null, "index": 0, "logprobs": null } ], "created": 1740062666, "model": "gpt-4o-2024-11-20", "object": "chat pletion.chunk", "system_fingerprint": "fp_b705f0c291" }RAGFLOW使用flask转发的openai接口由讯客互联互联网栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“RAGFLOW使用flask转发的openai接口”
下一篇
C#十六进制字符串转换为十进制