本文档面向已经熟练使用 TDengine 的开发者,聚焦 C/C++ 连接器的高阶用法与工程化落地建议:如何选择连接方式、如何把写入/查询/订阅跑到“更稳、更快、更可运维”,以及一些常见的坑与规避方式。
前置条件
- 已熟悉 C/C++ Connector 的基础 API
连接策略与部署形态
WebSocket vs 原生连接:优先级怎么定
C/C++ 连接器支持 WebSocket 连接与原生连接,两者的关键差异在基础文档中已有说明。
- 优先推荐 WebSocket:更容易跨版本/跨环境部署,且性能接近原生连接。
- 需要“完整能力/强版本一致”时选原生:例如需要使用 WebSocket 暂不支持的少数接口(基础文档中已列出),或你希望严格版本匹配。
:::warning
无论选择哪种方式,都要牢记:taos_options(TSDB_OPTION_DRIVER, "websocket")必须在程序开始时调用,且只能调用一次;设置后对进程全局生效。
:::
WebSocket 多端点负载均衡与故障切换(推荐)
当你使用 taosAdapter 提供的 WebSocket 能力时,可以通过taos_connect_with()+OPTIONS的adapterList配置多个端点,实现连接级别的负载均衡与故障切换。
#include"taos.h"staticTAOS*connect_ws_with_adapters(void){OPTIONS options={0};// 多端点:host:port 用英文逗号分隔taos_set_option(&options,"adapterList","adapter1:6041,adapter2:6041,adapter3:6041");// 鉴权与默认 DBtaos_set_option(&options,"user","root");taos_set_option(&options,"pass","taosdata");taos_set_option(&options,"db","power");// 连接重试与退避(WebSocket 专有配置)taos_set_option(&options,"connRetries","5");taos_set_option(&options,"retryBackoffMs","200");taos_set_option(&options,"retryBackoffMaxMs","2000");// 可选:启用压缩(按需开启)taos_set_option(&options,"compression","1");returntaos_connect_with(&options);}:::note
adapterList的优先级高于ip。- WebSocket 默认端口为 6041(基础文档的
OPTIONS参数表中已说明)。
:::
WebSocket TLS 与证书校验
如果你在公网/跨机房环境使用 WebSocket,建议显式评估 TLS 相关配置(wsTlsMode、wsTlsVersion、wsTlsCa)。
wsTlsMode支持从“禁用/不校验证书/校验证书但不校验主机名/校验证书且校验主机名”等多档;wsTlsCa可传 CA 文件路径或 PEM 内容,用于验证服务端证书。
具体参数与取值含义请以基础文档的OPTIONS参数表为准。
连接可观测性:标记 userIp/userApp/connectorInfo
在多服务、多租户或需要审计/排障的场景,建议为连接设置识别信息:
taos_options_connection(..., TSDB_OPTION_CONNECTION_USER_IP, ...)taos_options_connection(..., TSDB_OPTION_CONNECTION_USER_APP, ...)taos_options_connection(..., TSDB_OPTION_CONNECTION_CONNECTOR_INFO, ...)
这样在服务端日志与审计链路里更容易定位请求来源。
线程模型与连接池(稳定性优先)
基础文档已经给出关键提示:
- 推荐每线程独立连接,或基于线程的连接池
- 不推荐在多个线程间共享同一个
TAOS*,以避免连接内状态(例如USE)互相干扰 - 同步 API 执行过程中,避免使用类似
pthread_cancel的强制终止方式,以免引发死锁等不可预期问题
工程建议:
- 把
TAOS*的生命周期绑定到线程(线程本地存储或显式池化)。 - 需要跨线程传递任务时,只传 SQL/参数,不传
TAOS*,在工作线程里拿自己的连接执行。 - 连接池要能“丢弃坏连接”:一旦出现不可恢复错误(例如连接断开且重试失败),直接销毁并重建连接。
错误处理与资源管理:最常见的坑
统一的错误检查范式
taos_query():不能用返回值是否为NULL判断成功与否,必须检查taos_errno(res)taos_schemaless_insert*():某些情况下可能返回NULL,仍可通过taos_errno(NULL)获取错误码tmq_consumer_poll():返回NULL既可能是“超时无数据”,也可能是“出错”,应结合taos_errno(NULL)判断
TAOS_RES*res=taos_query(taos,"select server_version()");int32_tcode=taos_errno(res);if(code!=0){// res 可能为 NULL,也可能非 NULLfprintf(stderr,"TDengine error=%d, msg=%s\n",code,taos_errstr(res));}taos_free_result(res);资源释放顺序与泄漏防线
- 查询/写入返回的
TAOS_RES*:用完必须taos_free_result() - 连接
TAOS*:不再使用时taos_close() - 进程退出:可调用
taos_cleanup()清理运行环境
:::warningtaos_free_result()之后不要再使用该结果集调用taos_fetch_row()等接口,否则可能崩溃。
:::
C++ RAII:把“必须释放”变成“自动释放”
在 C++ 工程里,建议用 RAII 封装TAOS*/TAOS_RES*的释放逻辑,减少异常路径/早返回导致的泄漏。
#include<memory>#include"taos.h"structTaosConnDeleter{voidoperator()(TAOS*p)const{if(p)taos_close(p);}};structTaosResDeleter{voidoperator()(TAOS_RES*p)const{if(p)taos_free_result(p);}};usingTaosConnPtr=std::unique_ptr<TAOS,TaosConnDeleter>;usingTaosResPtr=std::unique_ptr<TAOS_RES,TaosResDeleter>;staticTaosResPtrquery(TAOS*taos,constchar*sql){returnTaosResPtr(taos_query(taos,sql));}高性能查询:大结果集与批量拉取
优先使用taos_fetch_block()做批量拉取
当结果集很大时,逐行taos_fetch_row()的函数调用开销会更明显。可以优先使用taos_fetch_block()批量拉取,再在应用侧遍历行。
TAOS_RES*res=taos_query(taos,"select * from meters limit 10000");if(taos_errno(res)!=0){fprintf(stderr,"query failed: %s\n",taos_errstr(res));taos_free_result(res);return;}intnum_fields=taos_num_fields(res);TAOS_FIELD*fields=taos_fetch_fields(res);TAOS_ROW rows=NULL;intn=0;charbuf[1024];while((n=taos_fetch_block(res,&rows))>0){for(inti=0;i<n;i++){// rows[i] 是一行,字段类型由 fields 描述taos_print_row(buf,rows[i],fields,num_fields);// 业务处理:写日志/解析字段/转对象}}taos_free_result(res);关注时间戳精度
- 结果集时间戳精度可用
taos_result_precision(res)判断(毫秒/微秒/纳秒) - 在做时间序列对齐、跨系统写回或序列化时,建议把精度显式带上,避免“看似正确但偏了 1000 倍”的问题
高性能写入:SQL vs STMT2 vs Schemaless
选型建议
- SQL 直接写入:最简单;适合吞吐不敏感、写入规模不大或调试场景。
- STMT2(参数绑定):通常是高吞吐写入的首选,避免重复 SQL 解析开销,并支持批量绑定。
- Schemaless:适合 schema 经常变化、或者希望“写入即建表/自动演进”的场景;也适合做多源协议接入(Line Protocol/Telnet/JSON)。
STMT2 的关键复用点:Prepare 一次,Bind/Exec 多次
基础文档已经给出 STMT2 的典型流程:
taos_stmt2_init()taos_stmt2_prepare()- 多次
taos_stmt2_bind_param()+taos_stmt2_exec() taos_stmt2_close()
工程上最重要的是:
- 同一条 SQL,尽量复用 prepare 结果(只换绑定数据,避免反复解析)。
- 批次大小要“按行大小”调优:每行很大时批次不要太大;每行很小可适当加大批次。
- 出错后建议重建 stmt2 上下文:基础文档已明确“不建议继续在当前环境上下文下工作”,可通过
taos_stmt2_error()查看原因。
STMT2 高效写入模式与异步执行
taos_stmt2_init()的TAOS_STMT2_OPTION支持:
- 高效写入模式:将
singleStbInsert与singleTableBindOnce设为true - 异步执行:设置回调函数
asyncExecFn与回调参数userdata
:::warning
启用异步执行时,务必确保绑定数据涉及的内存(表名、tags、列值缓冲区等)在回调触发前仍然有效;不要把栈上临时变量地址直接作为绑定数据长期使用。
:::
Schemaless:用 raw/ttl/reqid 扩展接口提升工程可控性
在 Schemaless 进阶使用中,常见诉求包括:
- 批量更大、更省拷贝:使用
_raw版本,按lines + len传递整块缓冲区 - 链路追踪:使用
_with_reqid版本带上reqid - 自动建表 TTL:使用
_ttl版本控制自动建表的生存时间
#include"taos.h"#include<stdint.h>#include<stdio.h>staticvoidschemaless_insert_raw_demo(TAOS*taos){// 示例:多行 Line Protocol 拼接成一段缓冲区// 注意:这里用 \n 分隔多行;真实工程里建议用更高效的缓冲区构造方式charlines[]="meters,location=beijing groupid=1 current=10.1,voltage=220i32,phase=0.33 1700000000000\n""meters,location=beijing groupid=1 current=10.2,voltage=221i32,phase=0.34 1700000001000\n";int32_ttotalRows=0;int64_treqid=123456789;TAOS_RES*res=taos_schemaless_insert_raw_with_reqid(taos,lines,(int)sizeof(lines)-1,&totalRows,TSDB_SML_LINE_PROTOCOL,TSDB_SML_TIMESTAMP_MILLI_SECONDS,reqid);if(taos_errno(res)!=0){fprintf(stderr,"schemaless failed: %s\n",taos_errstr(res));}else{printf("schemaless ok, totalRows=%d\n",totalRows);}taos_free_result(res);}TMQ(数据订阅)进阶:消费语义、提交与消息类型
关键约束与建议
tmq_consumer_poll():每个消费者只能单线程调用(基础文档已说明)。- “处理-提交”顺序决定语义:
- 至少一次(at-least-once):处理成功后再
tmq_commit_sync() - 至多一次(at-most-once):先提交再处理(可能丢数据,谨慎)
- 至少一次(at-least-once):处理成功后再
- 订阅延迟较大或需要消费离线数据时,务必确保数据库的 WAL 保留策略满足需求(例如 WAL 的额外保留时长/大小相关配置)。
识别消息类型:数据/元数据/自动建表
tmq_get_res_type(msg)会返回TMQ_RES_DATA/TMQ_RES_TABLE_META/TMQ_RES_METADATA等类型。
- 只关心数据行:仅处理
TMQ_RES_DATA - 需要感知表结构变化/自动建表:同时处理元数据类型
推荐消费循环(带手动提交)
下面示例基于快速上手里的配置项(group.id、td.connect.user、td.connect.pass、auto.offset.reset),增加了:
- 消息类型判断
- 处理后提交 offset
- 释放消息结果集
#include"taos.h"#include<stdio.h>staticvoidconsume_loop(void){tmq_conf_t*conf=tmq_conf_new();tmq_conf_set(conf,"group.id","group1");tmq_conf_set(conf,"td.connect.user","root");tmq_conf_set(conf,"td.connect.pass","taosdata");tmq_conf_set(conf,"auto.offset.reset","earliest");charerrstr[512]={0};tmq_t*tmq=tmq_consumer_new(conf,errstr,(int32_t)sizeof(errstr));tmq_conf_destroy(conf);if(!tmq){fprintf(stderr,"tmq_consumer_new failed: %s\n",errstr);return;}tmq_list_t*topics=tmq_list_new();tmq_list_append(topics,"topic_meters");if(tmq_subscribe(tmq,topics)!=0){fprintf(stderr,"tmq_subscribe failed\n");tmq_list_destroy(topics);tmq_consumer_close(tmq);return;}tmq_list_destroy(topics);for(;;){TAOS_RES*msg=tmq_consumer_poll(tmq,1000);if(!msg){// 可能是超时无数据,也可能出错:按需结合 taos_errno(NULL) 做监控/告警continue;}tmq_res_ttp=tmq_get_res_type(msg);if(tp==TMQ_RES_DATA){// 消费数据:msg 的读取方式与 taos_query 返回的结果集一致TAOS_ROW row;while((row=taos_fetch_row(msg))){// 业务处理...}// 处理成功后提交(至少一次语义)int32_trc=tmq_commit_sync(tmq,msg);if(rc!=0){fprintf(stderr,"tmq_commit_sync failed: %s\n",tmq_err2str(rc));}}else{// 元数据/自动建表等类型:按需处理或忽略// 你也可以通过 tmq_get_table_name/tmq_get_topic_name 等接口做路由}taos_free_result(msg);}// 实际工程里需要有退出条件// tmq_unsubscribe(tmq);// tmq_consumer_close(tmq);}:::warning
上例中省略了退出条件与信号处理;实际工程建议引入“可中断的循环 + 有序退出”(先 unsubscribe,再 close)。
:::
关于TDengine
TDengine 专为物联网IoT平台、工业大数据平台设计。其中,TDengine TSDB 是一款高性能、分布式的时序数据库(Time Series Database),同时它还带有内建的缓存、流式计算、数据订阅等系统功能;TDengine IDMP 是一款AI原生工业数据管理平台,它通过树状层次结构建立数据目录,对数据进行标准化、情景化,并通过 AI 提供实时分析、可视化、事件管理与报警等功能。