Files
Pichome/dzz/aiXhimage/class/Class.StreamHandler.php
2024-04-30 22:55:18 +08:00

213 lines
8.6 KiB
PHP
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
class StreamHandler {
private $data_buffer;//缓存有可能一条data被切分成两部分了无法解析json所以需要把上一半缓存起来
private $counter;//数据接收计数器
private $qmd5;//问题md5
private $chars;//字符数组,开启敏感词检测时用于缓存待检测字符
private $punctuation;//停顿符号
private $dfa = NULL;
private $check_sensitive = FALSE;
public function __construct($params) {
$this->buffer = '';
$this->counter = 0;
$this->qmd5 = $params['qmd5'] ?? time();
$this->chars = [];
$this->lines = [];
$this->punctuation = ['', '。', '', '', '', '……'];
}
public function set_dfa(&$dfa){
$this->dfa = $dfa;
if(!empty($this->dfa) && $this->dfa->is_available()){
$this->check_sensitive = TRUE;
}
}
public function xhcallback($data) {
$this->counter += 1;
$resp = json_decode($data, true);
if ($resp["header"]["code"] != 0) {
file_put_contents('./log/data.'.$this->qmd5.'.log', $this->counter.'=='.$data.PHP_EOL.'--------------------'.PHP_EOL, FILE_APPEND);
$this->end('服务返回报错:'.json_encode(['content' => $data]));
return strlen($data);
}
// 0、把上次缓冲区内数据拼接上本次的data
$buffer = $this->data_buffer . '[br]' . $data;
// 2、把所有的 '}\n\n{' 替换维 '}[br]{' '}\n\n[' 替换为 '}[br]['
$buffer = str_replace("}\n\n{", '}[br]{', $buffer);
$buffer = str_replace("}\n\n[", '}[br][', $buffer);
// 3、用 '[br]' 分割成多行数组
$lines = explode('[br]', $buffer);
// 4、循环处理每一行对于最后一行需要判断是否是完整的json
$line_c = count($lines);
foreach ($lines as $li => $line) {
$line_data = json_decode(trim($line), TRUE);
if ($resp["header"]["status"] == 2 || $line_data['payload']['choices']['text'][0]['content'] == 'LAND') {
$finallytext = $line_data['payload']['choices']['text'][0]['content'];
$this->sensitive_check($finallytext);
$this->end();
} elseif (isset($line_data['payload']['choices']['text'][0]['content'])) {
$this->sensitive_check($line_data['payload']['choices']['text'][0]['content']);
}
// 检查是否已处理完一个完整的JSON对象
if (json_last_error() === JSON_ERROR_NONE) {
// 清空缓冲区,准备接收新的数据
$this->data_buffer = '';
}
}
// 如果仍有未完成的JSON对象残留将其附加回$data等待下次回调处理
if (!empty($this->data_buffer)) {
$data .= $this->data_buffer;
}
return strlen($data);
}
public function callback($data) {
$this->counter += 1;
file_put_contents('./log/data.'.$this->qmd5.'.log', $this->counter.'=='.$data.PHP_EOL.'--------------------'.PHP_EOL, FILE_APPEND);
$result = json_decode($data, TRUE);
if(is_array($result)){
$this->end('openai 请求错误:'.json_encode($result));
return strlen($data);
}
//print_r($data);die;
/*
此处步骤仅针对 openai 接口而言
每次触发回调函数时里边会有多条data数据需要分割
如某次收到 $data 如下所示:
data: {"id":"chatcmpl-6wimHHBt4hKFHEpFnNT2ryUeuRRJC","object":"chat.completion.chunk","created":1679453169,"model":"gpt-3.5-turbo-0301","choices":[{"delta":{"role":"assistant"},"index":0,"finish_reason":null}]}\n\ndata: {"id":"chatcmpl-6wimHHBt4hKFHEpFnNT2ryUeuRRJC","object":"chat.completion.chunk","created":1679453169,"model":"gpt-3.5-turbo-0301","choices":[{"delta":{"content":"以下"},"index":0,"finish_reason":null}]}\n\ndata: {"id":"chatcmpl-6wimHHBt4hKFHEpFnNT2ryUeuRRJC","object":"chat.completion.chunk","created":1679453169,"model":"gpt-3.5-turbo-0301","choices":[{"delta":{"content":"是"},"index":0,"finish_reason":null}]}\n\ndata: {"id":"chatcmpl-6wimHHBt4hKFHEpFnNT2ryUeuRRJC","object":"chat.completion.chunk","created":1679453169,"model":"gpt-3.5-turbo-0301","choices":[{"delta":{"content":"使用"},"index":0,"finish_reason":null}]}
最后两条一般是这样的:
data: {"id":"chatcmpl-6wimHHBt4hKFHEpFnNT2ryUeuRRJC","object":"chat.completion.chunk","created":1679453169,"model":"gpt-3.5-turbo-0301","choices":[{"delta":{},"index":0,"finish_reason":"stop"}]}\n\ndata: [DONE]
根据以上 openai 的数据格式,分割步骤如下:
*/
// 0、把上次缓冲区内数据拼接上本次的data
$buffer = $this->data_buffer.$data;
//拼接完之后,要把缓冲字符串清空
$this->data_buffer = '';
// 1、把所有的 'data: {' 替换为 '{' 'data: [' 换成 '['
$buffer = str_replace('data: {', '{', $buffer);
$buffer = str_replace('data: [', '[', $buffer);
// 2、把所有的 '}\n\n{' 替换维 '}[br]{' '}\n\n[' 替换为 '}[br]['
$buffer = str_replace("}\n\n{", '}[br]{', $buffer);
$buffer = str_replace("}\n\n[", '}[br][', $buffer);
// 3、用 '[br]' 分割成多行数组
$lines = explode('[br]', $buffer);
// 4、循环处理每一行对于最后一行需要判断是否是完整的json
$line_c = count($lines);
foreach($lines as $li=>$line){
if(trim($line) == '[DONE]'){
//数据传输结束
$this->data_buffer = '';
$this->counter = 0;
//$this->sensitive_check();
$this->end();
break;
}
$line_data = json_decode(trim($line), TRUE);
if( !is_array($line_data) || !isset($line_data['payload']['choices']) || !isset($line_data['payload']['choices'][0]) ){
if($li == ($line_c - 1)){
//如果是最后一行
$this->data_buffer = $line;
break;
}
//如果是中间行无法json解析则写入错误日志中
file_put_contents('./log/error.'.$this->qmd5.'.log', json_encode(['i'=>$this->counter, 'line'=>$line, 'li'=>$li], JSON_UNESCAPED_UNICODE|JSON_PRETTY_PRINT).PHP_EOL.PHP_EOL, FILE_APPEND);
continue;
}
if( isset($line_data['payload']['choices'][0]['delta']) && isset($line_data['payload']['choices'][0]['delta']['content']) ){
$this->sensitive_check($line_data['payload']['choices'][0]['delta']['content']);
}
}
return strlen($data);
}
private function sensitive_check($content = NULL){
// 如果不检测敏感词,则直接返回给前端
if(!$this->check_sensitive){
$this->write($content);
return;
}
//每个 content 都检测是否包含换行或者停顿符号,如有,则成为一个新行
if(!$this->has_pause($content)){
$this->chars[] = $content;
return;
}
$this->chars[] = $content;
$content = implode('', $this->chars);
if($this->dfa->containsSensitiveWords($content)){
$content = $this->dfa->replaceWords($content);
$this->write($content);
}else{
foreach($this->chars as $char){
$this->write($char);
}
}
$this->chars = [];
}
private function has_pause($content){
if($content == NULL){
return TRUE;
}
$has_p = false;
if(is_numeric(strripos(json_encode($content), '\n'))){
$has_p = true;
}else{
foreach($this->punctuation as $p){
if( is_numeric(strripos($content, $p)) ){
$has_p = true;
break;
}
}
}
return $has_p;
}
private function write($content = NULL, $flush=TRUE){
if($content != NULL){
echo 'data: '.json_encode(['time'=>date('Y-m-d H:i:s'), 'content'=>$content], JSON_UNESCAPED_UNICODE).PHP_EOL.PHP_EOL;
}
if($flush){
flush();
}
}
public function end($content = NULL){
if(!empty($content)){
$this->write($content, FALSE);
}
echo 'retry: 86400000'.PHP_EOL;
echo 'event: close'.PHP_EOL;
echo 'data: Connection closed'.PHP_EOL.PHP_EOL;
flush();
}
}