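I'm writing a function in PHP that loops over an array and makes an asynchronous call for each item (using Promises).
The problem is that the only way I can make this loop happen is to have the function call itself asynchronously. I very quickly run into the 100 nested functions problem, and I'd basically like to change it so that it doesn't recurse.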
function myloop($data, $index = 0) {
    if (!isset($data[$index])) {
        return;
    }
    $currentItem = $data[$index];
    $currentItem()->then(function() use ($data, $index) {
        myloop($data, $index + 1);
    });
}
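For those who want to answer this from a practical standpoint (e.g. rewrite it to not be async): I am experimenting with functional and asynchronous patterns, and I'd like to know whether this is possible in PHP.
I've written a possible solution in pseudocode. The idea is to use a database queue to limit the number of items that run asynchronously at the same time. myloop() is no longer directly recursive; instead it is called when an item finishes running. In the sample data I've limited it to 4 concurrent items (an arbitrary value). Essentially it still calls itself recursively, but in a roundabout way that avoids the many nested calls you mentioned.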
Execution flow:
myloop() ---> queue
   ^            v
   |            |
   '<-processor<-'
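The flow above can also be sketched entirely in memory, which makes it easier to see why the nesting goes away. This is a minimal sketch, not the database-backed design: `ItemQueue` and `myloop_queued()` are hypothetical names, plain callables stand in for the promise-returning items, and the 4-item concurrency cap is not modelled. Each step pushes the next step onto the queue instead of calling it, and a flat `while` loop plays the role of the processor.

```php
<?php
// Hypothetical in-memory stand-in for the database queue and the processor.
final class ItemQueue
{
    /** @var callable[] tasks waiting to run */
    private array $waiting = [];

    public function push(callable $task): void
    {
        $this->waiting[] = $task;
    }

    // The "processor": a flat loop, so every task returns before the next one starts.
    public function run(): void
    {
        while ($this->waiting) {
            $task = array_shift($this->waiting);
            $task();
        }
    }
}

// Runs one item, then schedules the next step on the queue instead of calling it directly.
function myloop_queued(ItemQueue $queue, array $data, int $index, array &$log): void
{
    if (!isset($data[$index])) {
        return;
    }
    $item = $data[$index];
    $log[] = $item(); // a plain callable stands in for the promise-returning item
    $queue->push(function () use ($queue, $data, $index, &$log) {
        myloop_queued($queue, $data, $index + 1, $log);
    });
}

// Usage: 1000 items are processed one after another without nested calls.
$queue = new ItemQueue();
$log = [];
$data = [];
for ($i = 0; $i < 1000; $i++) {
    $data[] = function () use ($i) {
        return $i;
    };
}
myloop_queued($queue, $data, 0, $log);
$queue->run();
```

Because `myloop_queued()` returns before the queued closure runs, the call stack never grows with the number of items, which is the property the database queue provides in a more roundabout way.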
<?php
//----------
// database
//table: config
//columns: setting, value
//items: ACTIVE_COUNT, 0
// ITEM_CONCURRENT_MAX, 4
//table: queue
//columns: id, item, data, index, pid, status(waiting, running, finished), locked
// --- end pseudo-schema ---
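As one possible concrete form of the pseudo-schema above, here is a sketch using PDO with an in-memory SQLite database. Several things are assumptions rather than part of the original design: the `pdo_sqlite` extension is available, the `index` column is renamed to `item_index` (since `INDEX` is an SQL keyword), a `counter` column is added because `send_item_to_processor()` stores a counter, and `create_queue_schema()` is a hypothetical helper name.

```php
<?php
// Assumes the pdo_sqlite extension; an in-memory database keeps the sketch self-contained.
function create_queue_schema(PDO $pdo): void
{
    // config: one row per setting
    $pdo->exec('CREATE TABLE config (setting TEXT PRIMARY KEY, value INTEGER NOT NULL)');
    $pdo->exec(
        "INSERT INTO config (setting, value)
         VALUES ('ACTIVE_COUNT', 0), ('ITEM_CONCURRENT_MAX', 4)"
    );

    // queue: one row per item waiting for, or being handled by, the processor
    $pdo->exec(
        "CREATE TABLE queue (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            item TEXT NOT NULL,
            data TEXT NOT NULL,
            item_index INTEGER NOT NULL,          -- 'index' in the pseudo-schema
            counter INTEGER NOT NULL DEFAULT 0,   -- assumption: not in the pseudo-schema
            pid INTEGER,
            status TEXT NOT NULL DEFAULT 'waiting'
                CHECK (status IN ('waiting', 'running', 'finished')),
            locked INTEGER NOT NULL DEFAULT 0
        )"
    );
}

$pdo = new PDO('sqlite::memory:');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
create_queue_schema($pdo);
```

A new row only needs `item`, `data` and `item_index`; `status` and `locked` fall back to `'waiting'` and `0`, matching what `send_item_to_processor()` is described as inserting.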
<?php
// ---------------
// itemloop.php
// ---------------
//sends an item and associated data produced by myloop() into a database queue,
//to be processed (run asynchronous, but limited to how many can run at once)
function send_item_to_processor($item, $data, $index, $counter) {
    //INSERT $item to a queue table, along with $data, $index (if needed), $counter, locked = 0
    //status == waiting
}
//original code, slightly modified to remove direct recursion and implement
//the queue.
function myloop($data, $index = 0, $counter = 0) {
    if (!isset($data[$index])) {
        return;
    }
    $currentItem = $data[$index];
    //the closure has to import every outer variable it uses, so $currentItem
    //and $counter are listed in `use` as well.
    $currentItem()->then(function() use ($currentItem, $data, $index, $counter) {
        //instead of directly calling `myloop()`, push item to
        //database and let the processor worry about it. see below.
        //*if you wanted currentItem to call a specific function after finishing,
        //you could create an array of numbered functions and pass the function
        //number along with the other data.*
        send_item_to_processor($currentItem, $data, $index + 1, $counter + 1);
    });
}
// ---------------
// processor.php
// ---------------
//handles the actual running of items. looks for a "waiting" item and
//executes it, updating various statuses along the way.
//*called from `process_queue()`*
function process_new_items() {
    //select ACTIVE_COUNT, ITEM_CONCURRENT_MAX from config into
    //$ACTIVE_COUNT and $ITEM_CONCURRENT_MAX
    //$ITEM_COUNT = total records in the queue. this is done to
    //short-circuit the execution of `process_queue()` whenever possible
    //(which is called frequently).
    if ($ITEM_COUNT == 0 || $ACTIVE_COUNT >= $ITEM_CONCURRENT_MAX) {
        return FALSE;
    }
    //select item from queue where status = waiting AND locked = 0 limit 1;
    //update item set status = running, pid = programPID
    //update config ACTIVE_COUNT = +1
    //**** asynchronous run item here ****//
    return TRUE;
}
//main processor for the queue. first processes new/waiting items
//if it can (if too many items aren't already running), then processes
//dead/completed items. Upon an item.status == finished, `myloop()` is
//called from this function. Still technically a recursive call, but
//avoids out-of-control situations due to the asynchronous nature.
//this function could be called on a timer of some sort, such as a cronjob
function process_queue() {
    //start a waiting item if a slot is free. do not return early when none
    //can be started: finished items still have to be cleaned up below,
    //otherwise ACTIVE_COUNT never drops and the queue stalls.
    process_new_items();
    //$rows = queue items with status == finished or is_pid_valid(pid) == FALSE
    $numComplete = count($rows);
    //update all rows to locked = 1, in case process_queue() gets called again before
    //we finish, resulting in an item potentially being processed as dead twice.
    foreach ($rows as $item) {
        if (!is_pid_valid($item['pid']) || $item['status'] == 'finished') {
            //delete item from queue
            //and here is the call back to myloop(), avoiding a strictly recursive
            //function call.
            //*Not sure what to do with `$item` here; it might be passed back to `myloop()`.*
            myloop($item['data'], $item['index'], $item['counter'] - 1);
        }
    }
    //decrease config.ACTIVE_COUNT by $numComplete (once, after the loop)
}
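The "select a waiting item, mark it running, bump ACTIVE_COUNT" step in `process_new_items()` could look roughly like the sketch below. It is a sketch under stated assumptions, not the definitive implementation: it uses PDO with in-memory SQLite (the `pdo_sqlite` extension must be available), a cut-down version of the queue table, and a hypothetical helper named `claim_next_item()`. The three statements run in one transaction so that the cap check and the status update stay together; with several real worker processes you would additionally need whatever row locking your database offers.

```php
<?php
// Hypothetical helper: claim one waiting item if the concurrency cap allows it.
// Returns the claimed row, or null when nothing can be started right now.
function claim_next_item(PDO $pdo, int $pid): ?array
{
    $pdo->beginTransaction();
    try {
        // Read both settings in one query: setting => value
        $config = $pdo->query('SELECT setting, value FROM config')
                      ->fetchAll(PDO::FETCH_KEY_PAIR);
        $row = null;
        if ((int) $config['ACTIVE_COUNT'] < (int) $config['ITEM_CONCURRENT_MAX']) {
            $rows = $pdo->query(
                "SELECT id, item FROM queue
                 WHERE status = 'waiting' AND locked = 0
                 ORDER BY id LIMIT 1"
            )->fetchAll(PDO::FETCH_ASSOC);
            $row = $rows[0] ?? null;
        }
        if ($row === null) {
            $pdo->rollBack(); // cap reached or queue empty: nothing to change
            return null;
        }
        $mark = $pdo->prepare("UPDATE queue SET status = 'running', pid = ? WHERE id = ?");
        $mark->execute([$pid, $row['id']]);
        $pdo->exec("UPDATE config SET value = value + 1 WHERE setting = 'ACTIVE_COUNT'");
        $pdo->commit();
        return $row;
    } catch (Throwable $e) {
        if ($pdo->inTransaction()) {
            $pdo->rollBack();
        }
        throw $e;
    }
}

// Usage: five waiting items, cap of 4 (the arbitrary value from the sample data).
$pdo = new PDO('sqlite::memory:');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$pdo->exec('CREATE TABLE config (setting TEXT PRIMARY KEY, value INTEGER)');
$pdo->exec("INSERT INTO config VALUES ('ACTIVE_COUNT', 0), ('ITEM_CONCURRENT_MAX', 4)");
$pdo->exec(
    "CREATE TABLE queue (id INTEGER PRIMARY KEY, item TEXT, pid INTEGER,
                         status TEXT DEFAULT 'waiting', locked INTEGER DEFAULT 0)"
);
for ($i = 1; $i <= 5; $i++) {
    $pdo->exec("INSERT INTO queue (item) VALUES ('item$i')");
}

$claimed = [];
while (($row = claim_next_item($pdo, getmypid())) !== null) {
    $claimed[] = $row['item'];
}
// $claimed now holds item1..item4; item5 stays waiting until a slot is released.
```

The fifth item is only picked up once something decrements `ACTIVE_COUNT`, which in the design above is the job of `process_queue()` when it cleans up finished or dead items.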