从cron运行PHP pthreads失败


PHP pthreads failing when run from cron

好吧,让我们慢慢来…

我有一个pthreads脚本正在为我运行和工作,当我通过ssh从命令行手动运行它时,测试和工作100%的时间。脚本如下,调整主线程进程代码以模拟随机进程的运行时间。

class ProcessingPool extends Worker {
    public function run(){}
}
class LongRunningProcess extends Threaded implements Collectable {
    public function __construct($id,$data) {
        $this->id = $id;
        $this->data = $data;
    }
    public function run() {
        $data = $this->data;
        $this->garbage = true;
        $this->result = 'START TIME:'.time().PHP_EOL;
        // Here is our actual logic which will be handled within a single thread (obviously simulated here instead of the real functionality)
        sleep(rand(1,100));
        $this->result .= 'ID:'.$this->id.' RESULT: '.print_r($this->data,true).PHP_EOL;
        $this->result .= 'END TIME:'.time().PHP_EOL;
        $this->finished = time();
    }
    public function __destruct () {
        $Finished = 'EXITED WITHOUT FINISHING';
        if($this->finished > 0) {
            $Finished = 'FINISHED';
        }
        if ($this->id === null) {
            print_r("nullified thread $Finished!");
        } else {
            print_r("Thread w/ ID {$this->id} $Finished!");
        }
    }
    public function isGarbage() : bool { return $this->garbage; }
    public function getData() {
        return $this->data;
    }
    public function getResult() {
        return $this->result;
    }
    protected $id;
    protected $data;
    protected $result;
    private $garbage = false;
    private $finished = 0;
}
$LoopDelay = 500000; // microseconds
$MinimumRunTime = 300; // seconds (5 minutes)
// So we setup our pthreads pool which will hold our collection of threads
$pool = new Pool(4, ProcessingPool::class, []);
$Count = 0;
$StillCollecting = true;
$CountCollection = 0;
do {
    // Grab all items from the conversion_queue which have not been processed
    $result = $DB->prepare("SELECT * FROM `processing_queue` WHERE `processed` = 0 ORDER BY `queue_id` ASC");
    $result->execute();
    $rows = $result->fetchAll(PDO::FETCH_ASSOC);
    if(!empty($rows)) {
        // for each of the rows returned from the queue, and allow the workers to run and return
        foreach($rows as $id => $row) {
            $update = $DB->prepare("UPDATE `processing_queue` SET `processed` = 1 WHERE `queue_id` = ?");
            $update->execute([$row['queue_id']]);
            $pool->submit(new LongRunningProcess($row['fqueue_id'],$row));
            $Count++;
        }
    } else {
        // 0 Rows To Add To Pool From The Queue, Do Nothing...
    }

    // Before we allow the loop to move on to the next part, lets try and collect anything that finished
    $pool->collect(function ($Processed) use(&$CountCollection) {
        global $DB;
        $data = $Processed->getData();
        $result = $Processed->getResult();

        $update = $DB->prepare("UPDATE `processing_queue` SET `processed` = 2 WHERE `queue_id` = ?");
        $update->execute([$data['queue_id']]);
        $CountCollection++;
        return $Processed->isGarbage();
    });
    print_r('Collecting Loop...'.$CountCollection.'/'.$Count);

    // If we have collected the same total amount as we have processed then we can consider ourselves done collecting everything that has been added to the database during the time this script started and was running
    if($CountCollection == $Count) {
        $StillCollecting = false;
        print_r('Done Collecting Everything...');
    }
    // If we have not reached the full MinimumRunTime that this cron should run for, then lets continue to loop
    $EndTime = microtime(true);
    $TimeElapsed = ($EndTime - $StartTime);
    if(($TimeElapsed/($LoopDelay/1000000)) < ($MinimumRunTime/($LoopDelay/1000000))) {
        $StillCollecting = true;
        print_r('Ended To Early, Lets Force Another Loop...');
    }
    usleep($LoopDelay);
} while($StillCollecting);
$pool->shutdown();

因此,虽然上述脚本将通过命令行运行(已调整为基本示例,并在上面的示例中模拟了详细的处理代码),下面的命令在每5分钟从cron设置运行时给出不同的结果…

/opt/php7zts/bin/php -q /home/account/cron-entry.php file=every-5-minutes/processing-queue.php

上面的脚本,当使用上面的命令行调用时,将在脚本运行期间反复循环并从DB队列中收集任何新项目,并将它们插入池中,这允许4个进程同时运行并完成,然后在另一个循环发生之前收集并更新队列,从DB中提取任何新项目。该脚本将一直运行,直到我们在脚本执行期间处理并收集队列中的所有进程为止。如果脚本没有运行5分钟的预期时间,循环将被迫继续检查队列,如果脚本运行超过5分钟,它允许任何当前线程完成&结案前收集。请注意,上面的代码还包括一个基于代码的"羊群"功能,该功能使该空闲循环的未来执行,并在锁解除后退出或启动,以确保队列和线程不会相互碰撞。同样,所有这些都可以通过SSH在命令行中工作。

一旦我采用上述命令,并将其放入cron中每5分钟运行一次,本质上给了我一个永无止境的循环,同时维护内存,我得到了不同的结果…

结果描述如下…脚本启动,检查群集,如果锁不存在,则继续执行,创建锁,并运行上述脚本。项目从DB中的队列中取出,并插入到池中,池按预期一次触发4个线程。但是意外的结果是run()命令似乎没有被执行,取而代之的是__destruct函数运行,并返回一个"Thread w/ID 2 FINISHED!"类型的消息到输出。这反过来意味着收集端不收集任何东西,并且初始化脚本(cron脚本本身/home/account/cron-entry.php file=every-5-minutes/process-queue .php)在所有内容都放入池并销毁后完成。这会过早地"完成"cron作业,因为没有其他事情可做,只能循环并从队列中取出任何新内容,因为当队列中的processed == 1时,它们被认为"正在处理"。

问题最终变成了……我如何使cron的脚本意识到线程,在哪里产生和运行()他们没有关闭池之前,他们可以做任何事情?

(注意……如果您复制/粘贴提供的脚本,请注意,我没有在删除详细的逻辑后测试它,所以它可能需要一些简单的修复…请不要挑剔这些代码,因为这里的关键是,如果脚本从命令行执行,pthreads可以工作,但是当脚本从CRON执行时,pthreads不能正常运行。如果你打算评论非建设性的批评,请用你的手指做其他事情!)

乔·沃特金斯!我需要你的才华!提前感谢!

在所有这些之后,问题似乎与用户权限有关。我在cpanel中设置了这个特定的cron,当手动运行命令时,我以root身份登录。

在root crontab中设置这个命令后,我就能够让它成功地运行池中的线程了。唯一的问题是我现在有一些线程永远不会完成,有时我无法关闭池。但这是另一个问题,所以我将在别处提出另一个问题。

对于遇到这个问题的人,请确保您知道cron的所有者是谁,因为它与php的pthreads有关。