PHP合并多维数组

array_merge 只能合并一维;
array_merge_recursive 虽然可以递归地合并多维数组,但是它会把相同的 key 的值合并到一个数组中去,而不是保留后面的数组的值;

/**
 * 合并数组
 * 使用引用传递参数 $arr1 最终会被修改为合并后的数组
 * 相同 key 的情况下,如果 $arr1 的值和 $arr2 的值都是数组类型,则继续合并下级数组,否则 $arr2 的值覆盖 $arr1 的值;
 * 如果 $arr2 中存在的 key=>value 在 $arr1 中不存在,则将相差的 key=>value 与 $arr1 合并;
 */
public static function arrayMerge(&$arr1, &$arr2){
	foreach($arr1 as $k1 => &$v1){
		foreach($arr2 as $k2 => &$v2){
			if( $k1 === $k2 ){
				if( is_array($v1) && is_array($v2) ){
					$v1 = static::arrayMerge($v1, $v2);
				}else{
					$v1 = $v2;
				}
			}
		}
	}
	return $arr1 = array_merge($arr1, array_diff_key($arr2, $arr1));
}

MySQL 数据导入 Elasticsearch

JAVA:

public class JDBCImporter {

	private static final String TABLE_NAME = "table_name";
	private static final String URL = "JDBC连接字符串";
	private static final CloseableHttpClient HTTP_CLIENT = HttpClientBuilder.create().build();
	private static final StringBuffer STRING_BUFFER = new StringBuffer();
	private static final Gson GSON = new GsonBuilder().create();

	public static void main(String[] args) throws Exception {
		Connection connection = DriverManager.getConnection(URL);
		com.mysql.cj.core.result.Field[] fields = getFields(connection, TABLE_NAME);

		String sql = "SELECT * FROM " + TABLE_NAME + " WHERE id > ? ORDER BY id ASC LIMIT 10000"; //每次批量索引10000条

		int bulkSize = 0; //已索引的总数量
		long lastId = 0;  //已索引的最后一个ID
		do {
			PreparedStatement ps = connection.prepareStatement(sql);
			ps.setLong(1, lastId);
			ResultSet rs = ps.executeQuery();
			while (rs.next()) {
				bulkSize++;
				lastId = rs.getLong("id");
				Map row = new HashMap<>();
				for (int i = 0; i < fields.length; i++) {
					String key = fields[i].getColumnLabel();
					row.put(key, rs.getObject(key));
				}
				STRING_BUFFER.append("{\"index\":{\"_id\":\"" + lastId + "\"}}\n").append(GSON.toJson(row)).append("\n");
			}
			StringEntity entity = new StringEntity(STRING_BUFFER.toString(), ContentType.APPLICATION_JSON);
			HttpPost post = new HttpPost("http://192.168.56.200:9200/" + TABLE_NAME + "/" + TABLE_NAME + "/_bulk");
			post.setEntity(entity);
			CloseableHttpResponse response = HTTP_CLIENT.execute(post);
			System.out.println(EntityUtils.toString(response.getEntity()));
			STRING_BUFFER.delete(0, STRING_BUFFER.length());
			rs.close();
			ps.close();
		} while (bulkSize % 10000 == 0);

		connection.close();
	}

	// 读取MYSQL数据表的字段信息
	private static com.mysql.cj.core.result.Field[] getFields(Connection connection, String tableName) throws Exception {
		ResultSet rs = connection.prepareStatement("SELECT * FROM " + tableName + " WHERE 1=0").executeQuery();
		ResultSetMetaData metaData = rs.getMetaData();
		Field field = rs.getMetaData().getClass().getDeclaredField("fields");
		field.setAccessible(true);
		com.mysql.cj.core.result.Field[] fields = (com.mysql.cj.core.result.Field[]) field.get(metaData);
		rs.close();
		return fields;
	}

}

PHP:

public function main(){
	ES::$INDEX = 'log_apprequestlogs';
	// 取出已索引记录的最大ID
	$maxId = ES::bodySearch('log_apprequestlogs', json_decode('{"aggs":{"max_id":{"max":{"field":"id"}}}}',true));
	$start = $maxId['aggregations']['max_id']['value'];
	do{
		$i = 0;
		$Table = TableRegistry::get('LogApprequestlogs');
		$result = $Table->find('all',[
			'conditions' => "id > {$start}", //过滤已索引的记录
			'order' => 'id asc',
			'limit' => 5000 //每次批量处理5000条
		]);
		$postBodyArray = [];
		try{
			foreach($result as $row){
				$i++;
				$start = $row->id;
				$postBodyArray[] = json_encode(['index'=>['_id'=>$row->id]]);
				$postBodyArray[] = json_encode($row, JSON_UNESCAPED_UNICODE);
			}
		}catch(\Exception $e){
			echo $e->getMessage();
			sleep(10);
			continue;
		}
		try{
			ES::bulk('log_apprequestlogs', implode("\n", $postBodyArray)); // 使用 bulk API 批量索引
		}catch(\Exception $e){
			echo $e->getMessage();
			sleep(10);
			continue;
		}
		if($i%5000 != 0 || $i == 0){
			sleep(10); // 没有新数据时,等待10秒再继续处理。
			$i = 5000;
		}
	}while($i%5000 == 0);
}

Elasticsearch 地理位置搜索

Class ES : 根据 Elasticsearch API 编写的搜索工具类,以下为PHP代码:

ES::$INDEX = 'geo_index';

// 创建包含geo_point类型字段的索引类型
$data = json_encode([
	'mappings' => [
		'geo_type' => [
			'properties' => [
				'username' => ['type'=>'text'],
				'location' => ['type'=>'geo_point']
			]
		]
	]
]);
$cmd = "curl -XPUT http://192.168.56.101:9200/geo_index -d '{$data}'";
echo shell_exec($cmd);

// 插入数据
$data = json_encode([
	'username' => 'xd'.time(),
	'location' => [
		'lat' => 40.12,
		'lon' => -71.34
	]
]);
$cmd = "curl -XPOST http://192.168.56.101:9200/geo_index/geo_type -d '{$data}'";
echo shell_exec($cmd);

// 矩形区域搜索
$data = < <<JSON
{
  "query": {
	"bool": {
	  "filter": {
		"geo_bounding_box": {
		  "location": {
			"top_left": {
			  "lat": 40.73,
			  "lon": -74.1
			},
			"bottom_right": {
			  "lat": 40.01,
			  "lon": -71.12
			}
		  }
		}
	  }
	}
  }
}
JSON;
$response = ES::bodySearch('geo_type', json_decode($data, true));

// 距离搜索
$data = <<<JSON
{
  "query": {
	"bool": {
	  "filter": {
		"geo_distance": {
		  "distance": "1000km",
		  "location": {
			"lat": 40.73,
			"lon": -74.1
		  }
		}
	  }
	}
  },
  "sort": [
	{
	  "_geo_distance": {
		"location": {
		  "lat": 40.73,
		  "lon": -74.1
		},
		"order": "asc"
	  }
	}
  ]
}
JSON;
$response = ES::bodySearch('geo_type', json_decode($data, true));

// 多边形区域搜索
$data = <<<JSON
{
  "query": {
	"bool": {
	  "filter": {
		"geo_polygon": {
		  "location": {
			"points": [
			  {
				"lat": 40,
				"lon": -70
			  },
			  {
				"lat": 30,
				"lon": -80
			  },
			  {
				"lat": 50,
				"lon": -90
			  }
			]
		  }
		}
	  }
	}
  }
}
JSON;
$response = ES::bodySearch('geo_type', json_decode($data, true));

php 读取 access 数据库

百度出来的教程几乎都已经过时了,因为微软的ODBC驱动升级后,连接驱动字符串变了。
如果电脑安装过 Office 软件选择了 AccessDB 功能,就不需要安装驱动了。否则,需要安装 AccessRuntime(ODBC驱动)。
完成上一步后,这个链接字符串可以在服务器ODBC数据源管理的地方查询到,如果是手动安装的 AccessRuntime ,就需要手动添加数据源。

< ?php
set_time_limit(0);
date_default_timezone_set('Asia/Shanghai');
require 'memcached.php';

$mdbPath = 'xxxxxx.mdb';
$memcacheServerIP = '192.168.100.100';
$memcacheServerPort = 11211;
$memcacheKey = 'MarketData';

while(true){
	try{
		$connection = odbc_connect("DRIVER={Microsoft Access Driver (*.mdb, *.accdb)}; DBQ=$mdbPath;", null, null);
		$rs = odbc_exec($connection, 'SELECT * FROM ELE');
		$data = [];
		while($row = odbc_fetch_array($rs)){
			foreach($row as &$val){
				$val = iconv('gbk', 'utf-8', $val);
			}
			$data[] = $row;
		}
		odbc_close($connection);

		$m = new Memcached;
		$m->addServer($memcacheServerIP, $memcacheServerPort);
		$m->set($memcacheKey, $data);
		echo date('Y-m-d H:i:s'), ' Data count: ', count($m->get($memcacheKey)), "\r\n";
		$m->close();
	}catch(Exception $e){
		var_dump($e);
	}
	sleep(1);
}

使用html5本地存储优化网页请求

(function(s){
	if(s){
		var userCity = s.getItem("userCity");
		if( userCity == null ){
			$.get("/index/userCity",null,function(d){s.setItem("userCity",d);$("#AsyncUserCity").html(d)});
		}else{
			$("#AsyncUserCity").html(userCity);
		}
		var hotKeywords = s.getItem("hotKeywords");
		if( hotKeywords == null ){
			$.get("/index/hotKeywords",null,function(d){s.setItem("hotKeywords",d);$("#AsyncHotKeywords").html(d)});
		}else{
			$("#AsyncHotKeywords").html(hotKeywords);
		}
	}else{
		$("#AsyncUserCity").load("/index/userCity");
		$("#AsyncHotKeywords").load("/index/hotKeywords");
	}
})(window.sessionStorage);

php curl 并发请求

protected static function mCurl($urlArray){
	$mh = curl_multi_init();
	$chArray = [];
	foreach($urlArray as $url){
		$chArray[] = $ch = curl_init(strpos($url,'/')===0?API_URL.$url:$url);
		curl_setopt($ch, CURLOPT_TIMEOUT, 15);
		curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 10);
		curl_setopt($ch, CURLOPT_FOLLOWLOCATION, false);
		curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
		curl_multi_add_handle($mh, $ch);
	}
	curl_multi_exec($mh, $still_running);
	while($still_running){
		curl_multi_select($mh);
		curl_multi_exec($mh, $still_running);
	}
	$rsArray = [];
	foreach($chArray as $ch){
		$response = curl_multi_getcontent($ch);
		self::log(curl_getinfo($ch,CURLINFO_EFFECTIVE_URL)."\r\n".$response);
		$rsArray[] = json_decode($response, true);
		curl_multi_remove_handle($mh, $ch);
	}
	curl_multi_close($mh);
	return $rsArray;
}

php openssl(SHA1WithRSA) 签名 验签

签名:

public function wjSign($data){
	$key = openssl_pkey_get_private(file_get_contents($this->privateKeyPathWJ));
	openssl_sign($data, $sign, $key, OPENSSL_ALGO_SHA1);
	$sign = base64_encode($sign);
	return $sign;
}

验签:

public function wjVerify($data, $sign){
	$sign = base64_decode($sign);
	$key = openssl_pkey_get_public(file_get_contents($this->publicKeyPathWJ));
	$result = openssl_verify($data, $sign, $key, OPENSSL_ALGO_SHA1) === 1;
	return $result;
}

php openssl 分段 加密 解密

加密:

public function encrypt($data){
	$crypted = [];
	$data = json_encode($data);
	
	$publicKey = openssl_pkey_get_public(file_get_contents($this->publicKeyPath));
	$dataArray = str_split($data, 117);
	foreach($dataArray as $subData){
		$subCrypted = null;
		openssl_public_encrypt($subData, $subCrypted, $publicKey);
		$crypted[] = $subCrypted;
	}
	$this->log($data);
	return base64_encode(implode('',$crypted));
}

解密:

public function decrypt($data){
	$decrypted = [];
	$data = base64_decode($data);
	
	$privateKey = openssl_pkey_get_private(file_get_contents($this->privateKeyPath));
	$dataArray = str_split($data, 128);
	foreach($dataArray as $subData){
		$subDecrypted = null;
		openssl_private_decrypt($subData, $subDecrypted, $privateKey);
		$decrypted[] = $subDecrypted;
	}
	$decrypted = implode('',$decrypted);
	$this->log($decrypted);
	return json_decode($decrypted, true);
}

PHP多进程处理消息队列

#!/usr/bin/env php
< ?php
set_time_limit(0);
define('DEFUNCT_PATH', '/dev/shm/defunct.pids');
define('QUEUE', '/queue/callback');
try{
        $stomp = new Stomp('tcp://192.168.100.100:61613');
}catch(StompException $e){
        exit($e->getMessage());
}
file_exists(DEFUNCT_PATH) && unlink(DEFUNCT_PATH);
touch(DEFUNCT_PATH);
$stomp->subscribe(QUEUE);
while(true){
        if( mt_rand(1,10) > 8 ){
                $defunctPids = [];
                $handle = fopen(DEFUNCT_PATH, 'r+');
                flock($handle, LOCK_EX);
                while($pid = fgets($handle)){
                        $defunctPids[] = $pid;
                }
                ftruncate($handle, 0);
                fclose($handle);
                foreach($defunctPids as $pid){
                        pcntl_waitpid($pid, $status);
                }
                unset($defunctPids);
        }
        $frame = $stomp->readFrame();
        if( empty($frame) ) continue;
        $pid = pcntl_fork();
        if($pid === -1){
                exit('pcntl_fork error');
        }else if($pid === 0){
                echo date('Y-m-d H:i:s'),' ',$frame->body,"\r\n";
                file_put_contents(DEFUNCT_PATH, posix_getpid()."\r\n", FILE_APPEND | LOCK_EX);
                posix_kill(posix_getpid(), SIGKILL);
        }else{
                $stomp->ack($frame);
        }
}