Syncing MySQL data to Elasticsearch

October 18, 2018
elasticsearch logstash


1 Overview

Main contents of this post:

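2 Introduction

Elasticsearch is the hot property of the moment: its parent company, Elastic, went public this month with a market capitalization of around 5 billion US dollars. It grew out of an open-source project on GitHub, solves real pain points in big-data and distributed environments, and has already been adopted into the solutions of major companies. With its user base still growing, its future looks bright.

A few characteristics, from a first look:
(1) It is very easy to use: unpack it, start it with a single command, and try it out straight from the command line;
(2) It is simple, at least on the surface. The barrier to entry is much lower than Solr's, which has won it many users, and open-source plugins have added to the momentum;
(3) The core technology on its own is not especially novel, but it is far from trivial. The idea was simply well timed; perhaps the RESTful style arrived at the right moment?
(4) The commercial model is a free, open-source framework with paid plugins and paid technical support;
(5) It is distributed by design (indices are split into shards spread across nodes), which leaves a lot of room to grow. Add optimized indexing strategies, the full-text search heritage of Lucene, and strong aggregations and statistics, and it has nearly everything going for it.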

Related links:
http://rdc.hundsun.com/portal/article/666.html?from=MJ
http://www.ruanyifeng.com/blog/2017/08/elasticsearch.html
http://www.sohu.com/a/200454003_465959
https://www.cnblogs.com/dreamroute/p/8484457.html
https://blog.csdn.net/wanbf123/article/details/78088444
https://blog.csdn.net/qq_33314107/article/details/80725913
https://mp.weixin.qq.com/s/5jrGZ3KkWpbIwXfWMvW-DA

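Two e-books worth reading: 《elasticsearch权威指南》 (Elasticsearch: The Definitive Guide) and 《深入理解ElasticSearch》 (Mastering ElasticSearch).

Logstash is a log collection framework, and its logstash-input-jdbc plugin can handle syncing data from MySQL to Elasticsearch. It does not sync deletions, so a delete has to be implemented as a logical update (a soft delete).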
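The soft-delete idea can be sketched as follows. This is only an illustration under assumptions: SQLite stands in for MySQL so the snippet runs on its own, and the `is_deleted` column and the helper names are hypothetical, not part of the plugin. The point is that a "delete" becomes an update that also bumps the timestamp, so a timestamp-based incremental query still sees the row.

```python
import sqlite3


def soft_delete(conn, hotel_id, now):
    """Mark a row as deleted instead of removing it, and bump its timestamp."""
    conn.execute(
        "UPDATE hotel SET is_deleted = 1, last_modify_time = ? WHERE id = ?",
        (now, hotel_id),
    )


def changed_since(conn, last_value):
    """Rows a timestamp-based incremental query would pick up."""
    cur = conn.execute(
        "SELECT id, hotel_name, is_deleted FROM hotel "
        "WHERE last_modify_time >= ? ORDER BY id",
        (last_value,),
    )
    return cur.fetchall()


# In-memory stand-in for the MySQL table (is_deleted is a hypothetical column).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE hotel (id INTEGER PRIMARY KEY, hotel_name TEXT, "
    "is_deleted INTEGER NOT NULL DEFAULT 0, last_modify_time TEXT)"
)
conn.executemany(
    "INSERT INTO hotel (id, hotel_name, last_modify_time) VALUES (?, ?, ?)",
    [(1, "A", "2018-10-18 08:00:00"), (2, "B", "2018-10-18 08:00:00")],
)

soft_delete(conn, 2, "2018-10-18 09:00:00")
rows = changed_since(conn, "2018-10-18 08:30:00")
print(rows)  # [(2, 'B', 1)]
```

On the Elasticsearch side, queries would then need to filter out documents whose `is_deleted` flag is set.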

3 Example

Main reference: https://www.cnblogs.com/zuolun2017/p/8082996.html

3.1 Installing Elasticsearch

Official download page: https://www.elastic.co/downloads/elasticsearch
Download URL: https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.4.2.tar.gz

JAVA_HOME must be set; then run bin/elasticsearch to start the node.

Check: $ curl http://localhost:9200/

Response:

{
  "name" : "pYmhVpz",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "W64pCc-yRGyVC4QKTCoRRA",
  "version" : {
    "number" : "6.4.2",
    "build_flavor" : "default",
    "build_type" : "tar",
    "build_hash" : "04711c2",
    "build_date" : "2018-09-26T13:34:09.098244Z",
    "build_snapshot" : false,
    "lucene_version" : "7.4.0",
    "minimum_wire_compatibility_version" : "5.6.0",
    "minimum_index_compatibility_version" : "5.0.0"
  },
  "tagline" : "You Know, for Search"
}
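The same check can be scripted. The sketch below is a minimal example using only the Python standard library; the function names are made up for illustration, and the default URL assumes a node on localhost:9200 as above. Parsing is kept separate from the HTTP call so it can be tried on a saved response without a running node.

```python
import json
from urllib.request import urlopen


def es_version(body):
    """Extract the version number from the root-endpoint JSON response."""
    return json.loads(body)["version"]["number"]


def fetch_es_version(url="http://localhost:9200/"):
    """Query a running node; raises URLError if nothing is listening."""
    with urlopen(url, timeout=5) as resp:
        return es_version(resp.read().decode("utf-8"))


# Abbreviated copy of the response shown above.
sample = (
    '{"name": "pYmhVpz", "version": {"number": "6.4.2"}, '
    '"tagline": "You Know, for Search"}'
)
print(es_version(sample))  # 6.4.2
```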

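For installing and configuring the latest version of Elasticsearch, see the post: Installing Elasticsearch

3.2 Installing Logstash

Official download page: https://www.elastic.co/downloads/logstash
Download URL: https://artifacts.elastic.co/downloads/logstash/logstash-6.4.2.tar.gz

Create a configuration file (logstash-simple.conf below); reference: https://www.elastic.co/guide/en/logstash/current/configuration.html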

input { stdin { } }
output {
  elasticsearch { hosts => ["localhost:9200"] }
  stdout { codec => rubydebug }
}

Start: bin/logstash -f logstash-simple.conf

Install the plugin: bin/logstash-plugin install logstash-input-jdbc

3.3 Demo

Prepare the database environment and create the tables:

CREATE TABLE `hotel` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `hotel_name` varchar(255) DEFAULT NULL,
  `photo_url` varchar(255) DEFAULT NULL,
  `last_modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;

CREATE TABLE `hotel_account` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `hotel_id` int(11) DEFAULT NULL,
  `finance_person` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;

The MySQL-related configuration, for example jdbc.conf:

input {
    stdin {
    }
    jdbc {
      # mysql jdbc connection string to our backup database
      jdbc_connection_string => "jdbc:mysql://localhost:3306/test"
      # the user we wish to execute our statement as
      jdbc_user => "root"
      jdbc_password => "root"
      # the path to our downloaded jdbc driver
      jdbc_driver_library => "/home/tao/program/mysql-connector-java-5.1.14-bin.jar"
      # the name of the driver class for mysql
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      statement_filepath => "jdbc.sql" # file containing the SQL statement to run
      # more options are described at https://www.cnblogs.com/zuolun2017/p/8082996.html
      schedule => "* * * * *"  # cron-style schedule; currently runs once a minute
      type => "jdbc" # shows up as the "type" field of each document
    }
}

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}

output {
    elasticsearch {
        hosts => ["http://localhost:9200"]
        # the port/host/protocol options are not recognized
        index => "mysql01"
        document_id => "%{id}"
        ## cluster => "logstash-elasticsearch" is not recognized either
    }
    stdout {
        codec => json_lines
    }
}

The SQL statement to run (jdbc.sql):

select
    h.id as id,
    h.hotel_name as name,
    h.photo_url as img,
    ha.id as haId,
    ha.finance_person
from
    hotel h LEFT JOIN hotel_account ha on h.id = ha.hotel_id
where
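    h.last_modify_time >= :sql_last_value  /* recorded by Logstash: the time of the previous query run */

So the database side needs a timestamp column that is refreshed on every change; it serves as the boundary for incremental updates.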
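To make the incremental boundary concrete, here is a small simulation of the polling logic. It is a sketch under assumptions, not the plugin's code: SQLite stands in for MySQL, the `IncrementalReader` class is hypothetical, and the stored value is modeled as the time of the previous run, starting from 1970-01-01 as the plugin does by default.

```python
import sqlite3

EPOCH = "1970-01-01 00:00:00"  # starting point before the first run


class IncrementalReader:
    """Hypothetical stand-in for the jdbc input's :sql_last_value bookkeeping."""

    def __init__(self, conn):
        self.conn = conn
        self.last_value = EPOCH

    def poll(self, now):
        """Fetch rows changed since the previous run, then remember this run."""
        rows = self.conn.execute(
            "SELECT id, hotel_name FROM hotel "
            "WHERE last_modify_time >= ? ORDER BY id",
            (self.last_value,),
        ).fetchall()
        self.last_value = now
        return rows


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE hotel (id INTEGER PRIMARY KEY, hotel_name TEXT, "
    "last_modify_time TEXT)"
)
conn.executemany(
    "INSERT INTO hotel VALUES (?, ?, ?)",
    [(1, "A", "2018-10-18 08:00:00"), (2, "B", "2018-10-18 08:00:00")],
)

reader = IncrementalReader(conn)
first = reader.poll("2018-10-18 08:41:00")   # full load on the first run

# An application update refreshes the timestamp column.
conn.execute(
    "UPDATE hotel SET hotel_name = 'B2', last_modify_time = ? WHERE id = 2",
    ("2018-10-18 08:58:30",),
)
second = reader.poll("2018-10-18 08:59:00")  # only the changed row
third = reader.poll("2018-10-18 09:00:00")   # nothing new

print(first, second, third)
```

Because the comparison is `>=`, a row stamped exactly at the boundary can be read twice; with `document_id => "%{id}"` the second write simply overwrites the same document.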

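When the scheduled time arrives, Logstash opens a MySQL connection (is there any pooling?), runs the query, parses the result, and calls the Elasticsearch HTTP API to write the rows into the index. Here is the resulting data: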
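For a feel of what such an HTTP call carries, the sketch below builds a request body in the newline-delimited format of the Elasticsearch Bulk API, using each row's `id` as the document `_id` (mirroring `document_id => "%{id}"`). The helper name is hypothetical and the exact requests Logstash sends are not shown in this post; the `_type` of `doc` is taken from the search output below.

```python
import json


def build_bulk_body(rows, index, doc_type="doc"):
    """Build an NDJSON body: one action line plus one source line per row."""
    lines = []
    for row in rows:
        action = {
            "index": {"_index": index, "_type": doc_type, "_id": str(row["id"])}
        }
        lines.append(json.dumps(action))
        lines.append(json.dumps(row, ensure_ascii=False))
    # The Bulk API expects every line, including the last, to end with "\n".
    return "".join(line + "\n" for line in lines)


rows = [
    {"id": 5, "name": "马二帅酒店", "img": "images/madashuai.img"},
    {"id": 6, "name": "taoych's hotel", "img": "images/madashuai.img"},
]
body = build_bulk_body(rows, "mysql01")
print(body)
```

Such a body would be POSTed to the `_bulk` endpoint with the `Content-Type: application/x-ndjson` header. Indexing with an existing `_id` replaces the document, which is why re-running the query does not create duplicates.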

{
  "took" : 6,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 5,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "mysql01",
        "_type" : "doc",
        "_id" : "5",
        "_score" : 1.0,
        "_source" : {
          "id" : 5,
          "name" : "马二帅酒店",
          "img" : "images/madashuai.img",
          "@timestamp" : "2018-10-18T08:41:01.622Z",
          "haid" : null,
          "type" : "jdbc",
          "@version" : "1",
          "finance_person" : null
        }
      },
      {
        "_index" : "mysql01",
        "_type" : "doc",
        "_id" : "4",
        "_score" : 1.0,
        "_source" : {
          "id" : 4,
          "name" : "马二帅酒店",
          "img" : "images/madashuai.img",
          "@timestamp" : "2018-10-18T08:41:01.621Z",
          "haid" : null,
          "type" : "jdbc",
          "@version" : "1",
          "finance_person" : null #no matching row in the joined table
        }
      },
      {
        "_index" : "mysql01",
        "_type" : "doc",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "id" : 2,
          "name" : "马二帅酒店",
          "img" : "images/madashuai.img",
          "@timestamp" : "2018-10-18T08:41:01.594Z",
          "haid" : 2,
          "type" : "jdbc",
          "@version" : "1",
          "finance_person" : "马二帅"
        }
      },
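      {
        "_index" : "mysql01",
        "_type" : "doc",
        "_id" : "6",
        "_score" : 1.0,
        "_source" : {
          "id" : 6,
          "name" : "taoych's hotel", #updated
          "img" : "images/madashuai.img",
          "@timestamp" : "2018-10-18T08:59:00.041Z", #set by Logstash when the updated row was re-synced, not the MySQL column value
          "haid" : 3,
          "type" : "jdbc",
          "@version" : "1", #not bumped on update: this is a Logstash event field; Elasticsearch tracks revisions in _version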
          "finance_person" : "马二帅"
        }
      },
      {
        "_index" : "mysql01",
        "_type" : "doc",
        "_id" : "3",
        "_score" : 1.0,
        "_source" : {
          "id" : 3,
          "name" : "马二帅酒店",
          "img" : "images/madashuai.img",
          "@timestamp" : "2018-10-18T08:41:01.621Z",
          "haid" : null,
          "type" : "jdbc",
          "@version" : "1",
          "finance_person" : null
        }
      }
    ]
  }
}

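4 Application scenarios

4.1 E-commerce systems

Systems of this kind hold large volumes of data and make heavy use of analytics and logs. Elasticsearch can serve as the scaffolding for querying and full-text search, while content that needs transactions and updates stays in a traditional relational database such as MySQL, or a NoSQL database such as MongoDB, and is then synced to Elasticsearch.

4.2 Blog systems

Submitting blog posts to Elasticsearch makes search and statistics very convenient. But what exactly counts as a "post" here, the body text? And how would an HTML site generated from Markdown be integrated? The static HTML can be treated as one part of a dynamic site: Elasticsearch serves the dynamic query results, with highlighting, and each result links back to its source HTML page. This is a fairly quick integration that changes little: feed the Markdown content straight into Elasticsearch and record the link to the generated page. The query conditions could even be assembled directly in the front-end page, with nginx blocking DELETE/PUT/POST and similar requests.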
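A highlighted full-text query for such a blog index might look like the sketch below. The field names `content`, `title` and `url` are assumptions made for illustration (nothing in this post defines the blog mapping); the function only builds the JSON body of a search request.

```python
import json


def build_search_body(keywords, field="content", size=10):
    """Build a match query that also asks for highlighted fragments."""
    return {
        "size": size,
        "query": {"match": {field: keywords}},
        # Return matching fragments of the field, wrapped in <em> by default.
        "highlight": {"fields": {field: {}}},
        # Only fetch what is needed to link back to the static HTML page.
        "_source": ["title", "url"],
    }


search_body = build_search_body("logstash jdbc")
print(json.dumps(search_body, indent=2))
```

The body would be sent to the index's `_search` endpoint; each hit then carries a `highlight` section next to the `title` and `url` used to link to the source page.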

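4.3 Other applications

The data also lives in a database elsewhere and is synced to Elasticsearch in real time: one store serves key lookups, such as returning an ID or a set of IDs, and the other serves every other kind of query.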

