
Real-time sync from MySQL to Elasticsearch


After testing several sync solutions, I settled on the Oceanus product, which is built on Flink CDC:

1. Real-time performance is guaranteed; there is no lag even under heavy binlog volume.
2. A job is configured entirely in SQL, so it is simple to operate.

The example below syncs 100 sharded MySQL tables to ES in real time, to speed up `LIKE`-style queries on text fields such as remarks.

Create the SQL job:

```sql
-- Source: 100 sharded MySQL tables (tb_test0 .. tb_test99) via Flink CDC
CREATE TABLE from_mysql (
    id INT,
    cid INT NOT NULL,
    gid BIGINT NOT NULL,
    content STRING,
    create_time TIMESTAMP(3),
    -- expose the physical table name as a metadata column
    -- (the original listing selected `tablename` without declaring it)
    tablename STRING METADATA FROM 'table_name' VIRTUAL,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'mysql-ip',
    'port' = '3306',
    'username' = 'mysqluser',
    'password' = 'mysqlpwd',
    'database-name' = 'mysqldb',
    'debezium.snapshot.locking.mode' = 'none',
    'table-name' = 'tb_test[0-9]?[0-9]',
    'server-id' = '100-110',
    'server-time-zone' = 'Asia/Shanghai',
    'debezium.skipped.operations' = 'd',
    'debezium.snapshot.mode' = 'schema_only',
    'debezium.min.row.count.to.stream.results' = '50000'
);

CREATE TABLE to_es (
    id STRING,
    tableid INT,
    tablename STRING,
    cid INT NOT NULL,
    gid STRING NOT NULL,
    content STRING,
    create_time STRING,
    -- the original listing wrote PRIMARY KEY (id, companyId); cid is the company id
    PRIMARY KEY (id, cid) NOT ENFORCED
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '7',
    'connector.hosts' = 'http://ip:9200',
    'connector.index' = 'myindex',
    'connector.document-type' = '_doc',
    'connector.username' = 'elastic',
    'connector.password' = 'password123',
    'update-mode' = 'upsert',
    'connector.key-delimiter' = '$',
    'connector.key-null-literal' = 'n/a',
    'connector.failure-handler' = 'retry-rejected',
    'connector.flush-on-checkpoint' = 'true',
    'connector.bulk-flush.max-actions' = '10000',
    'connector.bulk-flush.max-size' = '2 mb',
    'connector.bulk-flush.interval' = '2000',
    'connector.connection-max-retry-timeout' = '300',
    'format.type' = 'json'
);

INSERT INTO to_es
SELECT
    CONCAT(CAST(id AS STRING), '-', CAST(MOD(cid, 100) AS STRING)) AS id,
    id AS tableid,
    tablename,
    cid,
    CAST(gid AS STRING) AS gid,  -- BIGINT at the source, STRING in ES
    content,
    DATE_FORMAT(create_time, 'yyyy-MM-dd HH:mm:ss') AS create_time
FROM from_mysql;
```

The main thing to watch here is the field types. For `scan.startup.mode`: `initial` (the default) also syncs historical data with an initial snapshot, while `latest-offset` syncs only incremental changes. The final `INSERT` can carry a `WHERE` clause so that only the rows you actually need are synced.
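As a sketch of those two notes (the start-mode switch and the row filter; the date cutoff below is a made-up example condition, not from the original job):

```sql
-- In the source table's WITH clause, sync only new changes and skip the
-- initial snapshot of historical data:
--   'scan.startup.mode' = 'latest-offset'

-- The INSERT can filter so only the needed rows reach ES
-- (the date condition is hypothetical):
INSERT INTO to_es
SELECT
    CONCAT(CAST(id AS STRING), '-', CAST(MOD(cid, 100) AS STRING)) AS id,
    id AS tableid, tablename, cid, CAST(gid AS STRING) AS gid, content,
    DATE_FORMAT(create_time, 'yyyy-MM-dd HH:mm:ss') AS create_time
FROM from_mysql
WHERE create_time >= TIMESTAMP '2023-01-01 00:00:00';
```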

ES configuration: set up the mapping, the settings, and your own analyzer.

A custom analyzer is used so that everything in the field, including punctuation and whitespace, is searchable.

```json
PUT myindex-20230314
{
  "mappings": {
    "properties": {
      "id":          { "type": "text" },
      "tableid":     { "type": "long" },
      "cid":         { "type": "long" },
      "gid":         { "type": "text", "analyzer": "my_analyzer" },
      "content":     { "type": "text", "analyzer": "my_analyzer" },
      "create_time": { "type": "keyword" }
    }
  },
  "settings": {
    "index": {
      "number_of_shards": "10",
      "number_of_replicas": "1",
      "refresh_interval": "1s",
      "translog": {
        "sync_interval": "30s",
        "durability": "async"
      },
      "codec": "best_compression",
      "analysis": {
        "analyzer": {
          "my_analyzer": {
            "tokenizer": "my_tokenizer",
            "filter": [ "lowercase" ]
          }
        },
        "tokenizer": {
          "my_tokenizer": {
            "type": "ngram",
            "min_gram": 1,
            "max_gram": 2,
            "token_chars": [ "letter", "digit", "whitespace", "punctuation", "symbol" ]
          }
        }
      }
    }
  }
}
```
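To see what the 1–2 character n-grams actually look like, the `_analyze` API can be run against the new index (the sample text here is arbitrary):

```json
POST myindex-20230314/_analyze
{
  "analyzer": "my_analyzer",
  "text": "AB-12"
}
```

Because `token_chars` includes `punctuation`, the hyphen is kept, and the `lowercase` filter normalizes the letters, so the response should contain grams such as `a`, `ab`, `b`, `b-`, `-`, `-1`, `1`, `12`, `2`.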

Use an alias to simplify later maintenance (e.g. swapping in a reindexed index without touching the application):

```json
POST /_aliases
{
  "actions": [
    { "add": { "index": "myindex-20230314", "alias": "myindex" } }
  ]
}
```
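With this mapping in place, a MySQL `LIKE '%keyword%'` on the remark text can be served as a phrase match on the ngram-analyzed field, queried through the alias. The search term and the `cid` filter value below are placeholders:

```json
POST /myindex/_search
{
  "query": {
    "bool": {
      "filter": [
        { "term": { "cid": 42 } }
      ],
      "must": [
        { "match_phrase": { "content": "测试备注" } }
      ]
    }
  }
}
```

`match_phrase` requires the query's n-grams to appear contiguously and in order, which approximates substring (`LIKE`) semantics on the ngram-indexed field.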

Alternatives tested earlier:

- canal: latency kept growing with a single process, and historical data had to be synced with a separate configuration
- go-mysql-elasticsearch: frequently errored out and had to re-sync from scratch
- logstash: couldn't figure out how to configure syncing across 100 sharded tables

Oceanus is a paid product; it's worth considering when you're short on ops staff. Teams with the capacity can consider running Flink themselves.


Published in the IT Industry (IT业界) column of XunKe HuLian (讯客互联). You are welcome to share this article, but please credit the original source.