Importing MySQL Data into Elasticsearch

JAVA:

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;

import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.HashMap;
import java.util.Map;

public class JDBCImporter {

    private static final String TABLE_NAME = "table_name";
    private static final String URL = "JDBC connection string"; // placeholder
    private static final CloseableHttpClient HTTP_CLIENT = HttpClientBuilder.create().build();
    private static final StringBuffer STRING_BUFFER = new StringBuffer();
    private static final Gson GSON = new GsonBuilder().create();

    public static void main(String[] args) throws Exception {
        Connection connection = DriverManager.getConnection(URL);
        com.mysql.cj.core.result.Field[] fields = getFields(connection, TABLE_NAME);

        String sql = "SELECT * FROM " + TABLE_NAME + " WHERE id > ? ORDER BY id ASC LIMIT 10000"; // index 10,000 rows per batch

        long lastId = 0; // last ID that has been indexed
        int batchCount;  // number of rows fetched in the current batch
        do {
            batchCount = 0;
            PreparedStatement ps = connection.prepareStatement(sql);
            ps.setLong(1, lastId);
            ResultSet rs = ps.executeQuery();
            while (rs.next()) {
                batchCount++;
                lastId = rs.getLong("id");
                Map<String, Object> row = new HashMap<>();
                for (int i = 0; i < fields.length; i++) {
                    String key = fields[i].getColumnLabel();
                    row.put(key, rs.getObject(key));
                }
                // One action line plus one source line per document, as required by the bulk API.
                STRING_BUFFER.append("{\"index\":{\"_id\":\"" + lastId + "\"}}\n").append(GSON.toJson(row)).append("\n");
            }
            if (batchCount > 0) { // only send a request when the batch actually contains documents
                StringEntity entity = new StringEntity(STRING_BUFFER.toString(), ContentType.APPLICATION_JSON);
                HttpPost post = new HttpPost("http://192.168.56.200:9200/" + TABLE_NAME + "/" + TABLE_NAME + "/_bulk");
                post.setEntity(entity);
                CloseableHttpResponse response = HTTP_CLIENT.execute(post);
                System.out.println(EntityUtils.toString(response.getEntity()));
                response.close();
                STRING_BUFFER.delete(0, STRING_BUFFER.length());
            }
            rs.close();
            ps.close();
        } while (batchCount == 10000); // a short batch means the table has been fully indexed

        connection.close();
    }

    // Read the column metadata of the MySQL table (via Connector/J driver internals)
    private static com.mysql.cj.core.result.Field[] getFields(Connection connection, String tableName) throws Exception {
        ResultSet rs = connection.prepareStatement("SELECT * FROM " + tableName + " WHERE 1=0").executeQuery();
        ResultSetMetaData metaData = rs.getMetaData();
        Field field = metaData.getClass().getDeclaredField("fields");
        field.setAccessible(true);
        com.mysql.cj.core.result.Field[] fields = (com.mysql.cj.core.result.Field[]) field.get(metaData);
        rs.close();
        return fields;
    }

}

PHP:

public function main(){
    ES::$INDEX = 'log_apprequestlogs';
    // Fetch the highest ID among the records that have already been indexed
    $maxId = ES::bodySearch('log_apprequestlogs', json_decode('{"aggs":{"max_id":{"max":{"field":"id"}}}}',true));
    $start = $maxId['aggregations']['max_id']['value'] ?: 0; // default to 0 when the index is still empty
    do{
        $i = 0;
        $Table = TableRegistry::get('LogApprequestlogs');
        $result = $Table->find('all',[
            'conditions' => "id > {$start}", // skip records that have already been indexed
            'order' => 'id asc',
            'limit' => 5000 // process 5,000 records per batch
        ]);
        $postBodyArray = [];
        try{
            foreach($result as $row){
                $i++;
                $start = $row->id;
                $postBodyArray[] = json_encode(['index'=>['_id'=>$row->id]]);
                $postBodyArray[] = json_encode($row, JSON_UNESCAPED_UNICODE);
            }
        }catch(\Exception $e){
            echo $e->getMessage();
            sleep(10);
            continue;
        }
        if(!empty($postBodyArray)){ // only call the bulk API when there is something to index
            try{
                ES::bulk('log_apprequestlogs', implode("\n", $postBodyArray)); // batch-index via the bulk API
            }catch(\Exception $e){
                echo $e->getMessage();
                sleep(10);
                continue;
            }
        }
        if($i%5000 != 0 || $i == 0){
            sleep(10); // no new data yet: wait 10 seconds before polling again
            $i = 5000; // keep the loop condition true so the importer keeps running
        }
    }while($i%5000 == 0);
}

Elasticsearch Geo Search

Class ES: a search helper class written against the Elasticsearch HTTP API; the geo examples below are PHP code that uses it. A minimal sketch of the class is given first.
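The project's actual ES class is not included in these notes. The sketch below is only an illustration of the interface the examples rely on (ES::$INDEX, ES::bodySearch(), ES::bulk()); the host address, the use of PHP's cURL extension, and the error handling are all assumptions, not the original implementation.

// Hypothetical minimal version of the ES helper class; only what the examples need.
class ES {

    public static $INDEX = '';                          // index to operate on
    public static $HOST = 'http://192.168.56.200:9200'; // assumed Elasticsearch endpoint

    // Search with a request body: POST {index}/{type}/_search
    public static function bodySearch($type, array $body) {
        $url = self::$HOST . '/' . self::$INDEX . '/' . $type . '/_search';
        return self::request($url, json_encode($body));
    }

    // Bulk index: POST {index}/{type}/_bulk with an NDJSON body (must end with a newline)
    public static function bulk($type, $ndjson) {
        $url = self::$HOST . '/' . self::$INDEX . '/' . $type . '/_bulk';
        return self::request($url, rtrim($ndjson, "\n") . "\n");
    }

    // Shared HTTP helper; throws on transport errors so callers can catch, sleep and retry.
    private static function request($url, $body) {
        $ch = curl_init($url);
        curl_setopt($ch, CURLOPT_POST, true);
        curl_setopt($ch, CURLOPT_POSTFIELDS, $body);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/json']);
        $response = curl_exec($ch);
        if ($response === false) {
            $error = curl_error($ch);
            curl_close($ch);
            throw new \Exception($error);
        }
        curl_close($ch);
        return json_decode($response, true);
    }
}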

ES::$INDEX = 'geo_index';

// Create an index with a mapping type that contains a geo_point field
$data = json_encode([
	'mappings' => [
		'geo_type' => [
			'properties' => [
				'username' => ['type'=>'text'],
				'location' => ['type'=>'geo_point']
			]
		]
	]
]);
$cmd = "curl -XPUT http://192.168.56.101:9200/geo_index -d '{$data}'";
echo shell_exec($cmd);

// Insert a document
$data = json_encode([
	'username' => 'xd'.time(),
	'location' => [
		'lat' => 40.12,
		'lon' => -71.34
	]
]);
$cmd = "curl -XPOST http://192.168.56.101:9200/geo_index/geo_type -d '{$data}'";
echo shell_exec($cmd);

// Bounding-box (rectangle) search
$data = <<<JSON
{
  "query": {
	"bool": {
	  "filter": {
		"geo_bounding_box": {
		  "location": {
			"top_left": {
			  "lat": 40.73,
			  "lon": -74.1
			},
			"bottom_right": {
			  "lat": 40.01,
			  "lon": -71.12
			}
		  }
		}
	  }
	}
  }
}
JSON;
$response = ES::bodySearch('geo_type', json_decode($data, true));

// Distance search
$data = <<<JSON
{
  "query": {
	"bool": {
	  "filter": {
		"geo_distance": {
		  "distance": "1000km",
		  "location": {
			"lat": 40.73,
			"lon": -74.1
		  }
		}
	  }
	}
  },
  "sort": [
	{
	  "_geo_distance": {
		"location": {
		  "lat": 40.73,
		  "lon": -74.1
		},
		"order": "asc"
	  }
	}
  ]
}
JSON;
$response = ES::bodySearch('geo_type', json_decode($data, true));

// Polygon search
$data = <<<JSON
{
  "query": {
	"bool": {
	  "filter": {
		"geo_polygon": {
		  "location": {
			"points": [
			  {
				"lat": 40,
				"lon": -70
			  },
			  {
				"lat": 30,
				"lon": -80
			  },
			  {
				"lat": 50,
				"lon": -90
			  }
			]
		  }
		}
	  }
	}
  }
}
JSON;
$response = ES::bodySearch('geo_type', json_decode($data, true));
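If the geo queries return nothing, a common cause is that the location field was not actually mapped as geo_point (for example because the index already existed before the mapping was applied). The mapping can be checked in the same style as the calls above:

$cmd = "curl -XGET http://192.168.56.101:9200/geo_index/_mapping?pretty";
echo shell_exec($cmd);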

Installing a Hadoop Cluster

Official documentation: http://hadoop.apache.org/docs/stable/
Server 1: 192.168.56.200 (NameNode + DataNode)
Server 2: 192.168.56.201 (DataNode)

  1) Install the JDK
    ~#tar xzf jdk-8u112-linux-x64.tar.gz
    ~#mv jdk1.8.0_112/ /srv/
    ~#echo "JAVA_HOME=/srv/jdk1.8.0_112" >> /etc/environment
    ~#vim /etc/profile #add the following line before the "export PATH" line
      pathmunge $JAVA_HOME/bin
    ~#reboot
    ~#java -version
  2) Set the hostname
    ~#vim /etc/hosts #add the following two lines
      192.168.56.200 node200
      192.168.56.201 node201
    ~#hostnamectl set-hostname node200
  3) Create the Hadoop storage directories
    ~#mkdir -p /data/dfs/name #directory used by the NameNode
    ~#mkdir -p /data/dfs/data #directory used by the DataNode
  4) Install Hadoop
    ~#tar xzf hadoop-2.7.3.tar.gz
    ~#mv hadoop-2.7.3 /srv/
    ~#cd /srv/hadoop-2.7.3/etc/hadoop/
    ~#vim core-site.xml #change the configuration to the following
      <configuration>
        <property>
          <name>fs.defaultFS</name>
          <value>hdfs://node200:9000</value>
        </property>
      </configuration>
    ~#vim hdfs-site.xml #change the configuration to the following
      <configuration>
        <property>
           <name>dfs.replication</name>
           <value>1</value>
        </property>
        <property>
          <name>dfs.namenode.name.dir</name>
          <value>/data/dfs/name</value>
        </property>
        <property>
          <name>dfs.datanode.data.dir</name>
          <value>/data/dfs/data</value>
        </property>
      </configuration>
    ~#vim slaves #set as follows (one DataNode hostname per line)
      node200
      node201
  5) Repeat steps 1, 2 and 3 on the 201 machine, with one difference:
    hostnamectl set-hostname node201
  6) Copy Hadoop from node200 to node201; no configuration changes are needed
    ~#scp -r /srv/hadoop-2.7.3 root@node201:/srv/
  7) Set the environment variable on each node
    ~#echo "HADOOP_PREFIX=/srv/hadoop-2.7.3" >> /etc/environment
    ~#reboot
  8) Format the NameNode storage directory
    ~#/srv/hadoop-2.7.3/bin/hdfs namenode -format
  9) Start the Hadoop cluster; both the NameNode and the DataNodes are started from the scripts on the NameNode host (node200)
    Set up passwordless SSH login (node200->node200, node200->node201)
    ~#ssh-keygen
    ~#ssh-copy-id root@node200
    ~#ssh-copy-id root@node201
    Start the NameNode and DataNodes
    ~#/srv/hadoop-2.7.3/sbin/start-dfs.sh #the test environment uses Spark for computation, so only the HDFS services are started here
    Open http://192.168.56.200:50070/ in a browser #check the Hadoop cluster status
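    Optional sanity check: write a file to HDFS and read it back with the standard hdfs client (the /test path below is just an example):
    ~#/srv/hadoop-2.7.3/bin/hdfs dfs -mkdir /test
    ~#/srv/hadoop-2.7.3/bin/hdfs dfs -put /etc/hosts /test/
    ~#/srv/hadoop-2.7.3/bin/hdfs dfs -ls /test
    ~#/srv/hadoop-2.7.3/bin/hdfs dfs -cat /test/hosts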

Installing a Spark Cluster (Standalone Mode)

Official documentation: http://spark.apache.org/docs/latest/cluster-overview.html
Server 1: 192.168.56.200 (Master + Worker)
Server 2: 192.168.56.201 (Worker)

  1) Install the JDK
    ~#tar xzf jdk-8u112-linux-x64.tar.gz
    ~#mv jdk1.8.0_112/ /srv/
    ~#echo "JAVA_HOME=/srv/jdk1.8.0_112" >> /etc/environment
    ~#vim /etc/profile #add the following line before the "export PATH" line
      pathmunge $JAVA_HOME/bin
    ~#reboot
    ~#java -version
  2) Set the hostname
    ~#vim /etc/hosts #add the following two lines
      192.168.56.200 node200
      192.168.56.201 node201
    ~#hostnamectl set-hostname node200
  3) Install Spark
    ~#tar xzf spark-2.0.2-bin-hadoop2.7.tgz
    ~#mv spark-2.0.2-bin-hadoop2.7 /srv/
    ~#cd /srv/spark-2.0.2-bin-hadoop2.7/conf
    ~#mv spark-env.sh.template spark-env.sh
    ~#vim spark-env.sh #add the following settings
      SPARK_LOCAL_IP=192.168.56.200
      SPARK_MASTER_HOST=node200
    ~#mv slaves.template slaves
    ~#vim slaves #configure the worker nodes as follows (one hostname per line)
      node200
      node201
    ~#mv spark-defaults.conf.template spark-defaults.conf
    ~#vim spark-defaults.conf #set the default properties as follows (used when submitting jobs with spark-submit)
      spark.master spark://node200:7077
  4) Repeat steps 1 and 2 on the 201 machine, with one difference, to install the second Spark node:
    hostnamectl set-hostname node201
  5) Copy Spark from node200 to node201 and adjust the configuration on node201
    ~#scp -r /srv/spark-2.0.2-bin-hadoop2.7 root@node201:/srv/
    ~#ssh root@node201
    ~#cd /srv/spark-2.0.2-bin-hadoop2.7/conf
    ~#vim spark-env.sh #change the following setting
      SPARK_LOCAL_IP=192.168.56.201
  6) Set the environment variable on each node
    ~#echo "SPARK_HOME=/srv/spark-2.0.2-bin-hadoop2.7" >> /etc/environment
    ~#reboot
  7) Start the Spark cluster; both the Master and the Workers are started from the scripts on the Master host (node200)
    Set up passwordless SSH login (node200->node200, node200->node201)
    ~#ssh-keygen
    ~#ssh-copy-id root@node200
    ~#ssh-copy-id root@node201
    Start the Master and Workers
    ~#/srv/spark-2.0.2-bin-hadoop2.7/sbin/start-master.sh #start the Master (node200)
    ~#/srv/spark-2.0.2-bin-hadoop2.7/sbin/start-slaves.sh #start the Workers (node200 + node201)
    ~#cd /srv/spark-2.0.2-bin-hadoop2.7
    Submit a test job
    ~#./bin/spark-submit --class org.apache.spark.examples.JavaSparkPi ./examples/jars/spark-examples_2.11-2.0.2.jar
    Open http://192.168.56.200:8080/ in a browser #check the Spark cluster status
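    Because spark.master is already set in spark-defaults.conf, spark-submit needs no explicit master URL; if preferred, it can also be passed on the command line, for example:
    ~#./bin/spark-submit --master spark://node200:7077 --class org.apache.spark.examples.JavaSparkPi ./examples/jars/spark-examples_2.11-2.0.2.jar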

Basic Elasticsearch Operations

Official documentation: https://www.elastic.co/guide/en/elasticsearch/reference/5.1/index.html
Create an index
~#curl -XPUT http://192.168.56.200:9200/test?pretty
Close the index
~#curl -XPOST http://192.168.56.200:9200/test/_close?pretty
Change the default analyzer
~#curl -XPUT 'http://192.168.56.200:9200/test/_settings?preserve_existing=true&pretty' -d '{"index.analysis.analyzer.default.type" : "ik_max_word"}'
Change the dynamic mapping setting
~#curl -XPUT 'http://192.168.56.200:9200/test/_settings?pretty' -d '{"index.mapper.dynamic" : true}'
Open the index
~#curl -XPOST http://192.168.56.200:9200/test/_open?pretty
View the index status
~#curl -XGET http://192.168.56.200:9200/test?pretty
Index a document
~#curl -XPUT http://192.168.56.200:9200/test/test/1?pretty -d '{"country":"中华人民共和国","created_at":"1949-10-01T00:00:00"}'
Delete documents by query
~#curl -XPOST http://192.168.56.200:9200/test/test/_delete_by_query?pretty -d '{"query":{"match_all":{}}}'
Reindex into a new index
~#curl -XPOST http://192.168.56.200:9200/_reindex?pretty -d '{"source": {"index": "test"}, "dest": {"index": "test_v2"} }'
Create an alias
~#curl -XPOST http://192.168.56.200:9200/_aliases?pretty -d '{"actions" : [{ "add" : { "index" : "test", "alias" : "test_v1" } }] }'
Remove an alias
~#curl -XPOST http://192.168.56.200:9200/_aliases?pretty -d '{"actions" : [{ "remove" : { "index" : "test", "alias" : "test_v1" } }] }'
Delete the index
~#curl -XDELETE http://192.168.56.200:9200/test?pretty
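Search documents (a minimal match query, shown here for reference; it assumes the test index and the sample document indexed above still exist)
~#curl -XPOST http://192.168.56.200:9200/test/_search?pretty -d '{"query":{"match":{"country":"中华"}}}'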

Installing Elasticsearch Plugins

  1) Install elasticsearch-head (for Elasticsearch 5.x: site plugins are not supported, so run elasticsearch-head as a standalone server)
    ~#vim /srv/elasticsearch-5.0.1/config/elasticsearch.yml #add/modify the following settings
      http.cors.enabled: true
      http.cors.allow-origin: http://elasticsearch-head
    ~#git clone git://github.com/mobz/elasticsearch-head.git
    Publish the elasticsearch-head/_site directory as a static site on a web server (VirtualHost: elasticsearch-head); it can then be used by pointing a browser at it.
  2) Install elasticsearch-sql
    ~#vim /srv/elasticsearch-5.0.1/config/elasticsearch.yml #add/modify the following settings
      http.cors.enabled: true
      http.cors.allow-origin: http://elasticsearch-head,http://elasticsearch-sql
    ~#/srv/elasticsearch-5.0.1/bin/elasticsearch-plugin install https://github.com/NLPchina/elasticsearch-sql/releases/download/5.0.1/elasticsearch-sql-5.0.1.0.zip
    ~#wget https://github.com/NLPchina/elasticsearch-sql/releases/download/5.0.1/es-sql-site-standalone.zip
    ~#unzip es-sql-site-standalone.zip
    Publish the es-sql-site/_site directory as a static site on a web server (VirtualHost: elasticsearch-sql); it can then be used by pointing a browser at it.
  3) Install elasticsearch-analysis-ik
    ~#wget https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v5.0.1/elasticsearch-analysis-ik-5.0.1.zip
    ~#unzip -d /srv/elasticsearch-5.0.1/plugins/ik elasticsearch-analysis-ik-5.0.1.zip
    Restart the Elasticsearch service (installation is complete)
    ~#curl -XPOST http://192.168.56.200:9200/test/_close?pretty
    ~#curl -XPUT 'http://192.168.56.200:9200/test/_settings?preserve_existing=true&pretty' -d '{"index.analysis.analyzer.default.type" : "ik_max_word"}'
    ~#curl -XPOST http://192.168.56.200:9200/test/_open?pretty
    The three commands above switch the default analyzer of the test index to ik; a quick tokenization check follows below.
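    To verify that the ik analyzer works, the _analyze API can be called directly (the text below is just an example):
    ~#curl -XPOST 'http://192.168.56.200:9200/_analyze?pretty' -d '{"analyzer":"ik_max_word","text":"中华人民共和国"}'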

Installing an Elasticsearch Cluster

Official documentation: https://www.elastic.co/guide/en/elasticsearch/reference/5.1/index.html
Server 1: 192.168.56.200
Server 2: 192.168.56.201

  1) Add an elasticsearch user
    ~#useradd -b /home -M -s /sbin/nologin -U elasticsearch
  2) Install the JDK
    ~#tar xzf jdk-8u112-linux-x64.tar.gz
    ~#mv jdk1.8.0_112/ /srv/
    ~#echo "JAVA_HOME=/srv/jdk1.8.0_112" >> /etc/environment
    ~#vim /etc/profile #add the following line before the "export PATH" line
      pathmunge $JAVA_HOME/bin
    ~#reboot
    ~#java -version
  3) Adjust system parameters and the hostname
    ~#echo "vm.max_map_count=262144" >> /etc/sysctl.conf
    ~#vim /etc/security/limits.conf #add the following four lines
      * soft nofile 1000000
      * hard nofile 1000000
      elasticsearch soft memlock unlimited
      elasticsearch hard memlock unlimited
    ~#vim /etc/hosts #add the following two lines
      192.168.56.200 node200
      192.168.56.201 node201
    ~#hostnamectl set-hostname node200
    ~#reboot
    ~#ulimit -a
  4) Install Elasticsearch
    ~#mkdir -p /data/elasticsearch/data
    ~#mkdir -p /data/elasticsearch/logs
    ~#chown -R elasticsearch:elasticsearch /data/elasticsearch
    ~#tar xzf elasticsearch-5.0.1.tar.gz
    ~#mv elasticsearch-5.0.1 /srv/
    ~#vim /srv/elasticsearch-5.0.1/config/elasticsearch.yml #change the following settings
      cluster.name: ESCluster #cluster.name must be identical on every node of the cluster
      node.name: node200
      path.data: /data/elasticsearch/data
      path.logs: /data/elasticsearch/logs
      bootstrap.memory_lock: true
      network.host: 192.168.56.200
      http.port: 9200
      discovery.zen.ping.unicast.hosts: ["node200", "node201"] #hostnames of all nodes in the cluster (nodes join the cluster and elect a master automatically)
      discovery.zen.minimum_master_nodes: 2
    ~#vim /srv/elasticsearch-5.0.1/config/jvm.options #change the following settings
      -Xms1g #see https://www.elastic.co/guide/en/elasticsearch/guide/current/heap-sizing.html
      -Xmx1g
    ~#chown -R elasticsearch:elasticsearch /srv/elasticsearch-5.0.1/config
  5) Start the Elasticsearch service (node200)
    ~#sudo -u elasticsearch /srv/elasticsearch-5.0.1/bin/elasticsearch -d
    ~#curl http://192.168.56.200:9200/ #output like the following indicates Elasticsearch is running correctly
      {
        "name" : "node200",
        "cluster_name" : "ESCluster",
        "cluster_uuid" : "qLPhdOwRQW6vcxRLv0utrw",
        "version" : {
          "number" : "5.0.1",
          "build_hash" : "080bb47",
          "build_date" : "2016-11-11T22:08:49.812Z",
          "build_snapshot" : false,
          "lucene_version" : "6.2.1"
        },
        "tagline" : "You Know, for Search"
      }
  6) Repeat steps 1, 2, 3 and 4 on the 201 machine, with the following three differences, to install the second ES node:
    hostnamectl set-hostname node201
    node.name: node201
    network.host: 192.168.56.201
  7) Start the Elasticsearch service (node201)
    ~#sudo -u elasticsearch /srv/elasticsearch-5.0.1/bin/elasticsearch -d
    ~#curl http://192.168.56.201:9200/ #output like the following indicates Elasticsearch is running correctly
      {
        "name" : "node201",
        "cluster_name" : "ESCluster",
        "cluster_uuid" : "qLPhdOwRQW6vcxRLv0utrw",
        "version" : {
          "number" : "5.0.1",
          "build_hash" : "080bb47",
          "build_date" : "2016-11-11T22:08:49.812Z",
          "build_snapshot" : false,
          "lucene_version" : "6.2.1"
        },
        "tagline" : "You Know, for Search"
      }
  8) Check the cluster status
    ~#curl -XGET http://192.168.56.200:9200/_cluster/health?pretty
    ~#curl -XGET http://192.168.56.200:9200/_cluster/state?pretty
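    To list the nodes that have actually joined the cluster (both node200 and node201 should appear), for example:
    ~#curl -XGET 'http://192.168.56.200:9200/_cat/nodes?v'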