highlight: a11y-light theme: channing-cyan
数据准备
为了准备相对庞大的数据量,我从GitHub上找了一些公开的真实数据,是2019年到2023年WHO公开的Covid-19数据。
数据插入
拉取数据集,打开如下文件夹
我们可以看到很多的csv文件,这些都是全球各个地区的每日新增报告。
容器准备
我们使用Logstash对这批文件进行导入
docker启动Logstash
使用dockerfile进行docker启动,因为在启动之前我没需要对容器进行一些操作。
# 使用官方的 Logstash 镜像 FROM logstash:7.12.0
维护者信息
LABEL maintainer="xxxx@xxxx.com"
安装 logstash-output-jdbc 插件
RUN logstash-plugin install logstash-output-jdbc
创建目录以存放 JDBC 驱动程序
RUN mkdir -p /usr/share/logstash/jdbc
将 MySQL 连接器 JAR 包复制到容器中
COPY ./covid19/mysql-connector-java-8.0.28.jar /usr/share/logstash/jdbc/mysql-connector-java-8.0.28.jar
将配置文件和管道复制到容器中
COPY ./config/ /usr/share/logstash/config/
COPY ./pipeline/ /usr/share/logstash/pipeline/暴露需要的端口
EXPOSE 5044 16060
设置 Logstash 命令
CMD [“logstash”, “-f”, “/usr/share/logstash/pipeline/xxxx.conf”]
注意要点:
- RUN logstash-plugin install logstash-output-jdbc 此命令会执行很久,耐心等待,可能与网络有关,可尝试切换快速的节点进行尝试。安装此插件目的是为了不编写代码,通过logstash直接操作数据库进行全量同步。
- xxxx.conf 请把conf文件名换成你的文件名。此文件内容是下面的logstash管道配置内容。
- 把上面拉取到的数据集放到如下文件夹 /home/mycontainers/mylogstash/covid19/。
- 把mysql相关驱动的jar包下载到本地,然后放到服务器上,编写Dockerfile的时候根据你的宿主机路径进行实际更改。
请注意相对路径和当前目录,避免文件找不到报错。
具体配置请参考我之前的文章 告别服务器捞日志,开启自动化日志搜集之旅
logstash管道配置
input { file { path => "/home/mycontainers/mylogstash/covid19/*.csv" start_position => "beginning" sincedb_path => "/dev/null" } }
filter {
csv {
separator => “,”
columns => [“FIPS”, “Admin2”, “Province_State”, “Country_Region”, “Last_Update”, “Lat”, “Long_”, “Confirmed”, “Deaths”, “Recovered”, “Active”, “Combined_Key”, “Incident_Rate”, “Case_Fatality_Ratio”]
}
mutate {
rename => {
“FIPS” => “fips”
“Admin2” => “admin2”
“Province_State” => “province_state”
“Country_Region” => “country_region”
“Last_Update” => “last_update”
“Lat” => “lat”
“Long_” => “long”
“Confirmed” => “confirmed”
“Deaths” => “deaths”
“Recovered” => “recovered”
“Active” => “active”
“Combined_Key” => “combined_key”
“Incident_Rate” => “incident_rate”
“Case_Fatality_Ratio” => “case_fatality_ratio”
}
}
}
output {
elasticsearch {
hosts => [“http://myes:9200”]
index => “covid19”
}
}
启动容器
确保配置和文件都正确后,在Dockerfile执行
docker build -t mylogstash:custom .
成功后我们继续执行如下命令
docker run --name mylogstashes
-p 5044:5044
-p 16060:16060
-itd --restart=always
-v /etc/localtime:/etc/localtime
-v /home/mycontainers/mylogstash/config:/usr/share/logstash/config
-v /home/mycontainers/mylogstash/pipeline:/usr/share/logstash/pipeline
-v /home/mycontainers/mylogstash/covid19:/home/mycontainers/mylogstash/covid19 --net mynetwork logstash:7.12.0
执行完成后,我们就可以去es查看数据了 使用Kibana的开发工具查询总数
好了,我们现在有400w条数据在es里面了。
数据同步
logstash
接下来我们要把es里面的数据全量同步到MySQL里面。
CREATE TABLE `covid19` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`combined_key` varchar(255) DEFAULT NULL,
`country_region` varchar(255) DEFAULT NULL,
`confirmed` int(11) DEFAULT NULL,
`longitude` decimal(18,8) DEFAULT NULL,
`source_path` varchar(255) DEFAULT NULL,
`last_update_time` datetime DEFAULT NULL,
`host_name` varchar(255) DEFAULT NULL,
`province_state` varchar(255) DEFAULT NULL,
`deaths` int(11) DEFAULT NULL,
`latitude` decimal(18,8) DEFAULT NULL,
`fips_code` varchar(255) DEFAULT NULL,
`incident_rate` decimal(18,8) DEFAULT NULL,
`case_fatality_ratio` decimal(18,8) DEFAULT NULL,
`message_text` text,
`admin2` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `idx_combined_key_country_region_province_state` (`combined_key`,`country_region`,`province_state`),
KEY `idx_last_update_time` (`last_update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
新建一个logstash容器,然后把管道配置改成如下配置
input { elasticsearch { hosts => ["http://myes:9200"] # Elasticsearch 主机地址 index => "covid19" # 要同步的索引名称 query => '{"size": 5000, "query": {"match_all": {}}}' # 查询 JSON 格式 docinfo => true scroll => "1m" # 添加滚动时间以确保只取一次 } }
filter {
可以在这里添加过滤器,对从 Elasticsearch 中读取的数据进行处理
mutate {
rename => {
“long” => “longitude”
“host” => “host_name”
}
}
}output {
jdbc {
connection_string => “jdbc:mysql://192.168.31.150:3306/jycloud?user=root&password=123456&useUnicode=true&useSSL=false&characterEncoding=utf8&nullCatalogMeansCurrent=true&serverTimezone=GMT%2B8” # MySQL 连接字符串
driver_jar_path => “/usr/share/logstash/jdbc/mysql-connector-java-8.0.28.jar” # MySQL 驱动程序 JAR 包路径
driver_class => “com.mysql.cj.jdbc.Driver” # MySQL 驱动程序类名
# statement => “INSERT INTO covid19 (combined_key, country_region, confirmed, longitude, source_path, last_update_time, host_name, province_state, deaths, latitude, fips_code, incident_rate, case_fatality_ratio, message_text, admin2) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)” # 插入语句,根据实际情况修改
# 以下是从 Elasticsearch 中获取字段值并插入到 MySQL 表中对应字段的配置
# 字段名需与 Elasticsearch 中的字段名对应
statement => [
“INSERT INTO covid19 (combined_key, country_region, confirmed, longitude, source_path, last_update_time, host_name, province_state, deaths, latitude, fips_code, incident_rate, case_fatality_ratio, message_text, admin2) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)”,
“combined_key”,
“country_region”,
“confirmed”,
“longitude”,
“source_path”,
“last_update”,
“host_name”,
“province_state”,
“deaths”,
“latitude”,
“fips_code”,
“incident_rate”,
“case_fatality_ratio”,
“message_text”,
“admin2”
]
}
}
执行docker复制文件命令替换掉的原本的管道配置文件
docker cp xxx.conf 容器id:/usr/share/logstash/pipeline/xxx.conf
确保/usr/share/logstash/pipeline文件夹里只有最新的conf文件。 然后重启容器。
docker restart 容器id/容器名称
然后查询数据库,会发现数据在逐渐递增。全量同步过程非常久,因为不是批量插入,而批量插入的情况下比逐条插入要快。笔者这次全量同步执行了大概2小时。
java
上面使用了逐条插入的方法,接下来我没使用一些批量插入进行一些对比。
配置
@Configuration public class DatabaseConfig { @Value("${spring.datasource.url}") private String url;
@Value("${spring.datasource.username}") private String username; @Value("${spring.datasource.password}") private String password; @Value("${spring.datasource.driver-class-name}") private String driverClassName; @Bean @ConfigurationProperties(prefix = "spring.datasource") public DataSource druidDataSource() { DruidDataSource dataSource = new DruidDataSource(); dataSource.setUrl(url); dataSource.setUsername(username); dataSource.setPassword(password); dataSource.setDriverClassName(driverClassName); return dataSource; } //配置SqlSessionFactory-常规写法 @Bean public SqlSessionFactory sqlSessionFactoryBean(DataSource dataSourceProxy) throws Exception { //出现invalid bound statement (not found) 使用此配置 MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
// SqlSessionFactoryBean sqlSessionFactoryBean =
// new SqlSessionFactoryBean();
bean.setDataSource(dataSourceProxy);
bean.setMapperLocations
(new PathMatchingResourcePatternResolver().getResources(“classpath*:mapper/*.xml”));
bean.setTransactionFactory
(new SpringManagedTransactionFactory());
return bean.getObject();
}
}
核心代码
@Test void SyncEsToMysql() { IndexCoordinates index = IndexCoordinates.of("covid19"); int from = 0; List list = new ArrayList<>(); Long currentDateTimeStart = System.currentTimeMillis(); NativeSearchQuery query = new NativeSearchQueryBuilder() .withQuery(QueryBuilders.matchAllQuery()) .withPageable(PageRequest.of(0, BATCH_SIZE)) .build(); //首次查找 SearchScrollHits hits = elasticsearchRestTemplate.searchScrollStart(5000, query, Covid.class, index); String scrollId = hits.getScrollId(); try { while (hits != null && !hits.isEmpty()) { // 如果 resultList 的大小达到 batchSize,就插入数据库并清空 resultList hits.getSearchHits().forEach(hit -> list.add(hit.getContent())); if (list.size() >= BATCH_SIZE) { //covid19Service.saveBatch(list); list.clear(); } //根据scrollId再次查询es hits = elasticsearchRestTemplate.searchScrollContinue(scrollId, 5000, Covid.class, index); if (hits != null) { scrollId = hits.getScrollId(); } log.info("from :" + from); from += BATCH_SIZE; } if (!list.isEmpty()) { //covid19Service.saveBatch(list); } } catch (Exception e) { e.printStackTrace(); log.info(e.getMessage()); } finally { Long currentDateTimeEnd = System.currentTimeMillis(); log.info("from :" + from); log.info("time:" + (currentDateTimeEnd - currentDateTimeStart)); }
}
先把入库方法 covid19Service.saveBatch(list) 注释掉,跑一下看看需要多久
只查询的话需要519544毫秒,转换成分钟就是8.6分钟。
然后我再把入库方法 covid19Service.saveBatch(list) 打开试试看,执行代码,看控制台。
836151毫秒,折算成分钟就是13分钟。
查看MySQL和es,验证数据是否一致
大功告成,全量同步就说到这里了,增量同步我们下次再相遇。
这是一个从 https://juejin.cn/post/7369120920148181027 下的原始话题分离的讨论话题