背景:项目需要迁移数据库,由于数据量超大,使用 navicat 的数据传输或转储工具去执行都会报错,于是尝试通过代码实现:先在指定连接下建库,再从源库转储表结构出来到目标库执行,最后用代码同步所有这些表的数据。能力有限,将就看看。
整体的逻辑大致是:
(1)先搭目标库,转储源库表结构,去目标库执行,使目标库与源库表结构一致;
(2)拿到源库所有表名(下有sql),到时候代码就跑这些表名的表同步数据;
(3)代码里同步模式isTotalOrAdd,如果是全量total的时候跑,那么它一张张表循环跑的时候每次都会先truncate table这张表然后再去新增;如果是add增量的话,它会去拿这个表的主键,然后查源表中大于目标表当前最大主键值的数据去插入(注意,这得保证每个表都有主键才行;我当时有很多表没有主键,记得可以通过 information_schema 查出库里所有没有主键的表再逐一补上主键,不过当时那段 SQL 没有保存下来);
(4)存在一张表数据量超大溢出报错,所以后有限制分页每次都查1000条。
(5)关于报错日志,因为一次性跑所有表控制台日志会被刷跑,所以收集了所有报错信息把它存到了本地路径文件里,另外,某个表同步报错了不会中断它会记录日志接着跑下一张表;
(6)有些表跑的时候其实会报错,比如视图、有外键的表之类的;另外一些大表也可以直接通过 navicat 工具做数据传输。可以让代码跑大部分表,自己手动跑少部分大表来节省时间(我的代码跑的话 1 小时大概能同步两百万条,粗略估计;自己使用时建议先跑一张表验证效果(注意备份),成功后再跑所有表)。
mysql查源库下所有表名(会带上视图),复制出来在编辑器直接替换成逗号间隔的字符串(\r\n--->,),这个是要同步的表名,自己手动跑的记得摘出去:
-- 查询INFORMATION_SCHEMA系统表 SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'data_new' ORDER BY TABLE_NAME;controller代码:
/**
 * Data sync controller — the single entry point that kicks off the migration.
 *
 * NOTE: run this once at the start. Before running, switch to the correct
 * datasource and enable the slave/newdb switch (`enabled: true`); also append
 * `&zeroDateTimeBehavior=convertToNull` to the JDBC URL so zero dates are
 * tolerated.
 */
@RestController
@RequestMapping("/data/sync")
public class DataSyncController extends BaseController {

    @Autowired
    private DataSyncService dataSyncService;

    /**
     * Triggers a full pass over all configured tables.
     *
     * @return total number of rows synced, wrapped in the standard result envelope
     */
    @GetMapping("/dataSync")
    public RequestResult dataSync() {
        int syncedRows = dataSyncService.dataSync();
        return RequestResult.success(syncedRows);
    }
}service类:
/**
 * Data sync service contract.
 */
public interface DataSyncService {

    /**
     * Runs the table-by-table sync and returns the total number of rows copied.
     */
    int dataSync();
}实现类:
@Service public class DataSyncServiceImpl implements DataSyncService { @Autowired private TbDataMapper tbDataMapper; @Autowired private TbTsDataMapper tsDataMapper; //这个是库名 使用时有叫不同的库名的,他原生的写法都是库名.表名用的表,导致库名不一致得改所有,为方便所以直接配成能通用的 @Value("${spring.custom.datasource.data_schema}") private String DATA_SCHEMA; @Override public int dataSync() { //主库(实际使用时存在一些表报错的情况,比如视图、存在外键的表等,这些还是得手动推(就是navicat里选中库-->工具--->数据传输去同步)走不了代码) String tableNames = "test1_table,test2_table"; // 同步模式:total(全量) 或 add(增量) String isTotalOrAdd = "total"; // String isTotalOrAdd = "add"; List<SyncErrorRecord> errorList = new ArrayList<>(); List<String> tableNamesList = Arrays.asList(tableNames.split(",")); int totalCount = 0; for (String tableName : tableNamesList) { try { int tableCount = syncSingleTableMaster(isTotalOrAdd, tableName, errorList); totalCount += tableCount; } catch (Exception e) { SyncErrorRecord errorRecord = new SyncErrorRecord(); errorRecord.setTableName(tableName); errorRecord.setErrorMessage(e.getMessage()); errorRecord.setErrorTime(new Date()); errorRecord.setErrorType("FULL_SYNC_ERROR"); errorList.add(errorRecord); System.err.println("表 " + tableName + " 同步时发生错误: " + e.getMessage()); e.printStackTrace(); } } System.out.println("总共同步了 " + totalCount + " 条数据"); if (CollectionUtils.isNotEmpty(errorList)) { System.out.println("《~~~~~~~~~~~~==============主库同步过程中发生错误的表=============~~~~~~~~~~~~》:"); for (SyncErrorRecord error : errorList) { System.out.println("表名: " + error.getTableName() + ", 错误: " + error.getErrorMessage()); } System.out.println("《############==============主库同步过程中发生错误的表=============############》"); // 保存错误记录 这里会把日志文件保存到文件里最后跑完看哪些表报错了再手动推就行 saveErrorRecords(errorList); } return totalCount; } /** * 数据同步处理 * @param isTotalOrAdd * @param tableName * @param errorList * @return */ private int syncSingleTableMaster(String isTotalOrAdd, String tableName, List<SyncErrorRecord> errorList) { // 如果是全量同步,清空目标表数据 if ("total".equals(isTotalOrAdd)) { 
tsDataMapper.truncateTableNewdb(tableName); } int totalCount = 0; Object maxPrimaryKeyValue = null; String primaryKeyColumn = null; try { // 获取表的主键字段名 primaryKeyColumn = tsDataMapper.getPrimaryKeyColumn(DATA_SCHEMA, tableName); // 如果是增量同步,查询目标表中的最大主键值 if ("add".equals(isTotalOrAdd)) { maxPrimaryKeyValue = tsDataMapper.getMaxPrimaryKeyValueCmg(tableName, primaryKeyColumn); System.out.println("表 " + tableName + " 主键字段: " + primaryKeyColumn + ", 当前最大值: " + maxPrimaryKeyValue); } } catch (Exception e) { SyncErrorRecord errorRecord = new SyncErrorRecord(); errorRecord.setTableName(tableName); errorRecord.setErrorMessage("查询目标表主键最大值失败: " + e.getMessage()); errorRecord.setErrorTime(new Date()); errorRecord.setErrorType("PRIMARY_KEY_QUERY_ERROR"); errorList.add(errorRecord); return totalCount; } int offset = 0; // 每次处理1000条记录 int limit = 1000; boolean hasMoreData = true; while (hasMoreData) { try { // 分页查询数据,根据主键值进行增量查询 List<Map<String, Object>> dataList = tbDataMapper.getMasterDataListNewdbPcByPageWithPrimaryKeyCmg( tableName, offset, limit, primaryKeyColumn, maxPrimaryKeyValue); if (CollectionUtils.isNotEmpty(dataList)) { // 获取所有列名 Set<String> allColumnNames = getAllColumnNames(dataList); // 标准化数据结构 List<Map<String, Object>> standardizedDataList = standardizeData(dataList, allColumnNames); // 分批插入 int count = 0; for (int i = 0; i < standardizedDataList.size(); i += 500) { List<Map<String, Object>> batch = standardizedDataList.subList(i, Math.min(i + 500, standardizedDataList.size())); Set<String> batchColumnNames = getAllColumnNames(batch); List<Map<String, Object>> finalBatch = standardizeData(batch, batchColumnNames); count += tsDataMapper.insertBatchNewdbNew(tableName, finalBatch); } totalCount += count; offset += limit; // 如果返回的数据少于limit,说明已经没有更多数据 if (dataList.size() < limit) { hasMoreData = false; } } else { // 没有查询到数据,说明已同步完所有新增数据 hasMoreData = false; } } catch (Exception e) { SyncErrorRecord errorRecord = new SyncErrorRecord(); 
errorRecord.setTableName(tableName); errorRecord.setErrorMessage("分页同步失败: " + e.getMessage()); errorRecord.setErrorTime(new Date()); errorRecord.setErrorType("PAGE_SYNC_ERROR"); errorList.add(errorRecord); break; // 出错则跳出循环 } } System.out.println("表 " + tableName + " 同步了 " + totalCount + " 条数据"); return totalCount; } /** * 将错误记录保存到文件或专门的错误表中 * * 收集错误日志示例: * ~~~~~~~~~~~~==============主库同步过程中发生错误的表=============~~~~~~~~~~~~》: * 表名: test_table, 错误: 分页同步失败: * ### Error querying database. Cause: java.sql.SQLSyntaxErrorException: SELECT command denied to user 'data_new'@'1.1.1.1' for table 'test_table' * ### The error may exist in file [D:\app\target\classes\mybatis\data\DataMapper.xml] * ### The error may involve defaultParameterMap * ### The error occurred while setting parameters * ### SQL: SELECT * FROM data_new.`test_table` ORDER BY `LOG_NR_` ASC LIMIT ?, ? * ### Cause: java.sql.SQLSyntaxErrorException: SELECT command denied to user 'data_new'@'1.1.1.1' for table 'test_table' * ; bad SQL grammar []; nested exception is java.sql.SQLSyntaxErrorException: SELECT command denied to user 'data_new'@'1.1.1.1' for table 'test_table' * 《############==============主库同步过程中发生错误的表=============############》 * 错误记录已保存到文件: C:\Users\Administrator\AppData\Local\Temp\\sync_errors_1766713290763.log * * @param errorList */ private void saveErrorRecords(List<SyncErrorRecord> errorList) { // 将错误记录保存到文件或专门的错误表中 try { // // 方案1: 指定绝对路径 // String errorLogPath = "/path/to/logs/sync_errors_" + System.currentTimeMillis() + ".log"; // // // 方案2: 使用相对路径(保存到项目logs目录下) // String errorLogPath = "logs/sync_errors_" + System.currentTimeMillis() + ".log"; // 方案3: 使用系统临时目录 String tempDir = System.getProperty("java.io.tmpdir"); String errorLogPath = tempDir + File.separator + "sync_errors_" + System.currentTimeMillis() + ".log"; // 保存到文件示例 // String errorLog = "sync_errors_" + System.currentTimeMillis() + ".log"; try (FileWriter writer = new FileWriter(errorLogPath)) { for (SyncErrorRecord error : 
errorList) { writer.write(String.format("表名: %s, 错误类型: %s, 错误信息: %s, 时间: %s%n", error.getTableName(), error.getErrorType(), error.getErrorMessage(), error.getErrorTime())); } } System.out.println("错误记录已保存到文件: " + errorLogPath); } catch (IOException e) { System.err.println("保存错误记录失败: " + e.getMessage()); } } /** * 获取所有记录中的列名集合 */ private Set<String> getAllColumnNames(List<Map<String, Object>> dataList) { Set<String> allColumnNames = new LinkedHashSet<>(); for (Map<String, Object> row : dataList) { allColumnNames.addAll(row.keySet()); } return allColumnNames; } /** * 标准化数据,确保所有行具有相同的列结构 */ private List<Map<String, Object>> standardizeData(List<Map<String, Object>> dataList, Set<String> allColumnNames) { List<Map<String, Object>> standardizedList = new ArrayList<>(); for (Map<String, Object> originalRow : dataList) { Map<String, Object> standardizedRow = new LinkedHashMap<>(); // 确保所有列都存在,缺失的列设为null for (String columnName : allColumnNames) { standardizedRow.put(columnName, originalRow.get(columnName)); } standardizedList.add(standardizedRow); } return standardizedList; }错误记录实体类:
/**
 * Error record entity: one entry per failed table, printed at the end of the
 * run and persisted to the error log file.
 */
@Data
public class SyncErrorRecord {
    // Name of the table that failed.
    private String tableName;
    // Exception message captured at the failure site.
    private String errorMessage;
    // When the failure was recorded.
    private Date errorTime;
    // Values actually set by the sync code: FULL_SYNC_ERROR,
    // PRIMARY_KEY_QUERY_ERROR, PAGE_SYNC_ERROR.
    private String errorType;
}mapper类:
/**
 * Target-database mapper — every method is routed to the NEWDB datasource.
 */
@Repository
public interface TbTsDataMapper {

    // Empties the target table before a full ("total") sync.
    @DataSource(DataSourceType.NEWDB)
    void truncateTableNewdb(@Param("tableName") String tableName);

    // Primary-key column name from information_schema; null if the table has no PK.
    @DataSource(DataSourceType.NEWDB)
    String getPrimaryKeyColumn(@Param("dataSchema") String dataSchema, @Param("tableName") String tableName);

    // Current MAX(pk) in the target table — the incremental-sync watermark.
    @DataSource(DataSourceType.NEWDB)
    Object getMaxPrimaryKeyValueCmg(@Param("tableName") String tableName, @Param("primaryKeyColumn") String primaryKeyColumn);

    // Multi-row INSERT; all maps in dataList must share the same key set.
    @DataSource(DataSourceType.NEWDB)
    int insertBatchNewdbNew(@Param("tableName") String tableName, @Param("dataList") List<Map<String, Object>> dataList);
}

/**
 * Source-database mapper — routed to the MASTER datasource.
 */
@Repository
public interface TbDataMapper {

    // Pages through the source table ordered by PK; when maxPrimaryKeyValue is
    // non-null only rows with a greater key are returned (incremental mode).
    @DataSource(DataSourceType.MASTER)
    List<Map<String, Object>> getMasterDataListNewdbPcByPageWithPrimaryKeyCmg(
            @Param("tableName") String tableName,
            @Param("offset") int offset,
            @Param("limit") int limit,
            @Param("primaryKeyColumn") String primaryKeyColumn,
            @Param("maxPrimaryKeyValue") Object maxPrimaryKeyValue);
}目标库的TbTsDataMapper.xml方法:
<!-- Target-side statements. ${data_schema} is a MyBatis configuration property
     (set in MyBatisConfig) so the same XML works across environments with
     different database names. ${...} is raw string substitution — table and
     column names cannot be bound as #{} parameters — so these statements must
     only ever receive trusted, internally-generated identifiers. -->

<!-- Empty the target table before a full sync. -->
<update id="truncateTableNewdb">
    TRUNCATE TABLE ${data_schema}.`${tableName}`
</update>

<!-- Primary-key column lookup; returns null when the table has no PK. -->
<select id="getPrimaryKeyColumn" resultType="java.lang.String">
    SELECT COLUMN_NAME FROM information_schema.COLUMNS
    WHERE TABLE_SCHEMA = #{dataSchema} AND TABLE_NAME = #{tableName} AND COLUMN_KEY = 'PRI'
    LIMIT 1
</select>

<!-- Incremental-sync watermark: current max PK value in the target table. -->
<select id="getMaxPrimaryKeyValueCmg" resultType="java.lang.Object">
    SELECT MAX(`${primaryKeyColumn}`) FROM ${data_schema}.`${tableName}`
</select>

<!-- Multi-row INSERT. The column list is taken from the FIRST row's key set
     (dataList[0].keySet()), which is why the Java side standardizes every row
     to the same key set before calling this. NULL values are emitted as the
     SQL literal NULL; others are bound as #{} parameters. -->
<insert id="insertBatchNewdbNew">
    INSERT INTO ${data_schema}.`${tableName}`
    <trim prefix="(" suffix=")" suffixOverrides=",">
        <foreach collection="dataList[0].keySet()" item="key">
            <if test="key != null and key != ''">
                `${key}`,
            </if>
        </foreach>
    </trim>
    VALUES
    <foreach collection="dataList" item="data" separator=",">
        <trim prefix="(" suffix=")" suffixOverrides=",">
            <foreach collection="dataList[0].keySet()" item="key">
                <if test="key != null and key != ''">
                    <choose>
                        <when test="data[key] == null">
                            NULL,
                        </when>
                        <otherwise>
                            #{data.${key}},
                        </otherwise>
                    </choose>
                </if>
            </foreach>
        </trim>
    </foreach>
</insert>源库的TbDataMapper.xml方法:
<!-- Source-side paging query, ordered by primary key. When maxPrimaryKeyValue
     is non-null (incremental mode) only rows above the watermark are returned;
     otherwise the whole table is paged through with LIMIT offset, limit.
     NOTE(review): the table/column names go in via ${} substitution — safe only
     because they come from information_schema, never from user input. -->
<select id="getMasterDataListNewdbPcByPageWithPrimaryKeyCmg" resultType="java.util.Map">
    SELECT * FROM `${tableName}`
    <where>
        <if test="maxPrimaryKeyValue != null">
            `${primaryKeyColumn}` > #{maxPrimaryKeyValue}
        </if>
    </where>
    ORDER BY `${primaryKeyColumn}` ASC
    LIMIT #{offset}, #{limit}
</select>部分application-test.yml内容:
spring: datasource: type: com.alibaba.druid.pool.DruidDataSource driverClassName: com.mysql.cj.jdbc.Driver druid: # 主库数据源 master: url: jdbc:mysql://x.x.x.x:xxxx/data_old?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai username: masterusername password: masterpassword # 从库数据源 slave: # 从数据源开关/默认关闭 enabled: true url: jdbc:mysql://xx.xx.xx.xx:xxxx/test?useUnicode=true&characterEncoding=utf8 username: slaveusername password: slavepassword driverClassName: com.mysql.jdbc.Driver # 新库数据源 newdb: # 从数据源开关/默认关闭 enabled: true url: jdbc:mysql://xxx.xxx.xxx.xxx:xxx/data_new?useUnicode=true&characterEncoding=utf8 username: newdbusername password: newdbpassword driverClassName: com.mysql.jdbc.Driver # 初始连接数 initialSize: 5 # 最小连接池数量 minIdle: 10 # 最大连接池数量 maxActive: 20 # 配置获取连接等待超时的时间 maxWait: 60000 # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 timeBetweenEvictionRunsMillis: 60000 # 配置一个连接在池中最小生存的时间,单位是毫秒 minEvictableIdleTimeMillis: 300000 # 配置一个连接在池中最大生存的时间,单位是毫秒 maxEvictableIdleTimeMillis: 900000 # 配置检测连接是否有效 validationQuery: SELECT 1 FROM DUAL testWhileIdle: true testOnBorrow: false testOnReturn: false webStatFilter: enabled: true statViewServlet: enabled: true # 设置白名单,不填则允许所有访问 allow: url-pattern: /druid/* # 控制台管理用户名和密码 login-username: login-password: filter: stat: enabled: true # 慢SQL记录 log-slow-sql: true slow-sql-millis: 1000 merge-sql: true wall: config: multi-statement-allow: true # 自定义数据库 schema 配置 custom: datasource: # 新库 data_schema: data_new如果要支持使用配置的库名还得在MyBatisConfig类加data_schema的配置:
/** * Mybatis支持*匹配扫描包 * * @author admin */ //@Configuration @Component public class MyBatisConfig { @Autowired private Environment env; static final String DEFAULT_RESOURCE_PATTERN = "**/*.class"; private final static String COMMA = ","; /** * 20250804加 处理正式、测试 数据库名不一致问题 */ @Value("${spring.custom.datasource.data_schema}") private String DATA_SCHEMA; @Bean public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception { String typeAliasesPackage = env.getProperty("mybatis.typeAliasesPackage"); String mapperLocations = env.getProperty("mybatis.mapperLocations"); String configLocation = env.getProperty("mybatis.configLocation"); typeAliasesPackage = setTypeAliasesPackage(typeAliasesPackage); VFS.addImplClass(SpringBootVFS.class); /* 【2025.10.09发现 mybatis和mybatis-plus同时使用时, 无法使用BaseMapper的公用方法,参考:https://blog.csdn.net/m0_67402125/article/details/123991284?utm_medium=distribute.pc_relevant.none-task-blog-2~default~baidujs_baidulandingword~default-0-123991284-blog-147252964.235^v43^pc_blog_bottom_relevance_base1&spm=1001.2101.3001.4242.1&utm_relevant_index=3】 解决方法:在配置数据源的地方,换一个类即可: */ // final SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean(); MybatisSqlSessionFactoryBean sessionFactory=new MybatisSqlSessionFactoryBean(); sessionFactory.setDataSource(dataSource); //20250804加 处理正式、测试 数据库名不一致问题 // 设置MyBatis属性 Properties properties = new Properties(); properties.setProperty("cm_mg_schema", CM_MG_SCHEMA); sessionFactory.setConfigurationProperties(properties); sessionFactory.setTypeAliasesPackage(typeAliasesPackage); sessionFactory.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations)); sessionFactory.setConfigLocation(new DefaultResourceLoader().getResource(configLocation)); return sessionFactory.getObject(); } public static String setTypeAliasesPackage(String typeAliasesPackage) { ResourcePatternResolver resolver = (ResourcePatternResolver) new PathMatchingResourcePatternResolver(); 
MetadataReaderFactory metadataReaderFactory = new CachingMetadataReaderFactory(resolver); List<String> allResult = new ArrayList<String>(); try { for (String aliasesPackage : typeAliasesPackage.split(COMMA)) { List<String> result = new ArrayList<String>(); aliasesPackage = ResourcePatternResolver.CLASSPATH_ALL_URL_PREFIX + ClassUtils.convertClassNameToResourcePath(aliasesPackage.trim()) + "/" + DEFAULT_RESOURCE_PATTERN; Resource[] resources = resolver.getResources(aliasesPackage); if (resources != null && resources.length > 0) { MetadataReader metadataReader = null; for (Resource resource : resources) { if (resource.isReadable()) { metadataReader = metadataReaderFactory.getMetadataReader(resource); try { result.add(Class.forName(metadataReader.getClassMetadata().getClassName()).getPackage().getName()); } catch (ClassNotFoundException e) { e.printStackTrace(); } } } } if (result.size() > 0) { HashSet<String> hashResult = new HashSet<String>(result); allResult.addAll(hashResult); } } if (allResult.size() > 0) { typeAliasesPackage = String.join(",", (String[]) allResult.toArray(new String[0])); } else { throw new RuntimeException("mybatis typeAliasesPackage 路径扫描错误,参数typeAliasesPackage:" + typeAliasesPackage + "未找到任何包"); } } catch (IOException e) { e.printStackTrace(); } return typeAliasesPackage; } }