#!/usr/bin/env php
<?php
/**
 * STOMP queue consumer that forks one short-lived child per received frame.
 *
 * Flow:
 *  - connect to the broker and subscribe to QUEUE;
 *  - for every frame read, fork: the child prints a timestamp and the frame
 *    body, appends its own PID to DEFUNCT_PATH and kills itself; the parent
 *    acknowledges the frame;
 *  - on roughly 2 out of 10 loop iterations the parent drains DEFUNCT_PATH
 *    and waits on the PIDs listed there so exited children do not stay
 *    around as zombies.
 */

set_time_limit(0);

define('DEFUNCT_PATH', '/dev/shm/defunct.pids');
define('QUEUE', '/queue/callback');

/**
 * Drain the PID file and reap every child listed in it.
 *
 * The file is read and truncated under an exclusive lock, the same lock the
 * children take (LOCK_EX) when appending, so a PID cannot be lost between
 * the read and the truncate. Waiting happens after the lock is released so
 * children are never blocked on the file while the parent waits.
 *
 * @param string $path Path of the file children append their PIDs to.
 *
 * @return void
 */
function reapDefunctChildren($path)
{
    $handle = fopen($path, 'r+');
    if ($handle === false) {
        // Nothing can be read right now; a later pass will retry.
        return;
    }

    $pids = [];
    if (flock($handle, LOCK_EX)) {
        while (($line = fgets($handle)) !== false) {
            // Lines are written as "<pid>\r\n"; strip the line ending and
            // hand pcntl_waitpid() a real integer.
            $pid = (int) trim($line);

            // Skip blank/garbage lines: 0 and negative values have special
            // "wait for any child / process group" meanings for waitpid.
            if ($pid > 0) {
                $pids[] = $pid;
            }
        }
        ftruncate($handle, 0);
        flock($handle, LOCK_UN);
    }
    fclose($handle);

    foreach ($pids as $pid) {
        // Blocking wait: the child recorded its PID just before killing
        // itself, so it has exited or is about to.
        pcntl_waitpid($pid, $status);
    }
}

try {
    $stomp = new Stomp('tcp://192.168.100.100:61613');
} catch (StompException $e) {
    fwrite(STDERR, $e->getMessage() . PHP_EOL);
    exit(1);
}

// Start every run with an empty PID file.
file_exists(DEFUNCT_PATH) && unlink(DEFUNCT_PATH);
touch(DEFUNCT_PATH);

$stomp->subscribe(QUEUE);

while (true) {
    // Reap opportunistically (mt_rand yields 9 or 10) rather than on every
    // iteration.
    if (mt_rand(1, 10) > 8) {
        reapDefunctChildren(DEFUNCT_PATH);
    }

    $frame = $stomp->readFrame();
    if (empty($frame)) {
        continue;
    }

    $pid = pcntl_fork();
    if ($pid === -1) {
        fwrite(STDERR, 'pcntl_fork error' . PHP_EOL);
        exit(1);
    }

    if ($pid === 0) {
        // Child: handle the frame, record our PID for the parent to reap,
        // then terminate.
        echo date('Y-m-d H:i:s'), ' ', $frame->body, "\r\n";
        file_put_contents(DEFUNCT_PATH, posix_getpid() . "\r\n", FILE_APPEND | LOCK_EX);

        // NOTE(review): SIGKILL ends the process without running PHP's
        // normal shutdown; presumably chosen so the child never touches the
        // Stomp connection it inherited from the parent - confirm.
        posix_kill(posix_getpid(), SIGKILL);
    } else {
        // Parent: acknowledge the frame once the child has been started.
        $stomp->ack($frame);
    }
}