phanstalk的PHP进程分叉


PHP process forking with Pheanstalk

我试图创建一个PHP脚本,在后台和分叉子进程运行。(我知道这会让服务器崩溃;在这个问题的范围之外还有一些额外的保护措施)

简单地说,代码是这样工作的:

$pids = array();
$ok = true;
while ($ok)
{
    $job = $pheanstalk->watch('jobs')->ignore('default')->reserve();
    echo 'Reserved job #' . $job->getId();
    $pids[$job->getId()] = pcntl_fork();
    if(!$pids[$job->getId()]) {
       doStuff();
       $pheanstalk->delete($job);
       exit();
    }
}
问题是,一旦我分叉进程,我得到错误:
Reserved job #0
PHP Fatal error:  Uncaught exception 'Pheanstalk_Exception_ServerException' with message 'Cannot delete job 0: NOT_FOUND'

我的问题是,pheanstalk是如何返回一个没有ID和负载的作业的?它几乎感觉$pheanstalk一旦我叉子被损坏。如果我去掉分叉,一切都可以正常工作。(虽然它必须等待每个进程)

在删除phanstalk作业之前设置这个if条件:

if ($job) $pheanstalk->delete($job);

这是因为很有可能在代码到达此位置之前,文件的另一个php实例已经删除了该作业。(另一个实例仍然可以使用reserve()检索该作业,直到该作业从队列中删除。

出现此问题的原因是该作业由主进程保留。在调用pcntl_fork()之后,实际上有一个$worker变量的副本,因此主进程对作业有一个锁,当第二个作业试图删除时,它说它不存在(或者在这种情况下,它被另一个进程保留)。下面的代码通过创建一个新的worker来处理它,然后在主worker上释放作业,并尝试在第二个worker上拾取它。

# worker for the main process
$worker = new 'Pheanstalk'Pheanstalk($host, $port, $timeout, $persistent);
$pid = -1;
# seek out new jobs
while ($job = $worker->watch('queue_caller')->reserve()) {
    # make sure pcntl is installed & enabled
    if (function_exists('pcntl_fork')) {
        # create a child process
        $pid = pcntl_fork();
    }
    if ($pid === -1) {
        # code will run in single threaded mode
    } elseif ($pid !== 0) {
        # parent process
        # release the job so it can be picked up by the fork
        $worker->release($job);
        # short wait (1/20000th second) to ensure the fork executes first
        # adjust this value to what is appropriate for your environment
        usleep(50);
        # clear out zombie processes after they're completed
        pcntl_waitpid(0, $pidStatus, WNOHANG);
        # go back to looking for jobs
        continue;
    } else {
        # child worker is needed, because it has to own the job to delete it
        /** @var Pheanstalk $worker */
        $worker = new 'Pheanstalk'Pheanstalk($host, $port, $timeout, $persistent);
        # only look for jobs for 1 second, in theory it should always find something
        $job = $worker->watch('queue_caller')->reserve(1);
        if (false === $job) {
            # for some reason there is no job
            # terminate the child process with an error code
            exit(1);
        }
    }
    /** Start your code **/
    do_something();
    /** End your code **/
    # delete the job from the queue        
    $worker->delete($job);
    # only terminate if it's the child process 
    if ($pid === 0) {
        # terminate the child process with success code
        exit(0);
    }
}