import logging
import json
import difflib
import re
import os
import requests
import pytesseract

from PIL import Image, ImageOps
from io import BytesIO
from typing import Union, List, Dict, Optional, Any, Tuple
from tenacity import retry, stop_after_attempt, wait_random
from openai import OpenAI  # only the OpenAI client is needed

from label_studio_ml.model import LabelStudioMLBase
from label_studio_ml.response import ModelResponse
from label_studio_sdk.label_interface.objects import PredictionValue
from label_studio_sdk.label_interface.object_tags import ImageTag, ParagraphsTag
from label_studio_sdk.label_interface.control_tags import ControlTag, ObjectTag

logger = logging.getLogger(__name__)

# =======================
# Fixed SiliconFlow configuration
# =======================
SILICONFLOW_BASE_URL = "https://api.siliconflow.cn/v1"
SILICONFLOW_API_KEY = "sk-deezdpffwyyhwglpqbyfyskukllsqjwbhutfgjwjwxnjdzvf"  # TODO: replace with your real key
SILICONFLOW_MODEL = "Qwen/Qwen3-8B"

# Fixed system prompt (role definition): a Chinese text-summarization assistant
SYSTEM_PROMPT = "你是一个擅长中文文本摘要的助手。请在保留关键信息的前提下用简洁的中文进行概括。"

# Fixed summarization task instruction: summarize the text below in at most 200 characters
TASK_INSTRUCTION = "请对下面的文本进行摘要,突出主要内容,长度控制在200字以内。"


@retry(wait=wait_random(min=5, max=10), stop=stop_after_attempt(6))
def chat_completion_call(messages, params, *args, **kwargs):
    """
    Call the chat completions endpoint through the openai SDK, with
    SiliconFlow as the backend.
    """
    # Ignore any provider / api_key / model in params and always use the
    # hard-coded SiliconFlow configuration.
    client = OpenAI(
        base_url=SILICONFLOW_BASE_URL,
        api_key=SILICONFLOW_API_KEY,
    )

    temperature = params.get("temperature", OpenAIInteractive.TEMPERATURE)
    n = params.get("num_responses", OpenAIInteractive.NUM_RESPONSES)

    request_params = {
        "messages": messages,
        "model": SILICONFLOW_MODEL,
        "n": n,
        "temperature": temperature,
        "max_tokens": 1000,  # adjust if needed
    }

    logger.info(f"SiliconFlow(OpenAI SDK) request_params: {request_params}")
    completion = client.chat.completions.create(**request_params)
    logger.info(f"SiliconFlow(OpenAI SDK) completion: {completion}")

    return completion
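
# Illustrative direct call (not used by the backend itself; the values are only
# examples). Because the model and credentials are hard-coded above, `params`
# only carries optional overrides; on transient API errors tenacity retries the
# call up to 6 times with a random 5-10 s wait:
#
#   completion = chat_completion_call(
#       [{"role": "user", "content": "请总结这段文字……"}],
#       {"temperature": 0.3, "num_responses": 1},
#   )
#   print(completion.choices[0].message.content)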


def gpt(messages: Union[List[Dict], str], params, *args, **kwargs):
    """
    Call the SiliconFlow Qwen model with the fixed system prompt plus the
    summarization task instruction; returns List[str].
    """
    full_messages: List[Dict[str, str]] = []

    # System role: configure the model as a Chinese summarization assistant
    full_messages.append({"role": "system", "content": SYSTEM_PROMPT})

    # If the input is a plain string, treat it as the source text to summarize
    if isinstance(messages, str):
        user_content = f"{TASK_INSTRUCTION}\n\n原文如下:\n{messages}"
        full_messages.append({"role": "user", "content": user_content})
    else:
        # Backward-compatible path: messages is a list[dict];
        # prepend TASK_INSTRUCTION to the first user message
        inserted = False
        for m in messages:
            if m.get("role") == "user" and not inserted:
                new_content = f"{TASK_INSTRUCTION}\n\n原文如下:\n{m.get('content', '')}"
                full_messages.append({"role": "user", "content": new_content})
                inserted = True
            else:
                full_messages.append(m)
        if not inserted:
            # If there was no user message, add one with just the instruction
            full_messages.append({"role": "user", "content": TASK_INSTRUCTION})

    logger.info(f"SiliconFlow(OpenAI SDK) request messages: {full_messages}, params={params}")
    completion = chat_completion_call(full_messages, params)
    logger.info(f"SiliconFlow(OpenAI SDK) response: {completion}")

    # One string per returned choice: completion.choices[i].message.content
    response = [choice.message.content for choice in completion.choices]
    return response
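
# Minimal usage sketch (assumption: called with a plain string, which is how
# _predict_single_task below uses it):
#
#   summaries = gpt("这里是一段需要摘要的长文本……", {"num_responses": 1})
#   print(summaries[0])  # the generated Chinese summary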


class OpenAIInteractive(LabelStudioMLBase):
    """
    Text-summarization ML backend built on the SiliconFlow Qwen model.
    Based on Label Studio's OpenAIInteractive template, with the underlying
    provider switched to SiliconFlow.
    """

    # These environment variables are kept (some are still used)
    OPENAI_PROVIDER = os.getenv("OPENAI_PROVIDER", "openai")
    OPENAI_KEY = os.getenv('OPENAI_API_KEY')
    PROMPT_PREFIX = os.getenv("PROMPT_PREFIX", "prompt")
    USE_INTERNAL_PROMPT_TEMPLATE = bool(int(os.getenv("USE_INTERNAL_PROMPT_TEMPLATE", 1)))
    # Optional: setup() treats this as a file path to read; the summarization
    # behaviour itself is driven by SYSTEM_PROMPT / TASK_INSTRUCTION
    DEFAULT_PROMPT = os.getenv('DEFAULT_PROMPT')
    PROMPT_TEMPLATE = os.getenv(
        "PROMPT_TEMPLATE",
        '**Source Text**:\n\n"{text}"\n\n**Task Directive**:\n\n"{prompt}"'
    )
    PROMPT_TAG = "TextArea"
    SUPPORTED_INPUTS = ("Image", "Text", "HyperText", "Paragraphs")
    NUM_RESPONSES = int(os.getenv("NUM_RESPONSES", 1))
    TEMPERATURE = float(os.getenv("TEMPERATURE", 0.7))
    OPENAI_MODEL = os.getenv("OPENAI_MODEL")
    AZURE_RESOURCE_ENDPOINT = os.getenv("AZURE_RESOURCE_ENDPOINT", '')
    AZURE_DEPLOYMENT_NAME = os.getenv("AZURE_DEPLOYMENT_NAME")
    AZURE_API_VERSION = os.getenv("AZURE_API_VERSION", "2023-05-15")
    OLLAMA_ENDPOINT = os.getenv("OLLAMA_ENDPOINT")
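
    # A minimal labeling config this backend can work with (names below are
    # illustrative; the prompt TextArea's name must start with PROMPT_PREFIX,
    # "prompt" by default, and the object tag must be one of SUPPORTED_INPUTS):
    #
    #   <View>
    #     <Text name="text" value="$text"/>
    #     <TextArea name="prompt" toName="text" editable="true"/>
    #     <TextArea name="summary" toName="text" editable="true"/>
    #   </View>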

    def setup(self):
        if self.DEFAULT_PROMPT and os.path.isfile(self.DEFAULT_PROMPT):
            logger.info(f"Reading default prompt from file: {self.DEFAULT_PROMPT}")
            with open(self.DEFAULT_PROMPT, encoding="utf-8") as f:
                self.DEFAULT_PROMPT = f.read()

    def _ocr(self, image_url):
        # Open the image containing the text
        response = requests.get(image_url)
        image = Image.open(BytesIO(response.content))
        image = ImageOps.exif_transpose(image)

        # Run OCR on the image
        text = pytesseract.image_to_string(image)
        return text
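
    # Note: pytesseract defaults to English. For Chinese source images one
    # would typically pass a language hint, e.g.
    #   pytesseract.image_to_string(image, lang="chi_sim")
    # (assumption: the chi_sim traineddata is installed on the host).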

    def _get_text(self, task_data, object_tag):
        """
        Extract text from task_data according to the object_tag type:
        - Image: run OCR
        - Paragraphs: serialize to JSON
        - Text/HyperText: return as-is
        """
        data = task_data.get(object_tag.value_name)

        if data is None:
            return None

        if isinstance(object_tag, ImageTag):
            return self._ocr(data)
        elif isinstance(object_tag, ParagraphsTag):
            return json.dumps(data, ensure_ascii=False)
        else:
            return data

    def _get_prompts(self, context, prompt_tag) -> List[str]:
        """Resolve the prompt value (interactive mode / internal storage / default prompt)"""
        if context:
            # Interactive mode: read the current prompt from the context
            result = context.get('result')
            if result:
                for item in result:
                    if item.get('from_name') == prompt_tag.name:
                        return item['value']['text']
        # Initialization: read from internal storage
        elif prompt := self.get(prompt_tag.name):
            return [prompt]
        # Default prompt (note: with USE_INTERNAL_PROMPT_TEMPLATE=1 this is an
        # error and the default prompt is ignored)
        elif self.DEFAULT_PROMPT:
            if self.USE_INTERNAL_PROMPT_TEMPLATE:
                logger.error(
                    'Using both `DEFAULT_PROMPT` and `USE_INTERNAL_PROMPT_TEMPLATE` is not supported. '
                    'Please either specify `USE_INTERNAL_PROMPT_TEMPLATE=0` or remove `DEFAULT_PROMPT`. '
                    'For now, no prompt will be used.'
                )
                return []
            return [self.DEFAULT_PROMPT]

        return []

    def _match_choices(self, response: List[str], original_choices: List[str]) -> List[str]:
        # assuming classes are separated by newlines
        matched_labels = []
        predicted_classes = response[0].splitlines()

        for pred in predicted_classes:
            scores = list(
                map(lambda l: difflib.SequenceMatcher(None, pred, l).ratio(), original_choices)
            )
            matched_labels.append(original_choices[scores.index(max(scores))])

        return matched_labels
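
    # Illustrative behaviour of the fuzzy matching above: each predicted line
    # is mapped to the closest configured choice by similarity ratio, e.g.
    #   difflib.SequenceMatcher(None, "positive", "Positive").ratio()  # ~0.88
    # so minor casing or wording drift in the LLM output still resolves to a label.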

    def _find_choices_tag(self, object_tag):
        """Classification predictor"""
        li = self.label_interface

        try:
            choices_from_name, _, _ = li.get_first_tag_occurence(
                'Choices',
                self.SUPPORTED_INPUTS,
                to_name_filter=lambda s: s == object_tag.name,
            )

            return li.get_control(choices_from_name)
        except Exception:
            return None

    def _find_textarea_tag(self, prompt_tag, object_tag):
        """Free-form text predictor"""
        li = self.label_interface

        try:
            textarea_from_name, _, _ = li.get_first_tag_occurence(
                'TextArea',
                self.SUPPORTED_INPUTS,
                name_filter=lambda s: s != prompt_tag.name,
                to_name_filter=lambda s: s == object_tag.name,
            )

            return li.get_control(textarea_from_name)
        except Exception:
            return None

    def _find_prompt_tags(self) -> Tuple[ControlTag, ObjectTag]:
        """Find the TextArea used for the prompt, and its target object tag, in the config"""
        li = self.label_interface
        prompt_from_name, prompt_to_name, value = li.get_first_tag_occurence(
            # prompt tag
            self.PROMPT_TAG,
            # supported input types
            self.SUPPORTED_INPUTS,
            # if there are multiple <TextArea> tags, pick the one whose name
            # starts with PROMPT_PREFIX
            name_filter=lambda s: s.startswith(self.PROMPT_PREFIX),
        )

        return li.get_control(prompt_from_name), li.get_object(prompt_to_name)

    def _validate_tags(self, choices_tag: str, textarea_tag: str) -> None:
        if not choices_tag and not textarea_tag:
            raise ValueError('No supported tags found: <Choices> or <TextArea>')

    def _generate_normalized_prompt(
        self, text: str, prompt: str, task_data: Dict, labels: Optional[List[str]]
    ) -> str:
        """
        Since gpt() already hard-codes the summarization instruction, the
        normalized prompt here is essentially just the source text. For
        compatibility with the original logic, both modes are kept:
        - USE_INTERNAL_PROMPT_TEMPLATE=1: use PROMPT_TEMPLATE(text, prompt, labels)
        - otherwise: treat the prompt itself as a format template
        """
        if self.USE_INTERNAL_PROMPT_TEMPLATE:
            norm_prompt = self.PROMPT_TEMPLATE.format(text=text, prompt=prompt, labels=labels)
        else:
            # Note: task_data must contain the field(s) referenced by the
            # template, e.g. a text field or your own custom fields
            norm_prompt = prompt.format(labels=labels, **task_data)

        return norm_prompt
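
    # Example of the internal template with illustrative values
    # (USE_INTERNAL_PROMPT_TEMPLATE=1; str.format ignores the unused labels kwarg):
    #
    #   PROMPT_TEMPLATE.format(text="文章原文……", prompt="请摘要", labels=None)
    #   -> '**Source Text**:\n\n"文章原文……"\n\n**Task Directive**:\n\n"请摘要"'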

    def _generate_response_regions(
        self,
        response: List[str],
        prompt_tag,
        choices_tag: ControlTag,
        textarea_tag: ControlTag,
        prompts: List[str],
    ) -> List:
        """
        Map the LLM responses to Label Studio regions
        """
        regions = []

        if choices_tag and len(response) > 0:
            matched_labels = self._match_choices(response, choices_tag.labels)
            regions.append(choices_tag.label(matched_labels))

        if textarea_tag:
            # For text summarization we usually want the summary written into
            # a TextArea (e.g. one named "summary")
            regions.append(textarea_tag.label(text=response))

        # Also record the prompt that was used
        regions.append(prompt_tag.label(text=prompts))

        return regions

    def _predict_single_task(
        self,
        task_data: Dict,
        prompt_tag: Any,
        object_tag: Any,
        prompt: str,
        choices_tag: ControlTag,
        textarea_tag: ControlTag,
        prompts: List[str],
    ) -> Dict:
        """
        Summarize a single task with Qwen and build a PredictionValue
        """
        text = self._get_text(task_data, object_tag)
        # If there is a Choices tag, pass its labels into the prompt (mainly
        # useful for classification; usually unused for summarization)
        labels = choices_tag.labels if choices_tag else None
        norm_prompt = self._generate_normalized_prompt(text, prompt, task_data, labels=labels)

        # run inference (the underlying call goes to SiliconFlow Qwen)
        response = gpt(norm_prompt, self.extra_params)
        regions = self._generate_response_regions(response, prompt_tag, choices_tag, textarea_tag, prompts)

        return PredictionValue(result=regions, score=0.1, model_version=str(self.model_version))

    def predict(self, tasks: List[Dict], context: Optional[Dict] = None, **kwargs) -> ModelResponse:
        """
        Prediction entry point called by Label Studio
        """
        predictions = []

        # prompt_tag: the TextArea holding the prompt
        # object_tag: the input object to summarize/annotate
        prompt_tag, object_tag = self._find_prompt_tags()
        prompts = self._get_prompts(context, prompt_tag)

        # guard against an unbound `prompt` when no prompts were recorded
        prompt = "\n".join(prompts) if prompts else ""

        choices_tag = self._find_choices_tag(object_tag)
        textarea_tag = self._find_textarea_tag(prompt_tag, object_tag)
        self._validate_tags(choices_tag, textarea_tag)

        for task in tasks:
            # preload all task data fields, they are needed for prompt
            task_data = self.preload_task_data(task, task['data'])
            pred = self._predict_single_task(
                task_data, prompt_tag, object_tag, prompt, choices_tag, textarea_tag, prompts
            )
            predictions.append(pred)

        return ModelResponse(predictions=predictions)

    def _prompt_diff(self, old_prompt, new_prompt):
        """
        Return the lines added when going from the old prompt to the new prompt
        """
        old_lines = old_prompt.splitlines()
        new_lines = new_prompt.splitlines()
        diff = difflib.unified_diff(old_lines, new_lines, lineterm="")

        return "\n".join(
            line for line in diff if line.startswith(('+',)) and not line.startswith(('+++', '---'))
        )
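
    # Illustrative behaviour: only added lines survive the filter, e.g.
    #   self._prompt_diff("summarize briefly",
    #                     "summarize briefly\nlimit to 200 chars")
    #   -> '+limit to 200 chars'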

    def fit(self, event, data, **additional_params):
        """
        Training hook: only used to record the prompt and bump model_version
        (prompt tuning)
        """
        logger.info(f'Data received: {data}')
        if event not in ('ANNOTATION_CREATED', 'ANNOTATION_UPDATED'):
            return

        prompt_tag, object_tag = self._find_prompt_tags()
        prompts = self._get_prompts(data['annotation'], prompt_tag)

        if not prompts:
            logger.info('No prompts recorded.')
            return

        prompt = '\n'.join(prompts)
        current_prompt = self.get(prompt_tag.name)

        # Skip the version bump if the prompt did not change
        if current_prompt:
            diff = self._prompt_diff(current_prompt, prompt)
            if not diff:
                logger.info('No prompt diff found.')
                return

            logger.info(f'Prompt diff: {diff}')

        self.set(prompt_tag.name, prompt)
        model_version = self.bump_model_version()
        logger.info(f'Updated model version to {str(model_version)}')
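

# Optional local smoke test (assumption: the SiliconFlow key above is valid).
# Running this file directly checks API connectivity without Label Studio:
if __name__ == "__main__":
    print(gpt("这是一段用于连通性测试的示例文本。", {})[0])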
