JAVA:
/**
 * Copies every row of a MySQL table into an Elasticsearch index using the bulk API.
 *
 * <p>Rows are read in batches of {@link #BATCH_SIZE} with keyset pagination on the {@code id}
 * column ({@code WHERE id > ? ORDER BY id LIMIT n}), so the table is assumed to have a
 * monotonically comparable numeric {@code id} column. Each row becomes one bulk "index" action
 * whose document {@code _id} is the row's {@code id}.
 */
public class JDBCImporter {
    /** Source table; also used as the Elasticsearch index name and type name. */
    private static final String TABLE_NAME = "table_name";
    /** JDBC connection string (placeholder value — replace with the real URL). */
    private static final String URL = "JDBC连接字符串";
    /** Base URL of the Elasticsearch node that receives the bulk requests. */
    private static final String ES_BASE_URL = "http://192.168.56.200:9200/";
    /** Number of rows fetched and bulk-indexed per round trip. */
    private static final int BATCH_SIZE = 10000;
    private static final CloseableHttpClient HTTP_CLIENT = HttpClientBuilder.create().build();
    private static final Gson GSON = new GsonBuilder().create();

    /**
     * Runs the import once, from the lowest id to the end of the table, then exits.
     *
     * @param args ignored
     * @throws Exception on any JDBC or HTTP failure (the import stops at the failing batch)
     */
    public static void main(String[] args) throws Exception {
        String sql = "SELECT * FROM " + TABLE_NAME + " WHERE id > ? ORDER BY id ASC LIMIT " + BATCH_SIZE;
        try (Connection connection = DriverManager.getConnection(URL)) {
            String[] columns = getColumnLabels(connection, TABLE_NAME);
            try (PreparedStatement ps = connection.prepareStatement(sql)) {
                long lastId = 0; // highest id indexed so far; the pagination cursor
                int batchCount;  // rows returned by the current batch only
                do {
                    batchCount = 0;
                    StringBuilder bulkBody = new StringBuilder();
                    ps.setLong(1, lastId);
                    try (ResultSet rs = ps.executeQuery()) {
                        while (rs.next()) {
                            batchCount++;
                            lastId = rs.getLong("id");
                            Map<String, Object> row = new HashMap<>();
                            for (String column : columns) {
                                row.put(column, rs.getObject(column));
                            }
                            // Bulk NDJSON: action line, then document line, each newline-terminated.
                            bulkBody.append("{\"index\":{\"_id\":\"").append(lastId).append("\"}}\n")
                                    .append(GSON.toJson(row)).append('\n');
                        }
                    }
                    // An empty bulk request is rejected by Elasticsearch, so only post real data.
                    if (batchCount > 0) {
                        postBulk(bulkBody.toString());
                    }
                    // A short (or empty) batch means the end of the table has been reached.
                    // Testing the per-batch count (not a running total) guarantees termination.
                } while (batchCount == BATCH_SIZE);
            }
        }
    }

    /**
     * Sends one bulk request to {@code <ES_BASE_URL><table>/<table>/_bulk} and prints the raw
     * response body.
     *
     * @param body newline-delimited bulk payload, already terminated by a newline
     * @throws Exception if the HTTP request fails
     */
    private static void postBulk(String body) throws Exception {
        HttpPost post = new HttpPost(ES_BASE_URL + TABLE_NAME + "/" + TABLE_NAME + "/_bulk");
        post.setEntity(new StringEntity(body, ContentType.APPLICATION_JSON));
        // Closing the response returns the connection to the client's pool.
        try (CloseableHttpResponse response = HTTP_CLIENT.execute(post)) {
            System.out.println(EntityUtils.toString(response.getEntity()));
        }
    }

    /**
     * Reads the column labels of a table via standard JDBC metadata.
     *
     * <p>Uses {@code WHERE 1=0} so that no rows are transferred; only the metadata is needed.
     *
     * @param connection open JDBC connection
     * @param tableName  table to inspect (concatenated into SQL — must be a trusted constant)
     * @return column labels in result-set order
     * @throws Exception on JDBC failure
     */
    private static String[] getColumnLabels(Connection connection, String tableName) throws Exception {
        try (PreparedStatement ps = connection.prepareStatement("SELECT * FROM " + tableName + " WHERE 1=0");
             ResultSet rs = ps.executeQuery()) {
            ResultSetMetaData metaData = rs.getMetaData();
            String[] labels = new String[metaData.getColumnCount()];
            for (int i = 0; i < labels.length; i++) {
                labels[i] = metaData.getColumnLabel(i + 1); // JDBC columns are 1-based
            }
            return labels;
        }
    }
}
PHP:
/**
 * Continuously copies new rows of the LogApprequestlogs table into the
 * "log_apprequestlogs" Elasticsearch index using the bulk API.
 *
 * Resumes after the highest `id` already present in the index, then loops
 * forever: it reads up to $batchSize rows with `id` greater than the cursor and
 * bulk-indexes them with `_id` = row id. The cursor advances only after a
 * successful bulk request. When the table is caught up, or an error occurs, it
 * sleeps 10 seconds before trying again. This method does not return.
 */
public function main(){
    ES::$INDEX = 'log_apprequestlogs';
    $batchSize = 5000; // rows fetched and bulk-indexed per round trip

    // Resume after the highest id already indexed. The max aggregation yields
    // null for an empty index, so fall back to 0 instead of building "id > ".
    $maxId = ES::bodySearch('log_apprequestlogs', json_decode('{"aggs":{"max_id":{"max":{"field":"id"}}}}', true));
    $start = isset($maxId['aggregations']['max_id']['value'])
        ? (int)$maxId['aggregations']['max_id']['value']
        : 0;

    $Table = TableRegistry::get('LogApprequestlogs');

    while (true) {
        $i = 0;
        $lastId = $start;
        $postBodyArray = [];
        try {
            $result = $Table->find('all', [
                'conditions' => ['id >' => $start], // bound value; skips already-indexed rows
                'order' => 'id asc',
                'limit' => $batchSize,
            ]);
            foreach ($result as $row) {
                $i++;
                $lastId = $row->id;
                $postBodyArray[] = json_encode(['index' => ['_id' => $row->id]]);
                $postBodyArray[] = json_encode($row, JSON_UNESCAPED_UNICODE);
            }
        } catch (\Exception $e) {
            echo $e->getMessage();
            sleep(10);
            continue; // cursor unchanged: the same batch is retried
        }

        if ($i > 0) {
            try {
                // The bulk API requires the body to end with a newline.
                // NOTE(review): assumes ES::bulk sends the body verbatim — confirm.
                ES::bulk('log_apprequestlogs', implode("\n", $postBodyArray) . "\n");
            } catch (\Exception $e) {
                echo $e->getMessage();
                sleep(10);
                continue; // cursor unchanged: the failed batch is retried
            }
            $start = $lastId; // advance only after the batch was indexed
        }

        if ($i < $batchSize) {
            sleep(10); // caught up with the table; wait for new rows
        }
    }
}