探究全量数据同步的奥秘


highlight: a11y-light theme: channing-cyan

数据准备

为了准备相对庞大的数据量,我从GitHub上找了一些公开的真实数据,是2019年到2023年WHO公开的Covid-19数据。

数据集地址

数据插入

拉取数据集,打开如下文件夹

我们可以看到很多的csv文件,这些都是全球各个地区的每日新增报告。

容器准备

我们使用Logstash对这批文件进行导入

docker启动Logstash

使用dockerfile进行docker启动,因为在启动之前我没需要对容器进行一些操作。

# 使用官方的 Logstash 镜像
FROM logstash:7.12.0

维护者信息

LABEL maintainer="xxxx@xxxx.com"

安装 logstash-output-jdbc 插件

RUN logstash-plugin install logstash-output-jdbc

创建目录以存放 JDBC 驱动程序

RUN mkdir -p /usr/share/logstash/jdbc

将 MySQL 连接器 JAR 包复制到容器中

COPY ./covid19/mysql-connector-java-8.0.28.jar /usr/share/logstash/jdbc/mysql-connector-java-8.0.28.jar

将配置文件和管道复制到容器中

COPY ./config/ /usr/share/logstash/config/
COPY ./pipeline/ /usr/share/logstash/pipeline/

暴露需要的端口

EXPOSE 5044 16060

设置 Logstash 命令

CMD [“logstash”, “-f”, “/usr/share/logstash/pipeline/xxxx.conf”]

注意要点:

  1. RUN logstash-plugin install logstash-output-jdbc 此命令会执行很久,耐心等待,可能与网络有关,可尝试切换快速的节点进行尝试。安装此插件目的是为了不编写代码,通过logstash直接操作数据库进行全量同步。
  2. xxxx.conf 请把conf文件名换成你的文件名。此文件内容是下面的logstash管道配置内容。
  3. 把上面拉取到的数据集放到如下文件夹 /home/mycontainers/mylogstash/covid19/
  4. 把mysql相关驱动的jar包下载到本地,然后放到服务器上,编写Dockerfile的时候根据你的宿主机路径进行实际更改。

请注意相对路径和当前目录,避免文件找不到报错。

具体配置请参考我之前的文章 告别服务器捞日志,开启自动化日志搜集之旅

logstash管道配置

input {
  file {
    path => "/home/mycontainers/mylogstash/covid19/*.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
csv {
separator => “,”
columns => [“FIPS”, “Admin2”, “Province_State”, “Country_Region”, “Last_Update”, “Lat”, “Long_”, “Confirmed”, “Deaths”, “Recovered”, “Active”, “Combined_Key”, “Incident_Rate”, “Case_Fatality_Ratio”]
}

mutate {
rename => {
“FIPS” => “fips”
“Admin2” => “admin2”
“Province_State” => “province_state”
“Country_Region” => “country_region”
“Last_Update” => “last_update”
“Lat” => “lat”
“Long_” => “long”
“Confirmed” => “confirmed”
“Deaths” => “deaths”
“Recovered” => “recovered”
“Active” => “active”
“Combined_Key” => “combined_key”
“Incident_Rate” => “incident_rate”
“Case_Fatality_Ratio” => “case_fatality_ratio”
}
}
}
output {
elasticsearch {
hosts => [“http://myes:9200”]
index => “covid19”
}
}

启动容器

确保配置和文件都正确后,在Dockerfile执行

docker build -t mylogstash:custom .

成功后我们继续执行如下命令

docker run --name mylogstashes
-p 5044:5044
-p 16060:16060
-itd --restart=always
-v /etc/localtime:/etc/localtime
-v /home/mycontainers/mylogstash/config:/usr/share/logstash/config
-v /home/mycontainers/mylogstash/pipeline:/usr/share/logstash/pipeline
-v /home/mycontainers/mylogstash/covid19:/home/mycontainers/mylogstash/covid19 --net mynetwork logstash:7.12.0

执行完成后,我们就可以去es查看数据了 使用Kibana的开发工具查询总数

image.png

好了,我们现在有400w条数据在es里面了。

数据同步

logstash

接下来我们要把es里面的数据全量同步到MySQL里面。

CREATE TABLE `covid19` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `combined_key` varchar(255) DEFAULT NULL,
  `country_region` varchar(255) DEFAULT NULL,
  `confirmed` int(11) DEFAULT NULL,
  `longitude` decimal(18,8) DEFAULT NULL,
  `source_path` varchar(255) DEFAULT NULL,
  `last_update_time` datetime DEFAULT NULL,
  `host_name` varchar(255) DEFAULT NULL,
  `province_state` varchar(255) DEFAULT NULL,
  `deaths` int(11) DEFAULT NULL,
  `latitude` decimal(18,8) DEFAULT NULL,
  `fips_code` varchar(255) DEFAULT NULL,
  `incident_rate` decimal(18,8) DEFAULT NULL,
  `case_fatality_ratio` decimal(18,8) DEFAULT NULL,
  `message_text` text,
  `admin2` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `idx_combined_key_country_region_province_state` (`combined_key`,`country_region`,`province_state`),
  KEY `idx_last_update_time` (`last_update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;

新建一个logstash容器,然后把管道配置改成如下配置

input {
  elasticsearch {
    hosts => ["http://myes:9200"]  # Elasticsearch 主机地址
    index => "covid19"          # 要同步的索引名称
    query => '{"size": 5000, "query": {"match_all": {}}}'  # 查询 JSON 格式
docinfo => true
    scroll => "1m"  # 添加滚动时间以确保只取一次
  }
}

filter {

可以在这里添加过滤器,对从 Elasticsearch 中读取的数据进行处理

mutate {
rename => {
“long” => “longitude”
“host” => “host_name”
}
}
}

output {
jdbc {
connection_string => “jdbc:mysql://192.168.31.150:3306/jycloud?user=root&password=123456&useUnicode=true&useSSL=false&characterEncoding=utf8&nullCatalogMeansCurrent=true&serverTimezone=GMT%2B8” # MySQL 连接字符串
driver_jar_path => “/usr/share/logstash/jdbc/mysql-connector-java-8.0.28.jar” # MySQL 驱动程序 JAR 包路径
driver_class => “com.mysql.cj.jdbc.Driver” # MySQL 驱动程序类名
# statement => “INSERT INTO covid19 (combined_key, country_region, confirmed, longitude, source_path, last_update_time, host_name, province_state, deaths, latitude, fips_code, incident_rate, case_fatality_ratio, message_text, admin2) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)” # 插入语句,根据实际情况修改
# 以下是从 Elasticsearch 中获取字段值并插入到 MySQL 表中对应字段的配置
# 字段名需与 Elasticsearch 中的字段名对应
statement => [
“INSERT INTO covid19 (combined_key, country_region, confirmed, longitude, source_path, last_update_time, host_name, province_state, deaths, latitude, fips_code, incident_rate, case_fatality_ratio, message_text, admin2) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)”,
“combined_key”,
“country_region”,
“confirmed”,
“longitude”,
“source_path”,
“last_update”,
“host_name”,
“province_state”,
“deaths”,
“latitude”,
“fips_code”,
“incident_rate”,
“case_fatality_ratio”,
“message_text”,
“admin2”
]
}
}

执行docker复制文件命令替换掉的原本的管道配置文件

docker cp xxx.conf 容器id:/usr/share/logstash/pipeline/xxx.conf

确保/usr/share/logstash/pipeline文件夹里只有最新的conf文件。 然后重启容器。

docker restart 容器id/容器名称

然后查询数据库,会发现数据在逐渐递增。全量同步过程非常久,因为不是批量插入,而批量插入的情况下比逐条插入要快。笔者这次全量同步执行了大概2小时。

java

上面使用了逐条插入的方法,接下来我没使用一些批量插入进行一些对比。

配置

@Configuration
public class DatabaseConfig {
    @Value("${spring.datasource.url}")
    private String url;
@Value("${spring.datasource.username}")
private String username;

@Value("${spring.datasource.password}")
private String password;

@Value("${spring.datasource.driver-class-name}")
private String driverClassName;

@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource druidDataSource() {
    DruidDataSource dataSource =  new DruidDataSource();
    dataSource.setUrl(url);
    dataSource.setUsername(username);
    dataSource.setPassword(password);
    dataSource.setDriverClassName(driverClassName);
    return dataSource;
}

//配置SqlSessionFactory-常规写法
@Bean
public SqlSessionFactory sqlSessionFactoryBean(DataSource dataSourceProxy)
        throws Exception {
    //出现invalid bound statement (not found) 使用此配置
    MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();

// SqlSessionFactoryBean sqlSessionFactoryBean =
// new SqlSessionFactoryBean();
bean.setDataSource(dataSourceProxy);
bean.setMapperLocations
(new PathMatchingResourcePatternResolver().getResources(“classpath*:mapper/*.xml”));
bean.setTransactionFactory
(new SpringManagedTransactionFactory());
return bean.getObject();
}

}

核心代码

 @Test
    void SyncEsToMysql() {
        IndexCoordinates index = IndexCoordinates.of("covid19");
        int from = 0;
        List list = new ArrayList<>();
        Long currentDateTimeStart = System.currentTimeMillis();
        NativeSearchQuery query = new NativeSearchQueryBuilder()
                .withQuery(QueryBuilders.matchAllQuery())
                .withPageable(PageRequest.of(0, BATCH_SIZE))
                .build();
        //首次查找
        SearchScrollHits hits = elasticsearchRestTemplate.searchScrollStart(5000, query, Covid.class, index);
        String scrollId = hits.getScrollId();
        try {
            while (hits != null && !hits.isEmpty()) {
                // 如果 resultList 的大小达到 batchSize,就插入数据库并清空 resultList
                hits.getSearchHits().forEach(hit -> list.add(hit.getContent()));
                if (list.size() >= BATCH_SIZE) {
                    //covid19Service.saveBatch(list);
                    list.clear();
                }
                //根据scrollId再次查询es
                hits = elasticsearchRestTemplate.searchScrollContinue(scrollId, 5000, Covid.class, index);
                if (hits != null) {
                    scrollId = hits.getScrollId();
                }
                log.info("from :" + from);
                from += BATCH_SIZE;
            }
            if (!list.isEmpty()) {
                //covid19Service.saveBatch(list);
            }
        } catch (Exception e) {
            e.printStackTrace();
            log.info(e.getMessage());
        } finally {
            Long currentDateTimeEnd = System.currentTimeMillis();
            log.info("from :" + from);
            log.info("time:" + (currentDateTimeEnd - currentDateTimeStart));
        }
}

先把入库方法 covid19Service.saveBatch(list) 注释掉,跑一下看看需要多久

39a056750788ef71ae45767ec086e47.png

只查询的话需要519544毫秒,转换成分钟就是8.6分钟。

然后我再把入库方法 covid19Service.saveBatch(list) 打开试试看,执行代码,看控制台。

27780109067625ecf1d548be7035a2c.png

836151毫秒,折算成分钟就是13分钟。

查看MySQL和es,验证数据是否一致

大功告成,全量同步就说到这里了,增量同步我们下次再相遇。


这是一个从 https://juejin.cn/post/7369120920148181027 下的原始话题分离的讨论话题