Chinese Readme | English Readme
Synchronizes MariaDB/MySQL table data to ElasticSearch. Physical deletion is not supported (physical deletes would have to be handled from the binlog), so logical deletion is recommended: for example, add a `deleted` column to the table (1 = deleted, default 0).
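A minimal sketch of such a logical-delete column (the table name t_order here is only an example):

ALTER TABLE t_order ADD COLUMN deleted TINYINT NOT NULL DEFAULT 0 COMMENT '1 deleted, default 0';
-- "delete" a row by flagging it, so the change can still be synchronized to ElasticSearch
UPDATE t_order SET deleted = 1 WHERE id = 1;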
Based on JDK 8 and Spring Boot
- supports sharded tables (but not sharded databases)
- supports routing
- supports index versioning
- supports index templates
- supports the nested type
git clone https://github.com/liuanxin/mysql2es.git
cd mysql2es
# each branch corresponds to an ElasticSearch version; list them all
git branch -a
git checkout ${es_version}
mvn clean package -DskipTests
# edit application-prod.yml to match your environment
nohup java -jar -Dspring.profiles.active=prod target/mysql2es.jar >/dev/null 2>&1 &
or
# or use your own ~/application.yml
nohup java -jar -Dspring.config.location=~/application.yml target/mysql2es.jar >/dev/null 2>&1 &
# logs are written to ~/logs/mysql2es.log
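# to confirm the process is running, tail the log (an illustrative check, not a required step)
tail -f ~/logs/mysql2es.log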
It is recommended to build the index scheme in ElasticSearch first (if you want it generated from the column types of the database table instead, set `scheme` to true in the configuration; see the index example below).
Data is then synchronized according to the timing rule. During a run, the increment column is spliced into the SQL to fetch the data page by page, and each page is written to ElasticSearch in batches until no data remains. The last increment value is stored (in MySQL or a temp file) and used as the starting point of the next synchronization (if the next scheduled time arrives while the previous run is still going, that run is postponed).
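For instance, with increment-column: id and limit: 1000, each round of the paging query is conceptually equivalent to the following sketch (table and column names are illustrative):

-- :last_id is the increment value saved after the previous round
SELECT ... FROM t_order WHERE id > :last_id ORDER BY id LIMIT 1000;
-- the largest id in the returned page is saved as the new increment value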
The relevant configuration options are described below:
spring:
# mysql config: org.springframework.boot.autoconfigure.jdbc.DataSourceProperties + com.zaxxer.hikari.HikariConfig
datasource:
... for db ...
# es config: org.springframework.boot.autoconfigure.elasticsearch.rest.RestClientProperties
elasticsearch.rest:
uris: 192.168.1.2:9200
db2es:
# Set to false to disable synchronization; default is true
enable: false
# Where the last increment value is stored (temp_file or mysql); default is temp file
increment-type: mysql
# Cron expression of the sync task; the default runs once per minute
cron: 0/5 * * * * *
# Enable data compensation; default is false
enable-compensate: true
# Cron expression of the timed task used for data compensation
compensate-cron: 1 * * * * *
# Data compensation runs only while the interval between the last handle time and the current time is within this value, in seconds; default 1200 (20 minutes)
begin-interval-second: 600
# How many seconds the data compensation goes back; default 300 (5 minutes)
compensate-second: 60
# Set to false to skip the version check when writing to ES; default is true
version-check: false
relation:
-
# Data compensation starts only when the interval between the synchronization time and the current time is within this value; when set here, the global value above is ignored. Unit: seconds
begin-interval-second: 1800
# How many seconds the data compensation goes back; when set here, the global value above is ignored. Unit: seconds
compensate-second: 1500
# *** Required, and the table must have a primary key. The primary key generates the id of /index/type/id in ElasticSearch; with a composite key the values are joined with "-". A % wildcard can be used to match multiple tables (for sharded tables)
table: t_order
# *** Required. The column used for incremental fetching, usually an auto-increment id or a time column
increment-column: id
# Starting with ElasticSearch 6.0 the type defaults to _doc, so an index corresponds directly to a database table
# The index of /index/type/id in ElasticSearch; if not set, it is generated from the table name (t_some_one ==> some-one). Since 6.0, index names must be lowercase
index: order
# Whether to generate the ElasticSearch scheme from the database table structure at startup; default is false
scheme: true
# Custom sql statement (do not use ORDER BY or LIMIT; they are added automatically based on increment-column). If not set, the sql is assembled from the table automatically
sql: "select o.id, o.order_no, o.order_status, ifnull(o.price,0) price, ifnull(o.sum,0) sum,
o.create_time, unix_timestamp(o.update_time) update_time, ifnull(o.pay_time,0) pay_time,
ifnull(o.ship_time,0) ship_time, ifnull(o.receipt_time,0) receipt_time, ifnull(o.success_time,0) success_time,
oa.receiver, oa.province, oa.city, oa.area, oa.address, oa.phone
from t_order o
left join t_order_address oa on o.id = oa.order_id"
# If the sql joins multiple tables, the alias of the main table
table-alias: o
# The number of rows fetched from the database per batch; default is 1000
limit: 2000
# The columns used to generate the document id; they are not derived from the table automatically. When the table has both a primary key and a multi-column unique index, set this to build the id from the unique index instead
id-column: c1,c2
# Prefix to add to the document id
id-prefix:
# true: the part of the table name matched by the wildcard becomes part of the id (for example, with the pattern t_order_%, table t_order_2016 contributes 2016 as the id prefix); default is true
pattern-to-id: false
# Suffix to add to the document id
id-suffix:
# The columns used for routing, comma separated
route-column: c1,c2
# The field used for version
version-column: updated
# The column whose value selects the index when using an ElasticSearch template
template-column: created
# The date pattern applied to the template column's value; useful when the column is a Date
template-pattern: yyyy_MM
# Whether column names are converted to camel case; true converts user_name in the table to userName. Default is false
column-lower-camel: true
# By default the field name is generated from the table column (c_some_type ==> someType); only special cases need to be mapped.
mapping:
# table column (use the alias if there is one) : ElasticSearch field
c_type: type
# Columns in the sql above that should not be written to the index (use the alias if the column has one)
ignore-column: c1,c2
# When the LIMIT offset in the sql exceeds this value, the query is rewritten as an inner join; default is 2000
big-count-to-sql: 10000
# The primary key name. When the table is large, LIMIT 10000000,1000 is very slow; this field is used to optimize the sql. Default is id
primary-key: id
# Original sql : SELECT ... FROM t_table WHERE time > '2010-01-01 00:00:01' LIMIT 10000000, 1000
# Optimized sql: SELECT ... FROM t_table c INNER JOIN (SELECT id FROM t_table WHERE time > '2010-01-01 00:00:01' LIMIT 10000000, 1000) t ON t.id = c.id
# A child table related one-to-one with the main table; its columns end up alongside the main table's data (an alternative to the LEFT JOIN in the sql above, useful when separate queries perform better than the join)
relation-mapping:
address:
main-field: id
sql: SELECT receiver, province, city, area, address, phone FROM t_order_address
child-field: order_id
# A child table related one-to-many with the main table; its rows are built into a List property on the main document (for nested structures)
nested-mapping:
# If there are tables `t_order` and `t_order_item`, where `t_order` has id and `t_order_item` has order_id, then main-field => id and child-field => order_id
item:
main-field: id
sql: "SELECT oi.sku_id, p.name, oi.unit_price, oi.num, oi.total
FROM t_order_item oi LEFT JOIN t_product p ON oi.product_id = p.id"
child-field: order_id
-
table: t_product
increment-column: update_time
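Under the t_order relation above, a synchronized document might look like the following sketch (values are illustrative and the field list is abridged):

{
  "id": 1,
  "order_no": "SN1001",
  "order_status": 2,
  "price": 100.5,
  "create_time": "2020-01-01 10:00:00",
  "receiver": "someone",
  "address": "some address",
  "item": [
    { "sku_id": 9, "name": "some product", "unit_price": 50.25, "num": 2, "total": 100.5 }
  ]
}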
about cron
.------------------- second (0 - 59), (0/10) means run at (0, 10, 20, 30, 40, 50)
.  .---------------- minute (0 - 59)
.  .  .------------- hour (0 - 23)
.  .  .  .---------- day of month (1 - 31)
.  .  .  .  .------- month (1 - 12) OR jan,feb,mar,apr,may,jun,jul,aug,sep,oct,nov,dec
.  .  .  .  .  .---- day of week (0 - 6) (Sunday=0 or 7) OR sun,mon,tue,wed,thu,fri,sat
.  .  .  .  .  .
*  *  *  *  *  *

for example: 0/5 * * * * * means that it runs every 5 seconds
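A few more examples of these six-field (seconds-first, Spring-style) expressions:

0/30 * * * * *    every 30 seconds
0 0/5 * * * *     every 5 minutes
0 0 2 * * *       every day at 02:00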
ElasticSearch index scheme example
DELETE /order
PUT /order
{
"settings": {
"number_of_shards": "5",
"number_of_replicas": "0",
"analysis": {
"normalizer": {
"self_normalizer": {
"type": "custom",
"filter": ["trim", "lowercase"]
}
}
}
},
"mappings": {
"properties": {
"id": {
"type": "long"
},
"order_no": {
"type": "keyword",
"normalizer": "self_normalizer"
},
"order_status": {
"type": "integer"
},
"create_time": {
"type": "date",
"format": "epoch_millis||yyyy-MM-dd||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS"
},
"pay_time": {
"type": "date",
"format": "epoch_millis||yyyy-MM-dd||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS"
},
"receiver": {
"type": "keyword",
"normalizer": "self_normalizer"
},
"address": {
"type": "keyword",
"normalizer": "self_normalizer"
},
"phone": {
"type": "keyword",
"normalizer": "self_normalizer"
},
"item": {
"type": "nested",
"properties": {
"sku_id": {
"type": "long"
},
"name": {
"type": "text",
"normalizer": "self_normalizer"
},
"sku_desc": {
"type": "keyword",
"normalizer": "self_normalizer"
}
}
}
}
}
}
POST /_aliases
{
"actions" : [
{ "remove" : { "index" : "order", "alias" : "old_order_query" } },
{ "add" : { "index" : "order", "alias" : "new_order_query" } }
]
}
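With the alias in place, the application queries through it and the physical index can later be swapped without code changes. A minimal query through the alias added above (the order number is illustrative):

GET /new_order_query/_search
{
  "query": { "match": { "order_no": "SN1001" } }
}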
Example of a template with which the index is built on a monthly basis, while the application queries ES through an alias
DELETE /_template/order
PUT /_template/order
{
"index_patterns": [ "order_*" ],
"aliases": {
"order_query": {}
},
"settings": {
"number_of_shards": "1",
"number_of_replicas": "0",
"analysis": {
"normalizer": {
"self_normalizer": {
"type": "custom",
"filter": ["trim", "lowercase"]
}
}
}
},
"mappings": {
"properties": {
"id": {
"type": "long"
},
"order_no": {
"type": "keyword",
"normalizer": "self_normalizer"
},
"order_status": {
"type": "integer"
}
}
}
}
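With this template and a relation configured with template-column and template-pattern (for example yyyy_MM), writing to a monthly index such as order_2020_03 (the name here is illustrative) creates the index from the template automatically, and the document is immediately reachable through the order_query alias:

PUT /order_2020_03/_doc/1
{
  "id": 1,
  "order_no": "SN1001",
  "order_status": 2
}

GET /order_query/_search
{
  "query": { "term": { "order_status": 2 } }
}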