2023-09-15
A while back I wrote about doing Function calling with openai, and a reader asked: what if I need streaming output (stream)?
Today let's walk through stream + Function calling.
The idea is the same as in the no-stream case: if the user asks an ordinary question, answer it directly without the knowledge base; if the user asks an autism-related question, openai triggers a knowledge-base search and the LLM answers from the retrieved material.
The main difference is that a stream returns a sequence of responses, each carrying a partial delta, whereas a normal call returns a single response with a single choices entry.
No-stream result:
response_message:
{
  "role": "assistant",
  "content": null,
  "function_call": {
    "name": "autism_expert",
    "arguments": "{\n  \"question\": \"自闭症孩子自伤怎么办?\"\n}"
  }
}
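For reference, in the no-stream case the arguments arrive as one complete JSON string, so dispatching the call is a single json.loads away. A minimal sketch, assuming the response_message above and the autism_expert function defined later in this post:

import json

# Sketch: dispatch a complete (non-streamed) function_call.
# Assumes response_message from above and autism_expert defined below.
if response_message.get("function_call"):
    args = json.loads(response_message.function_call.arguments)
    result = autism_expert(**args)  # -> autism_expert(question="自闭症孩子自伤怎么办?")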
Stream result (first chunk shown):
res:
{
  "id": "chatcmpl-7yxY0oU9KIhBSe9Ju1CcXouszL2zd",
  "object": "chat.completion.chunk",
  "created": 1694762936,
  "model": "gpt-3.5-turbo-0613",
  "choices": [
    {
      "index": 0,
      "delta": {
        "role": "assistant",
        "content": null,
        "function_call": {
          "name": "autism_expert",
          "arguments": ""
        }
      },
      "finish_reason": null
    }
  ]
}
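The function name typically arrives in this first chunk (with arguments still empty, as above), and the arguments JSON is then split across the following chunks, so recovering the full call means concatenating the fragments. A minimal sketch using the same openai 0.x SDK as the code below; collect_function_call is my own helper name:

import json

def collect_function_call(stream):
    """Reassemble a function_call that arrives spread across stream deltas."""
    name, arguments = None, ""
    for res in stream:
        delta = res.choices[0].delta
        if "function_call" in delta:
            fc = delta.function_call
            if "name" in fc:
                name = fc.name             # usually only in the first chunk
            if "arguments" in fc:
                arguments += fc.arguments  # fragments concatenate into one JSON string
    return name, json.loads(arguments) if arguments else None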
So the task is to branch as soon as the stream starts. If openai decides the knowledge base isn't needed (i.e., the delta has no function_call field), stream the content straight out and set a flag so the knowledge-base path is skipped. If it decides the knowledge base is needed, wait for the stream to end, pull the matching content from the knowledge base, fold it into a new prompt, and call the LLM again.
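Boiled down, the branching looks like this. A sketch only: ask_llm_with_context is a hypothetical stand-in for the second streamed call, and the complete, runnable version follows below.

def run_conversation_sketch(stream, question):
    # Sketch of the branching; see the full code below.
    flag_executed = False
    for res in stream:
        delta = res.choices[0].delta
        # A delta without function_call means GPT is answering directly
        if "function_call" not in delta and "content" in delta:
            yield delta.content
            flag_executed = True
    if not flag_executed:
        # GPT chose the function call path: search, re-prompt, stream again
        background = autism_expert(question)                   # knowledge-base lookup
        yield from ask_llm_with_context(question, background)  # hypothetical helper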
The main code is as follows:
"use client"
import React from 'react';
import { useState, useEffect } from 'react';
export default function Message({ai, user, content}: {ai: boolean, user: boolean, content: string}){
const [resultText, setResultText] = useState(''); // 使用useState来保存结果
useEffect(() => {
if (ai && content) {
const generate = async () => {
try {
// Fetch the response from the OpenAI API with the signal from AbortController
const response = await fetch("http://localhost:5328/api/python", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ content: content }),
});
// 处理streaming response
const reader = response.body.getReader();
const decoder = new TextDecoder("utf-8");
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
// Massage and parse the chunk of data
const chunk = decoder.decode(value);
console.log("Original chunk:", chunk);
setResultText(prevText => prevText + chunk); // 使用setState更新状态
// console.log("Result text:", resultText);
}
} catch (error) {
console.error("Error occurred while generating:", error.message);
}
};
generate();
}
}, [ai, content]);
let messageContent = user ? content : (ai ? resultText : '有什么可以帮你的吗?');
return (
<div className={`flex ${user ? 'flex-row-reverse' : ''}`}>
<img
src="robot_ai.png"
className="w-10 h-10 rounded-full"
/>
<div className="flex flex-col ml-4">
<div className="px-4 py-2 rounded-lg">
<p>{messageContent}</p>
</div>
<div className="text-gray-500 text-sm mt-2 flex justify-between invisible group-hover:visible">
<div>时间</div>
<div className="flex">
{/* <CopyButton /> */}
</div>
</div>
</div>
</div>
)
}
# index.py
from flask import Flask, request, Response
import os, json, re
from dotenv import load_dotenv
from flask_cors import CORS
import openai
from supabase.client import create_client
from langchain.embeddings import MiniMaxEmbeddings
from langchain.vectorstores import SupabaseVectorStore
load_dotenv()  # load variables from the .env file
app = Flask(__name__)
CORS(app, origins="*")
openai.api_key = os.getenv("OPENAI_API_KEY")  # read the API key from the environment
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_PRIVATE_KEY = os.getenv("SUPABASE_PRIVATE_KEY")
MINIMAX_API_KEY = os.getenv("MINIMAX_API_KEY")
MINIMAX_GROUP_ID = os.getenv("MINIMAX_GROUP_ID")
supabase_client = create_client(SUPABASE_URL, SUPABASE_PRIVATE_KEY)
embeddings = MiniMaxEmbeddings(minimax_api_key=MINIMAX_API_KEY, minimax_group_id=MINIMAX_GROUP_ID)
@app.route("/api/python", methods=["POST"])
def generate():
    content = request.json.get('content')

    def autism_expert(question):
        """Search the knowledge base for a professional answer to an autism question."""
        vector_store = SupabaseVectorStore(
            client=supabase_client,
            embedding=embeddings,
            table_name="documents",
            query_name="match_documents",
        )
        # Return the best-matching document from the knowledge base
        match_documents = vector_store.similarity_search(question)
        expert_result = match_documents[0].page_content
        return expert_result

    def run_conversation():
        # Step 1: send the conversation and available functions to GPT
        messages = [{"role": "system", "content": "你是自闭症康复专家"}, {"role": "user", "content": content}]
        functions = [
            {
                "name": "autism_expert",
                "description": "当用户咨询自闭症类问题有用",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "question": {
                            "type": "string",
                            "description": "用户的问题",
                        },
                    },
                    "required": ["question"],
                },
            }
        ]
        flag_executed = False
        for res in openai.ChatCompletion.create(
            model="gpt-3.5-turbo-0613",
            messages=messages,
            functions=functions,
            function_call="auto",  # auto is default, but we'll be explicit
            stream=True,
        ):
            print("res: ")
            print(res)
            print("====================")
            delta = res.choices[0].delta
            # No function_call in the delta means GPT is answering directly:
            # stream the content through and remember that we already answered
            if "function_call" not in delta and "content" in delta and delta.content:
                yield delta.content
                flag_executed = True
        # Step 2: if nothing was streamed, GPT chose the function call path
        if not flag_executed:
            function_name = "autism_expert"
            function_to_call = autism_expert
            function_response = function_to_call(
                question=content,
            )
            messages.append(
                {
                    "role": "function",
                    "name": function_name,
                    "content": function_response,
                }
            )  # extend conversation with the function response
            search_result = messages[2]['content']
            # The knowledge-base documents carry an "answer: ..." field; extract it,
            # falling back to the full document if the pattern is missing
            answer_match = re.search(r'answer: (.*)', search_result, re.DOTALL)
            answer = answer_match.group(1) if answer_match else search_result
            prompt = f"""
根据用户的问题,参考背景信息,输出回答。要求回答简短切题,且尽可能有趣地回复。用户问题和参考背景信息会用{{}}来表示。
用户问题:{content}, 参考背景信息:{answer}
"""
            # Step 3: ask the model again with the retrieved context, streaming the answer
            for chunk in openai.ChatCompletion.create(
                model='gpt-3.5-turbo',
                messages=[
                    {'role': 'user', 'content': prompt}
                ],
                temperature=0,
                stream=True,  # this time, we set stream=True
            ):
                # some chunks (e.g. the final one) carry no content
                if 'content' in chunk.choices[0].delta:
                    yield chunk.choices[0].delta.content

    return Response(run_conversation(), mimetype='text/event-stream')


if __name__ == '__main__':
    app.run(port=5328, debug=True)
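To try the endpoint end to end without the React component, here is a quick client sketch; the requests library and the sample question string are my own choices, not part of the project above:

import requests

# Stream the Flask endpoint directly, mirroring what the React component does.
resp = requests.post(
    "http://localhost:5328/api/python",
    json={"content": "自闭症孩子自伤怎么办?"},
    stream=True,
)
resp.encoding = "utf-8"  # Flask streams UTF-8 text; avoid the ISO-8859-1 default
for chunk in resp.iter_content(chunk_size=None, decode_unicode=True):
    print(chunk, end="", flush=True)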