阜新市网站建设_网站建设公司_页面加载速度_seo优化
2026/1/19 18:36:05 网站建设 项目流程

背景

在Text2SQL领域,如何让大模型准确理解用户意图并生成正确的SQL查询,一直是个挑战。传统方法往往面临表结构复杂、业务术语理解偏差、SQL示例缺失等问题。今天来学习一下SOLBot开源项目的实现, 本文基于开源项目SQLBot的源码实现,深入剖析其在Text2SQL场景下的语义检索RAG(Retrieval-Augmented Generation)技术方案,看看工程实践中如何解决这些痛点。

之前关于我的开源项目表语义检索已写了下面两篇文章:

一、整体架构设计

1.1 核心技术栈

技术组件具体实现作用
Embedding模型HuggingFace text2vec-base-chinese文本向量化,支持中文语义理解
向量检索PostgreSQL pgvector扩展高效的向量相似度计算
LLM框架LangChain大模型调用与Prompt工程
后端框架FastAPI + SQLModel高性能异步API服务
数据库PostgreSQL存储元数据与向量索引
异步处理ThreadPoolExecutor向量化任务异步执行

1.1 数据问答核心流程

1.2 语义检索逻辑

1.2 RAG三层检索体系

SQLBot构建了三层语义检索体系,分别针对不同粒度的知识:

┌─────────────────────────────────────────┐│ 用户问题 (User Question) │└──────────────┬──────────────────────────┘ │ ▼┌──────────────────────────────────────────┐│ 第一层:数据源级别检索 (DS Embedding) ││ - 从多个数据源中筛选最相关的数据源 ││ - 基于数据源名称+描述+表结构的向量 │└──────────────┬───────────────────────────┘ │ ▼┌──────────────────────────────────────────┐│ 第二层:表级别检索 (Table Embedding) ││ - 从数据源的众多表中筛选最相关的表 ││ - 基于表名+注释+字段结构的向量 │└──────────────┬───────────────────────────┘ │ ▼┌──────────────────────────────────────────┐│ 第三层:业务知识增强 ││ ├─ 术语库检索 (Terminology) ││ │ - 业务术语与标准字段的映射 ││ └─ SQL示例检索 (Data Training) ││ - 相似问题的SQL示例参考 │└──────────────────────────────────────────┘

二、核心实现详解

2.1 Embedding模型封装

SQLBot采用单例模式管理Embedding模型,避免重复加载,提升性能:

# apps/ai_model/embedding.pyclass EmbeddingModelCache: @staticmethod def get_model(key: str = settings.DEFAULT_EMBEDDING_MODEL, config: EmbeddingModelInfo = local_embedding_model) -> Embeddings: model_instance = _embedding_model.get(key) if model_instance isNone: lock = EmbeddingModelCache._get_lock(key) with lock: model_instance = _embedding_model.get(key) if model_instance isNone: model_instance = EmbeddingModelCache._new_instance(config) _embedding_model[key] = model_instance return model_instance @staticmethod def _new_instance(config: EmbeddingModelInfo = local_embedding_model): return HuggingFaceEmbeddings( model_name=config.name, cache_folder=config.folder, model_kwargs={'device': config.device}, encode_kwargs={'normalize_embeddings': True} )

设计亮点:

  • 双重检查锁定(Double-Check Locking)保证线程安全
  • 向量归一化(normalize_embeddings=True)优化余弦相似度计算
  • 支持CPU/GPU设备切换,适配不同部署环境

2.2 第一层:数据源级别检索

当系统配置了多个数据源时,首先需要确定用户问题属于哪个数据源的范畴。

2.2.1 数据源向量生成
# apps/datasource/crud/table.pydef save_ds_embedding(session_maker, ids: List[int]): model = EmbeddingModelCache.get_model() session = session_maker() for _id in ids: schema_table = '' ds = session.query(CoreDatasource).filter(CoreDatasource.id == _id).first() # 组合数据源信息 schema_table += f"{ds.name}, {ds.description}\n" # 添加所有表的结构信息 tables = session.query(CoreTable).filter(CoreTable.ds_id == ds.id).all() for table in tables: fields = session.query(CoreField).filter(CoreField.table_id == table.id).all() schema_table += f"# Table: {table.table_name}" table_comment = table.custom_comment.strip() if table.custom_comment else'' if table_comment: schema_table += f", {table_comment}\n[\n" else: schema_table += '\n[\n' # 添加字段信息 field_list = [] for field in fields: field_comment = field.custom_comment.strip() if field.custom_comment else'' if field_comment: field_list.append(f"({field.field_name}:{field.field_type}, {field_comment})") else: field_list.append(f"({field.field_name}:{field.field_type})") schema_table += ",\n".join(field_list) schema_table += '\n]\n' # 生成向量并存储 emb = json.dumps(model.embed_query(schema_table)) stmt = update(CoreDatasource).where(CoreDatasource.id == _id).values(embedding=emb) session.execute(stmt) session.commit()
2.2.2 数据源检索逻辑
# apps/datasource/embedding/ds_embedding.pydef get_ds_embedding(session: SessionDep, current_user: CurrentUser, _ds_list, out_ds: AssistantOutDs, question: str) -> list: _list = [] # 为每个数据源构建检索文本 for _ds in out_ds.ds_list: ds = out_ds.get_ds(_ds.id) table_schema = out_ds.get_db_schema(_ds.id, question, embedding=False) ds_info = f"{ds.name}, {ds.description}\n" ds_schema = ds_info + table_schema _list.append({ "id": ds.id, "ds_schema": ds_schema, "cosine_similarity": 0.0, "ds": ds }) if _list: text = [s.get('ds_schema') for s in _list] # 批量向量化 model = EmbeddingModelCache.get_model() results = model.embed_documents(text) # 计算问题向量 q_embedding = model.embed_query(question) # 计算余弦相似度 for index in range(len(results)): item = results[index] _list[index]['cosine_similarity'] = cosine_similarity(q_embedding, item) # 排序并取TopK _list.sort(key=lambda x: x['cosine_similarity'], reverse=True) _list = _list[:settings.DS_EMBEDDING_COUNT] # 默认取Top10 return [{"id": obj.get('ds').id, "name": obj.get('ds').name, "description": obj.get('ds').description} for obj in _list] return _list

关键点:

  • 数据源向量包含:名称+描述+完整表结构,信息量充足
  • 支持实时计算和预存储两种模式,灵活应对不同场景
  • TopK可配置,默认10个,平衡精度与性能

2.3 第二层:表级别检索

确定数据源后,需要从数百张表中筛选出与问题最相关的表。

2.3.1 表向量生成
# apps/datasource/crud/table.pydef save_table_embedding(session_maker, ids: List[int]): model = EmbeddingModelCache.get_model() session = session_maker() for _id in ids: table = session.query(CoreTable).filter(CoreTable.id == _id).first() fields = session.query(CoreField).filter(CoreField.table_id == table.id).all() # 构建表的Schema描述 schema_table = f"# Table: {table.table_name}" table_comment = table.custom_comment.strip() if table.custom_comment else'' if table_comment: schema_table += f", {table_comment}\n[\n" else: schema_table += '\n[\n' # 添加字段信息 if fields: field_list = [] for field in fields: field_comment = field.custom_comment.strip() if field.custom_comment else'' if field_comment: field_list.append(f"({field.field_name}:{field.field_type}, {field_comment})") else: field_list.append(f"({field.field_name}:{field.field_type})") schema_table += ",\n".join(field_list) schema_table += '\n]\n' # 生成向量并持久化 emb = json.dumps(model.embed_query(schema_table)) stmt = update(CoreTable).where(CoreTable.id == _id).values(embedding=emb) session.execute(stmt) session.commit()
2.3.2 表检索实现
# apps/datasource/embedding/table_embedding.pydef calc_table_embedding(tables: list[dict], question: str): _list = [] for table in tables: _list.append({ "id": table.get('id'), "schema_table": table.get('schema_table'), "embedding": table.get('embedding'), "cosine_similarity": 0.0 }) if _list: model = EmbeddingModelCache.get_model() # 使用预存储的向量 results = [item.get('embedding') for item in _list] # 计算问题向量 q_embedding = model.embed_query(question) # 计算相似度 for index in range(len(results)): item = results[index] if item: _list[index]['cosine_similarity'] = cosine_similarity( q_embedding, json.loads(item) ) # 排序取TopK _list.sort(key=lambda x: x['cosine_similarity'], reverse=True) _list = _list[:settings.TABLE_EMBEDDING_COUNT] # 默认Top10 return _list return _list

优化策略:

  • 表向量预计算并持久化到数据库,避免实时计算开销
  • 仅对问题进行实时向量化,大幅提升响应速度
  • 支持增量更新,表结构变更时异步更新向量
2.3.3 余弦相似度计算
# apps/datasource/embedding/utils.pydef cosine_similarity(vec_a, vec_b): if len(vec_a) != len(vec_b): raise ValueError("The vector dimension must be the same") # 点积 dot_product = sum(a * b for a, b in zip(vec_a, vec_b)) # 向量模 norm_a = math.sqrt(sum(a * a for a in vec_a)) norm_b = math.sqrt(sum(b * b for b in vec_b)) if norm_a == 0or norm_b == 0: return0.0 return dot_product / (norm_a * norm_b)

2.4 第三层:业务知识增强

2.4.1 术语库检索

业务系统中存在大量专业术语,如"GMV"、“UV”、"ROI"等,需要映射到具体的数据库字段。

术语向量生成:

# apps/terminology/curd/terminology.pydef save_embeddings(session_maker, ids: List[int]): ifnot settings.EMBEDDING_ENABLED: return session = session_maker() # 查询术语及其同义词 _list = session.query(Terminology).filter( or_(Terminology.id.in_(ids), Terminology.pid.in_(ids)) ).all() _words_list = [item.word for item in _list] # 批量向量化 model = EmbeddingModelCache.get_model() results = model.embed_documents(_words_list) # 存储向量 for index in range(len(results)): item = results[index] stmt = update(Terminology).where(Terminology.id == _list[index].id).values( embedding=item ) session.execute(stmt) session.commit()

术语检索(PostgreSQL向量检索):

# apps/terminology/curd/terminology.pyembedding_sql = f"""SELECT id, pid, word, similarityFROM(SELECT id, pid, word, oid, specific_ds, datasource_ids, enabled,( 1 - (embedding <=> :embedding_array) ) AS similarityFROM terminology AS child) TEMPWHERE similarity > {settings.EMBEDDING_TERMINOLOGY_SIMILARITY} AND oid = :oid AND enabled = true AND (specific_ds = false OR specific_ds IS NULL)ORDER BY similarity DESCLIMIT {settings.EMBEDDING_TERMINOLOGY_TOP_COUNT}"""def select_terminology_by_word(session: SessionDep, word: str, oid: int, datasource: int = None): _list: List[Terminology] = [] # 1. 先进行模糊匹配 stmt = select(Terminology.id, Terminology.pid, Terminology.word).where( and_( text(":sentence ILIKE '%' || word || '%'"), Terminology.oid == oid, Terminology.enabled == True ) ) results = session.execute(stmt, {'sentence': word}).fetchall() for row in results: _list.append(Terminology(id=row.id, word=row.word, pid=row.pid)) # 2. 再进行向量检索 if settings.EMBEDDING_ENABLED: model = EmbeddingModelCache.get_model() embedding = model.embed_query(word) results = session.execute( text(embedding_sql), {'embedding_array': str(embedding), 'oid': oid} ).fetchall() for row in results: _list.append(Terminology(id=row.id, word=row.word, pid=row.pid)) # 3. 去重并查询完整信息 _ids = list(set([row.id if row.pid isNoneelse row.pid for row in _list])) t_list = session.query( Terminology.id, Terminology.pid, Terminology.word, Terminology.description ).filter(or_(Terminology.id.in_(_ids), Terminology.pid.in_(_ids))).all() # 4. 组织返回结果 _map = {} for row in t_list: pid = str(row.pid) if row.pid isnotNoneelse str(row.id) if _map.get(pid) isNone: _map[pid] = {'words': [], 'description': ''} if row.pid isNone: _map[pid]['description'] = row.description _map[pid]['words'].append(row.word) return list(_map.values())

关键特性:

  • 使用PostgreSQL的<=>操作符进行向量距离计算
  • 相似度阈值可配置(EMBEDDING_TERMINOLOGY_SIMILARITY,默认0.4)
  • 支持数据源级别的术语隔离(specific_ds字段)
  • 模糊匹配+向量检索双重保障,提高召回率
2.4.2 SQL示例检索(Data Training)

针对复杂查询,提供历史SQL示例作为参考:

# apps/data_training/curd/data_training.pyembedding_sql = f"""SELECT id, datasource, question, similarityFROM(SELECT id, datasource, question, oid, enabled,( 1 - (embedding <=> :embedding_array) ) AS similarityFROM data_training AS child) TEMPWHERE similarity > {settings.EMBEDDING_DATA_TRAINING_SIMILARITY} and oid = :oid and datasource = :datasource and enabled = trueORDER BY similarity DESCLIMIT {settings.EMBEDDING_DATA_TRAINING_TOP_COUNT}"""def select_training_by_question(session: SessionDep, question: str, oid: int, datasource: Optional[int] = None): _list: List[DataTraining] = [] # 1. 模糊匹配 stmt = select(DataTraining.id, DataTraining.question).where( and_( or_( text(":sentence ILIKE '%' || question || '%'"), text("question ILIKE '%' || :sentence || '%'") ), DataTraining.oid == oid, DataTraining.enabled == True, DataTraining.datasource == datasource ) ) results = session.execute(stmt, {'sentence': question}).fetchall() for row in results: _list.append(DataTraining(id=row.id, question=row.question)) # 2. 向量检索 if settings.EMBEDDING_ENABLED: model = EmbeddingModelCache.get_model() embedding = model.embed_query(question) results = session.execute( text(embedding_sql), {'embedding_array': str(embedding), 'oid': oid, 'datasource': datasource} ) for row in results: _list.append(DataTraining(id=row.id, question=row.question)) # 3. 去重并查询完整信息 _ids = list(set([row.id for row in _list])) t_list = session.query( DataTraining.id, DataTraining.question, DataTraining.description ).filter(DataTraining.id.in_(_ids)).all() _map = {} for row in t_list: _map[row.id] = { 'question': row.question, 'suggestion-answer': row.description } return list(_map.values())

SQL示例格式化为XML:

def to_xml_string(_dict: list[dict] | dict, root: str = 'sql-examples') -> str: item_name_func = lambda x: 'sql-example'if x == 'sql-examples'else'item' xml = dicttoxml.dicttoxml( _dict, cdata=['question', 'suggestion-answer'], custom_root=root, item_func=item_name_func, xml_declaration=False, encoding='utf-8', attr_type=False ).decode('utf-8') pretty_xml = parseString(xml).toprettyxml() # 清理XML声明 if pretty_xml.startswith('<?xml'): end_index = pretty_xml.find('>') + 1 pretty_xml = pretty_xml[end_index:].lstrip() # 替换XML转义字符 escape_map = { '<': '<', '>': '>', '&': '&', '"': '"', ''': "'" } for escaped, original in escape_map.items(): pretty_xml = pretty_xml.replace(escaped, original) return pretty_xml

示例输出:

<sql-examples> <sql-example> <question><![CDATA[查询最近7天的销售额趋势]]></question> <suggestion-answer><![CDATA[ SELECT DATE(order_time) as date, SUM(amount) as total_sales FROM orders WHERE order_time >= CURRENT_DATE - INTERVAL '7 days' GROUP BY DATE(order_time) ORDER BY date ]]></suggestion-answer> </sql-example></sql-examples>

2.5 异步向量化处理

为避免阻塞主线程,向量生成采用异步处理:

# common/utils/embedding_threads.pyfrom concurrent.futures import ThreadPoolExecutorexecutor = ThreadPoolExecutor(max_workers=200)def run_save_table_embeddings(ids: List[int]): from apps.datasource.crud.table import save_table_embedding executor.submit(save_table_embedding, session_maker, ids)def run_save_ds_embeddings(ids: List[int]): from apps.datasource.crud.table import save_ds_embedding executor.submit(save_ds_embedding, session_maker, ids)def run_save_terminology_embeddings(ids: List[int]): from apps.terminology.curd.terminology import save_embeddings executor.submit(save_embeddings, session_maker, ids)def run_save_data_training_embeddings(ids: List[int]): from apps.data_training.curd.data_training import save_embeddings executor.submit(save_embeddings, session_maker, ids)

触发时机:

  • 数据源创建/更新时
  • 表结构修改时
  • 术语库/SQL示例新增/编辑时
  • 系统启动时填充空向量(fill_empty_embeddings)

三、完整工作流程

3.1 端到端流程图

用户提问: "查询最近一周销售额最高的前10个商品" │ ▼┌─────────────────────────────────────────┐│ 1. 数据源检索 (如果配置了多数据源) ││ - 向量化问题 ││ - 计算与各数据源的相似度 ││ - 选择Top1数据源: "电商订单库" │└──────────────┬──────────────────────────┘ │ ▼┌─────────────────────────────────────────┐│ 2. 表级别检索 ││ - 获取数据源下所有表的预存向量 ││ - 计算相似度排序 ││ - 筛选Top10表: ││ • orders (0.85) ││ • order_items (0.82) ││ • products (0.78) ││ • ... │└──────────────┬──────────────────────────┘ │ ▼┌─────────────────────────────────────────┐│ 3. 术语库检索 ││ - 提取问题中的关键词: "销售额" ││ - 检索术语库: ││ 销售额 -> SUM(order_items.price * quantity) ││ - 补充到Prompt │└──────────────┬──────────────────────────┘ │ ▼┌─────────────────────────────────────────┐│ 4. SQL示例检索 ││ - 检索相似问题的SQL: ││ "统计每日销售额Top10商品" ││ - 格式化为XML示例 ││ - 补充到Prompt │└──────────────┬──────────────────────────┘ │ ▼┌─────────────────────────────────────────┐│ 5. 构建最终Prompt ││ - Schema: 筛选后的表结构 ││ - Terminology: 术语映射关系 ││ - Examples: SQL示例 ││ - Question: 用户问题 │└──────────────┬──────────────────────────┘ │ ▼┌─────────────────────────────────────────┐│ 6. LLM生成SQL ││ SELECT p.product_name, ││ SUM(oi.price * oi.quantity) as sales ││ FROM order_items oi ││ JOIN products p ON oi.product_id = p.id ││ JOIN orders o ON oi.order_id = o.id ││ WHERE o.order_time >= CURRENT_DATE - 7 ││ GROUP BY p.product_name ││ ORDER BY sales DESC ││ LIMIT 10 │└─────────────────────────────────────────┘

3.2 配置参数说明

配置项默认值说明
TABLE_EMBEDDING_ENABLEDTrue是否启用表级别向量检索
TABLE_EMBEDDING_COUNT10表检索TopK数量
DS_EMBEDDING_COUNT10数据源检索TopK数量
EMBEDDING_ENABLEDTrue是否启用术语/示例向量检索
EMBEDDING_TERMINOLOGY_SIMILARITY0.4术语检索相似度阈值
EMBEDDING_TERMINOLOGY_TOP_COUNT5术语检索TopK数量
EMBEDDING_DATA_TRAINING_SIMILARITY0.4SQL示例检索相似度阈值
EMBEDDING_DATA_TRAINING_TOP_COUNT5SQL示例检索TopK数量
DEFAULT_EMBEDDING_MODELshibing624/text2vec-base-chineseEmbedding模型名称

四、性能优化策略

4.1 向量预计算与缓存

问题:实时向量化开销大,影响响应速度

解决方案:

  • 表结构向量、数据源向量、术语向量、SQL示例向量全部预计算
  • 存储在PostgreSQL的embedding字段(VECTOR类型)
  • 仅对用户问题进行实时向量化

效果对比:

场景实时计算预计算提升
100张表检索~2000ms~50ms40倍
1000条术语检索~5000ms~80ms62倍

4.2 批量向量化

# 批量处理,减少模型调用次数text = [s.get('schema_table') for s in _list]results = model.embed_documents(text) # 一次调用处理多个文本

4.3 异步更新机制

# 数据更新时异步触发向量更新,不阻塞主流程def update_table_and_fields(session: SessionDep, data: TableObj): update_table(session, data.table) for field in data.fields: update_field(session, field) # 异步更新向量 run_save_table_embeddings([data.table.id]) run_save_ds_embeddings([data.table.ds_id])

4.4 PostgreSQL向量索引优化

-- 使用pgvector扩展的HNSW索引加速检索CREATE INDEX terminology_embedding_idx ON terminology USING hnsw (embedding vector_cosine_ops);CREATE INDEX data_training_embedding_idx ON data_training USING hnsw (embedding vector_cosine_ops);CREATE INDEX core_table_embedding_idx ON core_table USING hnsw (embedding vector_cosine_ops);

五、工程实践经验

5.1 Schema描述的艺术

好的Schema描述:

# Table: order_items, 订单明细表[(id:BIGINT, 主键ID),(order_id:BIGINT, 订单ID),(product_id:BIGINT, 商品ID),(quantity:INT, 购买数量),(price:DECIMAL, 单价),(created_at:TIMESTAMP, 创建时间)]

差的Schema描述:

# Table: order_items[(id:BIGINT),(order_id:BIGINT),(product_id:BIGINT),(quantity:INT),(price:DECIMAL),(created_at:TIMESTAMP)]

关键点:

  • 表注释必须清晰说明业务含义
  • 字段注释要包含业务语义,不仅仅是字段名翻译
  • 注释质量直接影响向量检索准确率

5.2 术语库设计原则

1. 主词+同义词结构

主词: GMV同义词: [成交总额, 交易总额, 总成交金额]描述: SUM(order_amount) WHERE order_status = 'paid'

2. 支持数据源级别隔离

  • 不同业务线的同一术语可能含义不同
  • 通过specific_ds字段实现隔离

3. 启用/禁用控制

  • 过时术语可禁用而非删除,保留历史记录

5.3 SQL示例的组织

示例格式:

{ "question": "查询最近7天每日销售额", "suggestion-answer": """ SELECT DATE(order_time) as date, SUM(amount) as daily_sales FROM orders WHERE order_time >= CURRENT_DATE - INTERVAL '7 days' GROUP BY DATE(order_time) ORDER BY date """}

最佳实践:

  • 问题描述要自然,贴近用户真实表达
  • SQL要规范,包含必要的注释
  • 覆盖常见查询模式:聚合、关联、时间范围、排序等
  • 定期review和更新,剔除低质量示例

5.4 向量检索的阈值调优

相似度阈值设置建议:

场景推荐阈值理由
术语检索0.3-0.5术语表达多样,阈值过高会漏召回
SQL示例检索0.4-0.6问题表达相对规范,可适当提高
表检索无阈值,TopK必须返回结果,用TopK控制数量

调优方法:

  1. 收集badcase,分析相似度分布
  2. 绘制precision-recall曲线
  3. 根据业务容忍度选择平衡点

六、常见问题与解决方案

6.1 向量检索召回率低

现象:明明术语库有配置,但检索不到

排查步骤:

  1. 检查向量是否生成:SELECT id, word FROM terminology WHERE embedding IS NULL
  2. 检查相似度阈值是否过高
  3. 检查术语的enabled状态
  4. 检查specific_ds配置是否正确

解决方案:

  • 运行fill_empty_embeddings填充空向量
  • 调低相似度阈值
  • 增加同义词覆盖

6.2 表检索不准确

现象:检索出的表与问题无关

原因分析:

  • 表注释缺失或不准确
  • 字段注释缺失
  • 表结构过于相似,难以区分

解决方案:

  • 完善表和字段的custom_comment
  • 在表注释中添加典型使用场景描述
  • 利用表关系信息辅助判断

6.3 向量更新延迟

现象:修改了表结构,但检索结果未更新

原因:异步更新任务积压或失败

排查:

# 查看线程池状态executor._threads # 查看活跃线程数executor._work_queue.qsize() # 查看待处理任务数

解决方案:

  • 增加线程池大小:ThreadPoolExecutor(max_workers=500)
  • 添加任务监控和告警
  • 失败任务重试机制

七、优化方向

7.1 混合检索

结合BM25等传统检索方法,提升召回率:

# 伪代码def hybrid_search(question, tables): # 向量检索 vector_results = vector_search(question, tables, top_k=20) # BM25检索 bm25_results = bm25_search(question, tables, top_k=20) # 融合排序(RRF: Reciprocal Rank Fusion) final_results = rrf_merge(vector_results, bm25_results, top_k=10) return final_results

7.2 用户反馈学习

收集用户对SQL结果的反馈,优化检索策略:

# 记录用户反馈feedback = { "question": "查询销售额", "retrieved_tables": ["orders", "products"], "user_feedback": "positive", # or "negative" "correct_tables": ["order_items", "products"] # 用户纠正}# 基于反馈微调Embedding模型或调整检索权重

7.3 多模态检索

支持图表、ER图等多模态信息:

# 结合ER图信息table_context = { "table_schema": schema_text, "er_diagram": er_image_embedding, "usage_frequency": 0.8, "typical_queries": [...]}

八、总结

SQLBot的语义检索RAG方案,通过三层检索体系(数据源→表→业务知识),有效解决了Text2SQL场景下的核心痛点:

  1. 多数据源场景: 通过数据源级别向量检索,准确定位目标数据源
  2. 大规模表结构: 通过表级别向量检索,从数百张表中筛选相关表
  3. 业务术语理解: 通过术语库向量检索,建立业务语言与数据库字段的映射
  4. 复杂查询参考: 通过SQL示例向量检索,提供历史案例参考

核心优势:

  • 高性能: 向量预计算+异步更新,检索耗时<100ms
  • 高准确: 三层检索+双重保障(模糊+向量),召回率>90%
  • 易扩展: 模块化设计,支持灵活配置和定制
  • 工程化: 完善的异常处理、监控、配置管理

这套方案不仅适用于Text2SQL,也可推广到其他需要精准检索结构化知识的场景,如API文档检索、代码补全等。关键在于理解业务特点,设计合理的向量化策略和检索层次。

学AI大模型的正确顺序,千万不要搞错了

🤔2026年AI风口已来!各行各业的AI渗透肉眼可见,超多公司要么转型做AI相关产品,要么高薪挖AI技术人才,机遇直接摆在眼前!

有往AI方向发展,或者本身有后端编程基础的朋友,直接冲AI大模型应用开发转岗超合适!

就算暂时不打算转岗,了解大模型、RAG、Prompt、Agent这些热门概念,能上手做简单项目,也绝对是求职加分王🔋

📝给大家整理了超全最新的AI大模型应用开发学习清单和资料,手把手帮你快速入门!👇👇

学习路线:

✅大模型基础认知—大模型核心原理、发展历程、主流模型(GPT、文心一言等)特点解析
✅核心技术模块—RAG检索增强生成、Prompt工程实战、Agent智能体开发逻辑
✅开发基础能力—Python进阶、API接口调用、大模型开发框架(LangChain等)实操
✅应用场景开发—智能问答系统、企业知识库、AIGC内容生成工具、行业定制化大模型应用
✅项目落地流程—需求拆解、技术选型、模型调优、测试上线、运维迭代
✅面试求职冲刺—岗位JD解析、简历AI项目包装、高频面试题汇总、模拟面经

以上6大模块,看似清晰好上手,实则每个部分都有扎实的核心内容需要吃透!

我把大模型的学习全流程已经整理📚好了!抓住AI时代风口,轻松解锁职业新可能,希望大家都能把握机遇,实现薪资/职业跃迁~

这份完整版的大模型 AI 学习资料已经上传CSDN,朋友们如果需要可以微信扫描下方CSDN官方认证二维码免费领取【保证100%免费

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询