MySQL 数据导入 Elasticsearch

JAVA:

/**
 * One-shot import of a MySQL table into Elasticsearch through the bulk API.
 *
 * <p>Rows are read in ascending {@code id} order, {@link #BATCH_SIZE} at a time. Each page starts
 * after the last {@code id} already sent, so the table needs a numeric {@code id} column with
 * unique values. Every row becomes one document whose {@code _id} is the row's {@code id}.
 */
public class JDBCImporter {

	private static final String TABLE_NAME = "table_name";
	// Placeholder: replace with the real JDBC connection string.
	private static final String URL = "JDBC连接字符串";
	// Bulk endpoint in the /{index}/{type}/_bulk form; both index and type are named after the table.
	// NOTE(review): mapping types are deprecated in Elasticsearch 7 and removed in 8 -- drop the
	// type segment if this targets a newer cluster.
	private static final String BULK_URL = "http://192.168.56.200:9200/" + TABLE_NAME + "/" + TABLE_NAME + "/_bulk";
	// Rows fetched from MySQL and sent per bulk request.
	private static final int BATCH_SIZE = 10000;
	private static final CloseableHttpClient HTTP_CLIENT = HttpClientBuilder.create().build();
	private static final Gson GSON = new GsonBuilder().create();

	/**
	 * Copies every row of {@link #TABLE_NAME} into Elasticsearch and prints each bulk response.
	 *
	 * <p>Stops after the first page that returns fewer than {@link #BATCH_SIZE} rows, including an
	 * empty page, so an empty table or an exact multiple of the batch size terminates correctly.
	 *
	 * @param args unused
	 * @throws Exception if a database or HTTP operation fails
	 */
	public static void main(String[] args) throws Exception {
		// Keyset pagination: each page continues after the last id already sent.
		String sql = "SELECT * FROM " + TABLE_NAME + " WHERE id > ? ORDER BY id ASC LIMIT " + BATCH_SIZE;

		long lastId = 0; // id of the last row sent so far
		try (Connection connection = DriverManager.getConnection(URL);
				PreparedStatement ps = connection.prepareStatement(sql)) {
			int rowsInBatch;
			do {
				ps.setLong(1, lastId);
				rowsInBatch = 0;
				StringBuilder body = new StringBuilder();
				try (ResultSet rs = ps.executeQuery()) {
					// Column labels come from standard JDBC metadata rather than driver internals.
					ResultSetMetaData metaData = rs.getMetaData();
					int columnCount = metaData.getColumnCount();
					while (rs.next()) {
						rowsInBatch++;
						lastId = rs.getLong("id");
						Map<String, Object> row = new HashMap<>();
						for (int column = 1; column <= columnCount; column++) {
							row.put(metaData.getColumnLabel(column), rs.getObject(column));
						}
						// Bulk NDJSON: an action line followed by the document source, each newline-terminated.
						body.append("{\"index\":{\"_id\":\"").append(lastId).append("\"}}\n")
								.append(GSON.toJson(row)).append('\n');
					}
				}
				// Nothing to send for an empty page (empty table, or the previous page was the last).
				if (rowsInBatch > 0) {
					sendBulk(body.toString());
				}
				// A page shorter than BATCH_SIZE means the end of the table was reached.
			} while (rowsInBatch == BATCH_SIZE);
		}
	}

	/**
	 * Posts one newline-delimited bulk body to {@link #BULK_URL} and prints the raw response.
	 *
	 * <p>NOTE(review): the response is only printed. It is not checked for HTTP errors or
	 * {@code "errors":true}, so failed documents are not retried.
	 *
	 * @param body the bulk request body (action/source line pairs)
	 * @throws java.io.IOException if the HTTP request fails
	 */
	private static void sendBulk(String body) throws java.io.IOException {
		HttpPost post = new HttpPost(BULK_URL);
		post.setEntity(new StringEntity(body, ContentType.APPLICATION_JSON));
		// Closing the response frees its underlying connection for the next request.
		try (CloseableHttpResponse response = HTTP_CLIENT.execute(post)) {
			System.out.println(EntityUtils.toString(response.getEntity()));
		}
	}

}

PHP:

/**
 * Continuously mirrors new rows of the LogApprequestlogs table into the
 * `log_apprequestlogs` Elasticsearch index.
 *
 * Resumes after the highest `id` already present in the index (or from 0 when
 * the index is empty). It then repeatedly reads the next batch of rows in `id`
 * order and sends them with the bulk API.
 *
 * When a batch is smaller than the batch size (no more new rows for now), or
 * when reading or indexing throws, it sleeps and tries again. The resume cursor
 * only advances after a batch has been sent successfully, so a failed batch is
 * retried instead of skipped.
 *
 * Runs until the process is stopped.
 */
public function main(){
	$index = 'log_apprequestlogs';
	$batchSize = 5000;  // rows per bulk request
	$pollInterval = 10; // seconds to wait when idle or after an error

	ES::$INDEX = $index;
	// Highest id already indexed; the max aggregation yields null for an empty index, which (int) maps to 0.
	$maxId = ES::bodySearch($index, ['size' => 0, 'aggs' => ['max_id' => ['max' => ['field' => 'id']]]]);
	$start = (int)$maxId['aggregations']['max_id']['value'];
	$Table = TableRegistry::get('LogApprequestlogs');

	while (true) {
		$count = 0;
		$lastId = $start;
		$lines = [];
		try {
			$result = $Table->find('all', [
				'conditions' => ['id >' => $start], // skip rows that are already indexed
				'order' => ['id' => 'ASC'],
				'limit' => $batchSize,
			]);
			foreach ($result as $row) {
				$count++;
				$lastId = $row->id;
				$lines[] = json_encode(['index' => ['_id' => $row->id]]);
				$lines[] = json_encode($row, JSON_UNESCAPED_UNICODE);
			}
			if ($count > 0) {
				// Bulk bodies are newline-delimited and must end with a newline.
				ES::bulk($index, implode("\n", $lines) . "\n");
				$start = $lastId; // advance the cursor only once the batch is indexed
			}
		} catch (\Exception $e) {
			echo $e->getMessage();
			sleep($pollInterval);
			continue; // retry the same batch
		}
		if ($count < $batchSize) {
			sleep($pollInterval); // no more new rows for now; wait before polling again
		}
	}
}

发表评论

电子邮件地址不会被公开。 必填项已用*标注