Skip to content

生产故障:直接对千万级数据执行全表更新

1、模拟数据脚本

mysql
-- Employee table used by this walkthrough; it is filled with generated rows
-- by the insert_emp procedure defined later in this script.
CREATE TABLE `emp` (
    `id`        INT UNSIGNED       NOT NULL AUTO_INCREMENT COMMENT '主键',
    `emp_no`    MEDIUMINT UNSIGNED NOT NULL DEFAULT '0'    COMMENT '员工编号',
    `emp_name`  VARCHAR(20)        NOT NULL DEFAULT ''     COMMENT '员工名称',
    `job`       VARCHAR(9)         NOT NULL DEFAULT ''     COMMENT '工作',
    `mgr`       MEDIUMINT UNSIGNED NOT NULL DEFAULT '0'    COMMENT '上级编号',
    `hire_date` DATE               NOT NULL                COMMENT '入职时间',
    `sal`       DECIMAL(7,2)       NOT NULL                COMMENT '薪水',
    `comm`      DECIMAL(7,2)       NOT NULL                COMMENT '红利',
    `dept_no`   MEDIUMINT UNSIGNED NOT NULL DEFAULT '0'    COMMENT '部门编号',
    PRIMARY KEY (`id`)
) ENGINE = InnoDB DEFAULT CHARSET = gbk COMMENT = '员工表';


-- With binary logging enabled, MySQL refuses to create a stored function that
-- is not declared DETERMINISTIC, NO SQL or READS SQL DATA unless this global
-- is switched on. The functions created below carry no such declaration.
SET GLOBAL log_bin_trust_function_creators = 1;

-- rand_string(n): returns a string of n characters, each picked independently
-- and uniformly at random from the 52 ASCII letters a-z and A-Z.
--   n  number of characters to generate; n <= 0 or NULL yields ''.
--      The result is declared VARCHAR(255), so n must not exceed 255.
DELIMITER $$
CREATE FUNCTION rand_string(n INT) RETURNS VARCHAR(255)
    NOT DETERMINISTIC
    NO SQL
BEGIN
    -- Full alphabet; the earlier literal spelled "EFJHIJ", which dropped 'G'
    -- and made 'J' twice as likely as any other letter.
    DECLARE chars_str VARCHAR(100) DEFAULT 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';
    DECLARE chars_len INT DEFAULT 0;
    DECLARE return_str VARCHAR(255) DEFAULT '';
    DECLARE i INT DEFAULT 0;

    -- Derive the alphabet size instead of hard-coding 52.
    SET chars_len = CHAR_LENGTH(chars_str);

    WHILE i < n DO
        -- RAND() is in [0, 1), so the position is an integer in 1..chars_len.
        SET return_str = CONCAT(return_str, SUBSTRING(chars_str, FLOOR(1 + RAND() * chars_len), 1));
        SET i = i + 1;
    END WHILE;

    RETURN return_str;
END $$
DELIMITER ;

-- rand_num(): returns a random integer in the range 100..109 (inclusive).
-- insert_emp uses it as the generated department number.
DELIMITER $$
CREATE FUNCTION rand_num()
RETURNS INT
    NOT DETERMINISTIC
    NO SQL
BEGIN
    -- RAND() is in [0, 1), so 100 + RAND() * 10 is in [100, 110).
    RETURN FLOOR(100 + RAND() * 10);
END $$
DELIMITER ;



-- insert_emp(START, max_num): inserts max_num generated rows into emp.
--   START    base employee number; the rows get emp_no = START + 1 .. START + max_num.
--   max_num  number of rows to insert; nothing is inserted when it is <= 0 or NULL.
-- All rows are inserted inside one transaction that is committed at the end.
-- If any statement fails, the transaction is rolled back, the caller's
-- autocommit setting is restored and the error is re-raised.
DELIMITER $$
CREATE PROCEDURE insert_emp(IN START INT, IN max_num INT)
BEGIN
    DECLARE i INT DEFAULT 0;
    -- Remember the caller's setting so the session is not left with autocommit off.
    DECLARE prev_autocommit INT DEFAULT @@autocommit;

    DECLARE EXIT HANDLER FOR SQLEXCEPTION
    BEGIN
        ROLLBACK;
        SET autocommit = prev_autocommit;
        RESIGNAL;
    END;

    -- Disable autocommit so the INSERTs are committed once, not row by row.
    SET autocommit = 0;

    -- WHILE tests before the first iteration, so max_num <= 0 inserts nothing
    -- (the former REPEAT ... UNTIL i = max_num would never terminate there).
    WHILE i < max_num DO
        SET i = i + 1;
        INSERT INTO emp (emp_no, emp_name, job, mgr, hire_date, sal, comm, dept_no)
        VALUES (START + i, rand_string(6), 'SALESMAN', 1, CURDATE(), 2000, 400, rand_num());
    END WHILE;

    COMMIT;
    SET autocommit = prev_autocommit;
END $$
DELIMITER ;


-- Restore the default statement delimiter first: the routine definitions
-- above switch it to $$, and a statement ending in ';' is not terminated
-- while that is still in effect.
DELIMITER ;

-- Row count before the load.
-- NOTE(review): re-run this after the CALL if the goal is to verify the load.
SELECT COUNT(*) FROM emp;

-- Generate 10,000,000 rows with emp_no = 2 .. 10,000,001.
CALL insert_emp(1, 10000000);

2、执行

mysql
-- Run in the first session: full-table update. The missing WHERE clause is
-- deliberate, since this statement reproduces the problem being demonstrated.
UPDATE emp
SET sal = sal + 100;

-- Run in the second session: single-row update by primary key
-- (presumably issued while the first session's statement is still running).
UPDATE emp
SET sal = sal + 100
WHERE id = 1;
全表更新存在以下问题:
  • 执行时间长
  • 资源消耗大
  • 会造成锁表:InnoDB 会对扫描到的所有行加行锁,效果等同于锁表,第二个会话的更新会被阻塞,直到第一个会话的事务结束
  • 主从同步延迟:全表更新会一次性在主库上产生大量 binlog 日志,而主从同步依赖 binlog 完成,因此会导致主从同步延迟特别严重

image-20240912225156540

3、解决方案

java
package com.xx;

import java.sql.*;
import java.util.ArrayList;
import java.util.List;

/**
 * Adds 1 to {@code sal} for every row of the {@code emp} table in small,
 * primary-key-ordered batches instead of one full-table UPDATE.
 *
 * <p>Each batch is a single UPDATE statement that touches at most
 * {@link #BATCH_SIZE} rows. The connection stays in JDBC's default
 * auto-commit mode, so every batch is committed on its own.
 *
 * @Author: xueqimiao
 * @Date: 2024/9/11 20:48
 */
public class Update1000W {

    /** Maximum number of rows selected and updated per batch. */
    private static final int BATCH_SIZE = 1000;

    /**
     * Runs the batched update and prints the elapsed wall-clock time in seconds.
     *
     * @param args unused
     * @throws SQLException           if any database operation fails
     * @throws ClassNotFoundException if the MySQL JDBC driver is not on the classpath
     */
    public static void main(String[] args) throws SQLException, ClassNotFoundException {

        long startTime = System.nanoTime();
        myBatchUpdate();
        long endTime = System.nanoTime();
        System.out.println("----costTime: " + (endTime - startTime) / 1000000000.0 + " 秒");

    }

    /**
     * Walks the {@code emp} table in ascending primary-key order, at most
     * {@link #BATCH_SIZE} ids at a time, and updates each batch with one
     * {@code UPDATE ... WHERE id IN (...)} statement.
     *
     * <p>All JDBC resources are closed via try-with-resources. A failure is
     * propagated to the caller unchanged; batches that already completed stay
     * committed because the connection runs in auto-commit mode.
     *
     * @throws ClassNotFoundException if the MySQL JDBC driver class cannot be loaded
     * @throws SQLException           if connecting, querying or updating fails
     */
    private static void myBatchUpdate() throws ClassNotFoundException, SQLException {
        Class.forName("com.mysql.cj.jdbc.Driver");
        String jdbcUrl = "jdbc:mysql://localhost:3306/xx_db2024?useUnicode=true&characterEncoding=utf8" +
                "&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8";
        // NOTE(review): credentials are hard-coded for the demo; load them from configuration in real code.
        String username = "root";
        String password = "mac_root";

        try (Connection connection = DriverManager.getConnection(jdbcUrl, username, password);
             // Range scan on the primary key; prepared once and re-bound for every batch.
             PreparedStatement selectBatch = connection.prepareStatement(
                     "SELECT id FROM emp WHERE id>=? ORDER BY id LIMIT " + BATCH_SIZE)) {

            long nextId = findMinId(connection);

            while (true) {
                List<Long> ids = fetchIdBatch(selectBatch, nextId);
                if (ids.isEmpty()) {
                    // No row has an id >= nextId: the whole table has been processed.
                    break;
                }
                updateBatch(connection, ids);
                // The ids are sorted ascending, so the next batch starts right after the last one.
                nextId = ids.get(ids.size() - 1) + 1;
            }
        }
    }

    /**
     * Returns the smallest {@code id} in {@code emp}, or 0 when the table is
     * empty ({@code MIN(id)} is then SQL NULL, which {@code getLong} reports as 0).
     */
    private static long findMinId(Connection connection) throws SQLException {
        try (PreparedStatement statement = connection.prepareStatement("SELECT MIN(id) as id FROM emp");
             ResultSet resultSet = statement.executeQuery()) {
            // getLong rather than getInt: the column is INT UNSIGNED and may exceed Integer.MAX_VALUE.
            return resultSet.next() ? resultSet.getLong("id") : 0L;
        }
    }

    /** Reads the next batch of ids that are {@code >= fromId}, in ascending order. */
    private static List<Long> fetchIdBatch(PreparedStatement selectBatch, long fromId) throws SQLException {
        selectBatch.setLong(1, fromId);
        List<Long> ids = new ArrayList<>(BATCH_SIZE);
        try (ResultSet resultSet = selectBatch.executeQuery()) {
            while (resultSet.next()) {
                ids.add(resultSet.getLong("id"));
            }
        }
        return ids;
    }

    /** Adds 1 to {@code sal} for exactly the given (non-empty) list of ids. */
    private static void updateBatch(Connection connection, List<Long> ids) throws SQLException {
        // The list holds only numeric primary keys read from the database,
        // so appending them to the statement text cannot inject SQL.
        StringBuilder sql = new StringBuilder("UPDATE emp SET sal=sal+1 WHERE id IN (");
        for (int i = 0; i < ids.size(); i++) {
            if (i > 0) {
                sql.append(',');
            }
            sql.append(ids.get(i).longValue());
        }
        sql.append(')');

        try (Statement statement = connection.createStatement()) {
            statement.executeUpdate(sql.toString());
        }
    }
}