====== PHP True Async ======
* Version: 1.2
* Date: 2025-04-16
* Author: Edmond [HT], edmondifthen@proton.me
* Status: Under discussion
* First Published at: http://wiki.php.net/rfc/true_async
* Git: https://github.com/true-async
===== Introduction =====
For several years, **PHP** has been attempting to carve out a niche in the development of long-running applications, where concurrent code execution becomes particularly useful. Production-ready solutions such as **Swoole**, **AMPHP**, **ReactPHP**, and others have emerged.
However, **PHP** still does not provide a comprehensive implementation for writing concurrent code. PHP extensions have no way to support //non-blocking execution//, even if they are capable of doing so. **Swoole** is forced to copy thousands of lines of code just for a few modifications, while **AMPHP** developers have to build drivers for ''MySQL'', ''PostgreSQL'', ''Redis'', and other systems from scratch in user-land.
The goal of this **RFC** is to establish a standard for writing concurrent code in PHP, as well as a C-API interface that would allow PHP to be extended at a low level using C, Rust, C++, and other languages. This would enable extensions to support **non-blocking I/O** without the need to override PHP functions or duplicate code.
===== Goals =====
The **True Async** project pursues the following goals and values:
* From a PHP developer's perspective, the **main value** of this implementation is that they DO NOT NEED to change existing code (or if changes are required, they should be minimal) to enable concurrency. Unlike explicit async models, this approach lets developers reuse existing synchronous code inside fibers without modification.
* Code that was originally written and intended to run outside of a Coroutine must work **EXACTLY THE SAME** inside a Coroutine without modifications.
* A PHP developer should not have to think about how coroutines switch and should not need to manage their switching, except in special cases where they consciously choose to intervene in this logic.
* If there is existing code or a familiar style, such as AMPHP interfaces, Go coroutines, Swoole API, and others, it is best to use what is most recognizable to a broad range of developers.
* The goal is to find a balance between flexibility and simplicity. On one hand, the implementation should allow leveraging existing solutions without requiring external libraries. On the other hand, it should avoid unnecessary complexity. Many design choices in this implementation are driven by the desire to free developers from concerns about compatibility with "external libraries" in favor of a standardized approach.
===== Proposal =====
==== Limitations ====
This **RFC** does not include modifications to PHP built-in functions or PHP extension functions!
Code examples that involve PHP built-in stream functions, such as ''file_get_contents'',
as well as ''sleep'', ''usleep'', and PHP Socket functions are provided
for demonstration purposes only to better illustrate the content of the **RFC**.
==== Diagrams Overview ====
This **RFC** is quite complex due to the number of logical connections.
Please use the diagrams from the table to simplify understanding.
| Diagram Name | Description |
| [[https://github.com/EdmondDantes/php-true-async-rfc/blob/main/diagrams/mind-map.svg]] | A mind map showing the relationship between key features and problems solved by the **RFC**. |
| [[https://github.com/EdmondDantes/php-true-async-rfc/blob/main/diagrams/feature-to-requirement.svg]] | Maps features to the requirements that generated them. |
| [[https://github.com/EdmondDantes/php-true-async-rfc/blob/main/diagrams/decision-tree.svg]] | A decision tree that guides developers on which API to use depending on the situation. |
==== Overview ====
=== Short glossary ===
| Term | Description | Section |
| **Coroutine** | An executable unit of code that can be suspended and resumed | Launching any function in non-blocking mode |
| **Scope** | A container for managing the lifecycle of coroutines | Scope |
| **CoroutineGroup** | A container for managing a group of tasks with the ability to retrieve results | CoroutineGroup |
| **Zombie coroutine** | A coroutine that continues execution after its Scope has been destroyed | Scope disposal |
| **Context** | A data storage associated with a coroutine or Scope | Context API |
| **CancellationException** | A mechanism for cooperative canceling coroutine execution | Cancellation |
This **RFC** describes the **API** and **new syntax** for writing concurrent code in PHP, which includes:
=== Coroutine ===
A lightweight execution thread that can be suspended (''suspend'') and resumed.
Example:
spawn(function() {
echo "Start";
suspend(); // Suspend the coroutine
echo "Resumed";
});
=== Scope ===
A container that manages coroutine lifetimes.
Example:
$scope = new Async\Scope();
$scope->spawn(function() {
// Coroutine bound to $scope
spawn(function() {
// Coroutine bound to $scope
});
});
// Dispose of the scope after 5 seconds
sleep(5);
$scope->disposeSafely();
=== CoroutineGroup ===
Explicit group of coroutines with centralized result/error handling.
Example:
$coroutineGroup = new Async\CoroutineGroup(captureResults: true);
$coroutineGroup->spawn(task1(...));
$coroutineGroup->spawn(task2(...));
[$result1, $result2] = await($coroutineGroup);
=== Cooperative cancellation ===
A special exception that implements cooperative cancellation:
Example:
$coroutine = spawn(function() {
try {
Async\delay(1000);
} catch (Async\CancellationException $e) {
echo "Coroutine cancelled";
}
});
suspend();
$coroutine->cancel();
=== Context ===
Coroutine/Scope-associated data storage.
Example:
currentContext()->set('user_id', 123);
spawn(function() {
$userId = currentContext()->get("user_id");
echo "User ID: $userId"; // 123
});
=== Combinators ===
Functions that combine multiple awaitable objects: ''awaitAny()'', ''awaitAll()'', ''awaitFirstSuccess()''.
Example:
$results = awaitAll([spawn(task1(...)), spawn(task2(...))]);
=== Waiting for coroutine results ===
function fetchData(string $file): string
{
$result = file_get_contents($file);
if($result === false) {
throw new Exception("Error reading $file");
}
return $result;
}
echo await(spawn(fetchData(...), "file.txt"));
=== Awaiting a result with cancellation ===
echo await(spawn('fetchData', "https://php.net/"), Async\timeout(2000));
echo await(spawn('fetchData', "https://php.net/"), spawn('sleep', 2));
=== Suspend ===
Transferring control from the coroutine to the ''Scheduler'':
function myFunction(): void {
echo "Hello, World!\n";
Async\suspend();
echo "Goodbye, World!\n";
}
Async\spawn(myFunction(...));
echo "Next line\n";
Output:
Hello, World!
Next line
Goodbye, World!
=== Working with a group of concurrent tasks. ===
function mergeFiles(string ...$files): string
{
$coroutineGroup = new Async\CoroutineGroup(captureResults: true);
foreach ($files as $file) {
$coroutineGroup->spawn(file_get_contents(...), $file);
}
return implode("\n", await($coroutineGroup));
}
=== Structured concurrency ===
use Async\Scope;
use Async\CoroutineGroup;
use Async\AwaitCancelledException;
/**
* Retrieves user profile data from different sources
*/
function getUserProfile(int $userId): array
{
// Main task group with result capturing
$profileTasks = new CoroutineGroup(captureResults: true);
try {
// Start fetching basic user information
$profileTasks->spawn(function() use ($userId):array {
$userData = await(spawn('fetchUserData', $userId));
return ['basic' => $userData];
});
// Start fetching extended details in parallel
$profileTasks->spawn(function() use ($userId): array {
// Subtask group for supplementary information
$orderTasks = new CoroutineGroup(Scope::inherit(), captureResults: true);
// Request user orders with 2-second timeout
$orderTasks->spawn(function() use ($userId): array {
try {
return ['orders' => await(spawn('fetchUserOrders', $userId), Async\timeout(2000))];
} catch (AwaitCancelledException) {
return ['orders' => ['status' => 'timeout']];
}
});
// Request API2 user orders with 2-second timeout
$orderTasks->spawn(function() use ($userId): array {
try {
return ['orders2' => await(spawn('fetchUserOrdersAPI2', $userId), Async\timeout(2000))];
} catch (AwaitCancelledException) {
return ['orders2' => ['status' => 'timeout']];
}
});
// Wait for all order tasks to complete
return array_merge(...await($orderTasks));
});
// Request user settings
$profileTasks->spawn(function() use ($userId): array {
return ['settings' => await(spawn('fetchUserSettings', $userId))];
});
// Merge all results into a single profile
return array_merge(...await($profileTasks));
} catch (Exception $e) {
// Error handling
error_log("Error fetching user profile: " . $e->getMessage());
return ['error' => $e->getMessage()];
}
}
// Usage
$profile = await(spawn(getUserProfile(...), 123));
var_dump($profile);
=== Await all child tasks. ===
function processBackgroundJobs(string ...$jobs): void
{
$scope = new Scope();
foreach ($jobs as $job) {
$scope->spawn(processJob(...), $job);
}
// Waiting for all child tasks throughout the entire depth of the hierarchy.
$scope->awaitCompletion(Async\timeout(300 * 1000));
}
function processJob(mixed $job): void {
$scope = \Async\Scope::inherit();
$scope->spawn(task1(...), $job);
$scope->spawn(task2(...), $job);
// Waiting for all child tasks in the current scope.
$scope->awaitCompletion(Async\timeout(300 * 1000));
}
=== Binding Coroutines to a PHP Object ===
class HttpClient
{
private Scope $scope;
public function __construct()
{
$this->scope = new Scope();
}
public function request(array $data): \Async\Awaitable
{
return $this->scope->spawn(function() use ($data) {
// This coroutine is bound to the HttpClient instance
});
}
}
$service = new HttpClient;
$service->request(['login' => 'admin', 'password' => '1234']);
// HttpClient instance will stop all coroutines bound to it.
unset($service);
=== Tasks race ===
use Async\CoroutineGroup;
function fetchFirstSuccessful(string ...$apiHosts): string
{
$coroutineGroup = new Async\CoroutineGroup(captureResults: false);
foreach ($apiHosts as $host) {
$coroutineGroup->spawn(file_get_contents(...), $host);
}
// Get the first successful result
return await($coroutineGroup->race(ignoreErrors: true));
}
==== Implementation requirements ====
The implementation of this **RFC** should be carried out in a way that minimizes changes to the **PHP core**.
The proposed changes include:
* syntax modifications to the language,
* interfaces without implementations for ''Reactor'' and ''Scheduler'',
* an internal interface for the ''Scope'' class.
All other classes and functions from this **RFC** will be moved to a separate module,
which will become part of the standard library.
However, this module can be replaced with a different one if necessary.
The ''Coroutine'', ''Scope'' and ''CoroutineGroup'' classes are not part of the **PHP** core.
The behavior of functions such as ''spawn()'', ''await()'', and ''suspend()'' is not defined in the core,
and can be overridden by an extension, which must adhere to the logic defined in this **RFC**.
==== Scheduler and Reactor ====
**Scheduler** and **Reactor** must be implemented as **PHP** extensions that implement low-level interfaces.
The **Scheduler** and **Reactor** interfaces are part of the implementation of this **RFC**.
The behavior of **Scheduler** and **Reactor** must not contradict the logic of the **RFC**.
Components cannot override the logic of operations such as ''spawn'', ''await'', ''suspend'', and so on.
However, this **RFC** does not impose any restrictions on extending functionality.
It is allowed to use the **Async** namespace for new functions or objects in **Scheduler** and **Reactor**.
> **Warning:** Users should not make assumptions about the execution order of coroutines unless
> this is a specific goal of a particular **Scheduler** implementation.
The **Reactor** is a component that implements the **Event Loop**.
It may be exposed as a separate API in **PHP-land**,
but its behavior is not defined within this **RFC**.
==== Preemptive Multitasking ====
**PHP** allows for the implementation of forced coroutine suspension,
which can be used in a preemptive multitasking algorithm.
In particular, this capability is implemented in **Swoole**.
However, the current **RFC** rejects **preemptive multitasking** due to the unpredictable behavior
of code during context switches.
A coroutine can lose control literally at any PHP opcode,
which can significantly affect the outcome and contradict the programmer's expectations.
Writing code that can lose control at any moment is a complex domain where PHP does not seem like an adequate tool.
This **RFC** considers a scenario where a coroutine is abruptly stopped only in one case:
if the **Scheduler** implements a runtime control mechanism similar to ''max_execution_time''.
==== Cancellable by design ====
This **RFC** is based on the principle of **"Cancellable by design"**, which can be described as follows:
> By default, coroutines **should be** designed in such a way that their
> cancellation at any moment does not compromise data integrity.
> Coroutines launched without a defined ''Scope'' or lifetime **must** adhere to the "Cancellable by design" principle.
> If a coroutine's lifetime needs to be controlled, it **MUST** be done **EXPLICITLY**!
In practice, this means that if a coroutine is created using ''spawn()'',
the developer treats it as non-critical in terms of data integrity.
If the developer needs to manage the coroutine's lifetime, they will use ''spawnWith()''.
In other words, the developer must take extra steps to explicitly extend the coroutine's lifetime.
==== Namespace ====
All functions, classes, and constants defined in this **RFC** are located in the ''Async'' namespace.
Extensions for **Scheduler/Reactor** are allowed to extend this namespace with functions and classes,
provided that they are directly related to concurrency functionality.
==== Coroutine ====
> A ''Coroutine'' is an ''execution container'', transparent to the code,
> that can be suspended on demand and resumed at any time.
Isolated execution contexts make it possible to switch between coroutines and execute tasks concurrently.
Any function can be executed as a coroutine without any changes to the code.
A coroutine can stop itself by passing control to the ''Scheduler''.
However, it cannot be stopped externally.
> **Warning:**
> It is permissible to stop a coroutine's execution externally for two reasons:
> * To implement multitasking.
> * To enforce an active execution time limit.
A suspended coroutine can be resumed at any time.
The ''Scheduler'' component is responsible for the coroutine resumption algorithm.
A coroutine can be resumed with an **exception**, in which case an exception
will be thrown from the suspension point.
=== Coroutine Lifecycle ===
{{ :rfc:true_async:coroutine-lifecycle.svg |}}
This state diagram illustrates the lifecycle of a coroutine, showing how it transitions through various states during its execution:
**States:**
- **Created**: The coroutine has been defined but not yet started.
- **Queued**: The coroutine is waiting in the queue to be executed.
- **Running**: The coroutine is actively executing.
- **Suspended**: Execution is paused, usually waiting for a result or I/O.
- **Completed**: The coroutine has finished successfully (via ''return'').
- **Pending Cancellation**: A cancellation was requested; the coroutine is cleaning up.
- **Cancelled**: The coroutine has finished as a result of cancellation.
**Key Transitions:**
- ''spawn'' moves a coroutine from **Created** to **Running**.
- ''suspend'' and ''resume'' move it between **Running** and **Suspended**.
- ''return/exit'' ends it in **Completed**.
- ''cancel()'' initiates cancellation from **Running** or **Suspended**, leading to **Pending Cancellation**, and finally **Cancelled**.
=== ''Coroutine'' state check methods ===
| Method | Description | Related State on Diagram |
| ''isStarted(): bool'' | Returns ''true'' if the coroutine has been started. | ''Running'', ''Suspended'', etc. |
| ''isRunning(): bool'' | Returns ''true'' if the coroutine is currently running. | ''Running'' |
| ''isQueued(): bool'' | Returns ''true'' if the coroutine is queued. | ''Queued'' |
| ''isSuspended(): bool'' | Returns ''true'' if the coroutine is suspended. | ''Suspended'' |
| ''isCancelled(): bool'' | Returns ''true'' if the coroutine has been cancelled. | ''Cancelled'' |
| ''isCancellationRequested(): bool'' | Returns ''true'' if cancellation has been requested. | ''Pending Cancellation'' |
| ''isFinished(): bool'' | Returns ''true'' if the coroutine has completed execution. | ''Completed'', ''Cancelled'' |
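The states and transitions above can be modelled as a small state machine. The following is a minimal sketch in plain PHP 8.1, not part of the proposed API: the ''CoroutineState'' enum and the ''canTransition()'' helper are hypothetical names. It encodes only the transitions listed under "Key Transitions"; the edges of the ''Queued'' state are not described in the text, so they are deliberately left out.

```php
<?php
// Hypothetical model of the lifecycle; illustration only, not the RFC API.
enum CoroutineState
{
    case Created;
    case Queued;
    case Running;
    case Suspended;
    case PendingCancellation;
    case Completed;
    case Cancelled;

    /** States reachable from this one, per the "Key Transitions" list only. */
    public function next(): array
    {
        return match ($this) {
            self::Created             => [self::Running],                      // spawn
            self::Running             => [self::Suspended, self::Completed,    // suspend, return
                                          self::PendingCancellation],          // cancel()
            self::Suspended           => [self::Running,                       // resume
                                          self::PendingCancellation],          // cancel()
            self::PendingCancellation => [self::Cancelled],
            default                   => [],  // Queued edges unknown; final states have none
        };
    }

    /** Mirrors the isFinished() row of the table: Completed or Cancelled. */
    public function isFinished(): bool
    {
        return $this === self::Completed || $this === self::Cancelled;
    }
}

function canTransition(CoroutineState $from, CoroutineState $to): bool
{
    return in_array($to, $from->next(), true);
}

var_dump(canTransition(CoroutineState::Suspended, CoroutineState::PendingCancellation)); // bool(true)
var_dump(canTransition(CoroutineState::Completed, CoroutineState::Running));             // bool(false)
```

A model like this is mainly useful in tests or tooling that needs to reject impossible state changes.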
==== Spawn function ====
To create coroutines, the ''spawn(callable $callable, mixed ...$args)'' function is used.
It launches the ''$callable'' in a separate execution context and returns
an instance of the ''Async\Coroutine'' class as a result.
Let's look at two examples:
> **Note:** The examples below are for demonstration purposes only.
> The non-blocking version of the ''file_get_contents'' function is not part of this **RFC**.
$result = file_get_contents('https://php.net');
echo "next line".__LINE__."\n";
This code:
1. first returns the contents of the PHP website,
2. then executes the ''echo'' statement.
$coroutine = spawn('file_get_contents', 'https://php.net');
echo "next line".__LINE__."\n";
This code:
1. starts a coroutine with the ''file_get_contents'' function.
2. The next line is executed without waiting for the result of ''file_get_contents''.
3. The coroutine is executed after the ''echo'' statement.
=== Spawn with scope ===
''spawnWith(Async\ScopeProvider $scope, callable $callable, mixed ...$args)'' allows launching a coroutine
with a specific ''Scope''.
The parameter can be either an ''Async\Scope'' object or a class
that implements the ''Async\ScopeProvider'' interface.
For example, such a class is ''CoroutineGroup''.
> Note: The non-blocking version of the ''gethostbyname'' function is not part
> of this RFC and is provided for demonstration purposes only.
$scope = new Async\Scope();
$coroutineGroup = new Async\CoroutineGroup($scope);
$host = 'php.net';
$coroutine = spawnWith($scope, function(): string {
return gethostbyname('php.net');
});
$coroutine = spawnWith($coroutineGroup, function(): string {
return gethostbyname('php.net');
});
function defineTargetIpV4(string $host): string {
return gethostbyname($host);
}
spawnWith($scope, defineTargetIpV4(...), $host);
spawnWith($coroutineGroup, defineTargetIpV4(...), $host);
You can also use the ''spawn'' method, which is available for the ''Scope'' and ''CoroutineGroup'' classes:
$scope = new Async\Scope();
$coroutineGroup = new Async\CoroutineGroup($scope);
$host = 'php.net';
$coroutine = $scope->spawn(function(): string {
return gethostbyname('php.net');
});
$coroutine = $coroutineGroup->spawn(function(): string {
return gethostbyname('php.net');
});
function defineTargetIpV4(string $host): string {
return gethostbyname($host);
}
$coroutine = $scope->spawn(defineTargetIpV4(...), $host);
$coroutine = $coroutineGroup->spawn(defineTargetIpV4(...), $host);
=== ''ScopeProvider'' Interface ===
The ''ScopeProvider'' interface allows objects to provide an ''Async\Scope'' instance that can be used
in a ''spawnWith'' function.
This is useful when you want to abstract the scope management logic,
letting higher-level structures (like a task group or a custom container)
expose a scope without directly exposing internal details.
declare(strict_types=1);
namespace Async;
interface ScopeProvider
{
/**
* Returns the associated Scope instance.
*
* This scope will be used when spawning a coroutine via spawnWith($provider, ...).
*
* @return Scope|null
*/
public function provideScope(): ?Scope;
}
> The ''provideScope'' method may return ''NULL''; in this case, the current **Scope** will be used.
**Example Use Case:**
A task group can implement this interface to automatically provide its internal scope to ''spawnWith()'':
class CustomCoroutineGroup implements ScopeProvider
{
private Scope $scope;
public function __construct()
{
$this->scope = new Scope();
}
public function provideScope(): ?Scope
{
return $this->scope;
}
}
This allows you to spawn coroutines into the task group using:
$coroutineGroup = new CustomCoroutineGroup();
spawnWith($coroutineGroup, function() {
// This coroutine is bound to the CustomCoroutineGroup's scope
});
=== ''SpawnStrategy'' Interface ===
The ''SpawnStrategy'' interface allows attaching a newly spawned coroutine
to a custom user-defined context immediately after the ''spawnWith()'' call is evaluated.
This is useful for scenarios where the coroutine should be registered, tracked,
or logically grouped within a context (e.g., a ''CoroutineGroup'' or a custom task manager).
interface SpawnStrategy extends ScopeProvider
{
/**
* Called before a coroutine is spawned, before it is enqueued.
*
* @param Coroutine $coroutine The coroutine to be spawned.
* @param Scope $scope The Scope instance.
*
*/
public function beforeCoroutineEnqueue(Coroutine $coroutine, Scope $scope): array;
/**
* Called after a coroutine is spawned, enqueued.
*
* @param Coroutine $coroutine
* @param Scope $scope
*/
public function afterCoroutineEnqueue(Coroutine $coroutine, Scope $scope): void;
}
If the ''$scope'' argument passed to ''spawnWith()'' implements the ''SpawnStrategy'' interface,
then its ''beforeCoroutineEnqueue()'' method is called before the coroutine is enqueued and its ''afterCoroutineEnqueue()'' method immediately after.
**Example:**
A class like ''CustomCoroutineGroup'' might implement this interface
to automatically collect all spawned coroutines under its management:
class CustomCoroutineGroup implements Async\SpawnStrategy
{
private array $coroutines = [];
public function afterCoroutineEnqueue(Coroutine $coroutine, Scope $scope): void
{
$this->coroutines[] = $coroutine;
echo "Coroutine added to the group as ".$coroutine->getSpawnLocation()."\n";
}
// Additional methods for managing the group...
}
$customCoroutineGroup = new CustomCoroutineGroup();
spawnWith($customCoroutineGroup, function() {
// This coroutine will be automatically added to the custom task group
});
The ''beforeCoroutineEnqueue()'' method is called after the coroutine has been created,
but before it is added to the queue.
It allows for additional operations to be performed with the coroutine and its context,
and it returns an optional list of options for the ''Scheduler''.
> The list of options for the Scheduler is not part of this **RFC**
> and is defined by the ''Scheduler'' implementation.
class HiPriorityStrategy implements Async\SpawnStrategy
{
public function beforeCoroutineEnqueue(Coroutine $coroutine, Scope $scope): array
{
// Mark the coroutine as high priority before it is enqueued
$coroutine->asHiPriority();
return [];
}
// Additional methods ...
}
spawnWith(new HiPriorityStrategy(), function() {
// This coroutine will be marked as high priority
});
=== ''hiPriority'' strategy ===
The ''Async\hiPriority(?Scope $scope = null)'' function allows launching a coroutine with high priority:
use function Async\hiPriority;
spawn(function() {
echo "normal priority\n";
});
spawnWith(hiPriority(), function() {
echo "high priority\n";
});
**Expected output:**
high priority
normal priority
If the ''$scope'' parameter is not specified, the current ''Scope'' will be used for launching.
The ''hiPriority'' strategy marks the ''coroutine'' as high-priority using the ''Coroutine::asHiPriority()'' method.
This action serves as a recommendation for the ''Scheduler'', suggesting that the coroutine should be placed as
close to the front of the queue as possible. However, the programmer **MUST NOT** rely on this outcome.
''hiPriority'' can be useful in situations where resources need to be released as quickly as possible
or when a critical section of code must be executed promptly. The programmer should not overuse it,
as this may negatively affect the application's performance.
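To make the "front of the queue" hint concrete, here is a minimal sketch of one way a run queue could honour it. It is an assumption for illustration only: ''SketchQueue'' and its methods are hypothetical names, and a real ''Scheduler'' is free to order coroutines differently, which is exactly why the outcome must not be relied upon.

```php
<?php
// Hypothetical run queue; the RFC leaves the actual ordering to the Scheduler.
final class SketchQueue
{
    /** @var list<callable> */
    private array $tasks = [];

    public function enqueue(callable $task, bool $hiPriority = false): void
    {
        if ($hiPriority) {
            array_unshift($this->tasks, $task); // hint honoured: placed at the front
        } else {
            $this->tasks[] = $task;             // default: appended to the back
        }
    }

    /** Runs every queued task in queue order and returns what each produced. */
    public function run(): array
    {
        $results = [];
        while ($this->tasks !== []) {
            $task = array_shift($this->tasks);
            $results[] = $task();
        }
        return $results;
    }
}

$queue = new SketchQueue();
$queue->enqueue(fn() => 'normal priority');
$queue->enqueue(fn() => 'high priority', hiPriority: true);

$order = $queue->run();
echo implode("\n", $order), "\n";
```

In this sketch the high-priority task runs first even though it was enqueued second.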
==== Suspension ====
A coroutine can suspend itself at any time by calling ''suspend()'':
function example(string $name): void {
echo "Hello, $name!\n";
Async\suspend();
echo "Goodbye, $name!\n";
}
spawn('example', 'World');
spawn('example', 'Universe');
Expected output:
Hello, World!
Hello, Universe!
Goodbye, World!
Goodbye, Universe!
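The interleaving shown above can be reproduced on stock PHP 8.1+ with the built-in ''Fiber'' class. This is only a sketch of the idea, not the proposed implementation: each ''Fiber'' stands in for a coroutine, ''Fiber::suspend()'' plays the role of ''Async\suspend()'', and the hand-written round-robin loop stands in for the ''Scheduler''. The ''$makeTask'' helper and the ''$log'' array are hypothetical names.

```php
<?php
$log = [];

// Builds a fiber that greets, yields control once, then says goodbye.
$makeTask = function (string $name) use (&$log): Fiber {
    return new Fiber(function () use ($name, &$log): void {
        $log[] = "Hello, $name!";
        Fiber::suspend();            // stand-in for Async\suspend()
        $log[] = "Goodbye, $name!";
    });
};

$queue = [$makeTask('World'), $makeTask('Universe')];

// Round-robin loop: start or resume each fiber, re-queue it until it terminates.
while ($queue !== []) {
    $fiber = array_shift($queue);
    if ($fiber->isStarted()) {
        $fiber->resume();
    } else {
        $fiber->start();
    }
    if (!$fiber->isTerminated()) {
        $queue[] = $fiber;
    }
}

echo implode("\n", $log), "\n";
```

Each task runs until its suspension point, so both greetings are logged before either goodbye.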
**Basic usage:**
Async\suspend();
''suspend()'' can be called from any function, including the **main execution flow**:
function example(string $name): void {
echo "Hello, $name!\n";
Async\suspend();
echo "Goodbye, $name!\n";
}
$coroutine = spawn(example(...), 'World');
// suspend the main flow
Async\suspend();
echo "Back to the main flow\n";
Expected output:
Hello, World!
Back to the main flow
Goodbye, World!
A call to ''suspend()'' can be a throw point if someone resumes the coroutine externally with an exception.
function example(string $name): void {
echo "Hello, $name!\n";
try {
suspend();
} catch (Exception $e) {
echo "Caught exception: ", $e->getMessage(), "\n";
}
echo "Goodbye, $name!\n";
}
$coroutine = spawn('example', 'World');
// pass control to the coroutine
suspend();
$coroutine->cancel();
**Expected output:**
Hello, World!
Caught exception: cancelled at ...
Goodbye, World!
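The same "exception at the suspension point" behaviour exists today in PHP 8.1 fibers, which may help when reasoning about the example above. The sketch below is an analogy under stated assumptions, not the RFC's mechanism: it uses ''Fiber::throw()'' with a plain ''Exception'' in place of ''$coroutine->cancel()'' and ''Async\CancellationException'', and the ''$log'' array is a hypothetical helper.

```php
<?php
$log = [];

$fiber = new Fiber(function () use (&$log): void {
    $log[] = 'Hello, World!';
    try {
        Fiber::suspend();                 // the suspension point
    } catch (Exception $e) {
        // The exception surfaces here, exactly where execution was paused.
        $log[] = 'Caught exception: ' . $e->getMessage();
    }
    $log[] = 'Goodbye, World!';
});

$fiber->start();                            // runs until the suspension point
$fiber->throw(new Exception('cancelled'));  // resumes the fiber by throwing into it

echo implode("\n", $log), "\n";
```

Because the fiber catches the exception, it continues past the ''try'' block and finishes normally.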
==== Awaitable interface ====
The ''Awaitable'' interface is a contract that allows objects to be used in the ''await'' expression.
The interface does not have any methods on the user-land side
and is intended for objects implemented as **PHP extensions**, such as:
- ''Future''
- ''Cancellation''
> **Note:** These classes are not part of this **RFC**.
The following classes from this **RFC** also implement this interface:
- ''Coroutine''
- ''CoroutineGroup''
Unlike ''Future'', the ''Awaitable'' interface does not impose limitations on the number of state changes,
which is why the ''Future'' contract is considered a special case of the ''Awaitable'' contract.
In the general case, objects implementing the ''Awaitable'' interface can act as triggers; that is,
they can change their state an unlimited number of times.
This means that multiple calls to ''await()'' may produce different results.
In contrast, ''Coroutine'', ''Future'' and ''Cancellation'' objects change their state only once,
so using them multiple times in an ''await'' expression will always yield the same result.
**Comparison of Different Awaitable Classes:**
| | Coroutine | CoroutineGroup |
| Supports multiple state changes | No | Yes |
| Multiple await returns same result | Yes | No |
| Can capture result | Yes | Yes |
| Can capture exception | Yes | Yes |
==== Await ====
Async\await(Async\Awaitable $awaitable, ?Async\Awaitable $cancellation = null): mixed
The ''await'' function is used to wait for the completion of another coroutine
or any object that implements the ''Awaitable'' interface:
function readFileContents(string $fileName): string
{
$result = file_get_contents($fileName);
if($result === false) {
throw new Exception("Error reading $fileName");
}
return $result;
}
$coroutine = spawn(readFileContents(...), 'file1.txt');
echo await($coroutine);
// or
echo await(spawn(readFileContents(...), 'file2.txt'));
''await'' suspends the execution of the current coroutine until
the awaited one returns a final result or completes with an exception.
function testException(): void {
throw new Exception("Error");
}
try {
await(spawn(testException(...)));
} catch (Exception $e) {
echo "Caught exception: ", $e->getMessage();
}
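The two outcomes of waiting, a returned result or a rethrown exception, can be sketched with PHP 8.1 fibers. This is an illustration under assumptions rather than how ''await()'' would be implemented: the hypothetical ''awaitFiber()'' helper drives the fiber itself in a loop, whereas the proposed ''await()'' suspends the calling coroutine and lets the ''Scheduler'' do the work.

```php
<?php
/**
 * Hypothetical helper: drives a fiber to completion and returns its result.
 * An exception that escapes the fiber propagates out of start()/resume(),
 * so the caller sees it at the point of waiting.
 */
function awaitFiber(Fiber $fiber): mixed
{
    $fiber->start();
    while (!$fiber->isTerminated()) {
        $fiber->resume();
    }
    return $fiber->getReturn();
}

// Case 1: the awaited task yields once, then returns a value.
$value = awaitFiber(new Fiber(function (): string {
    Fiber::suspend();
    return 'done';
}));

// Case 2: the awaited task fails; the exception reaches the waiting code.
$caught = null;
try {
    awaitFiber(new Fiber(function (): void {
        throw new Exception('Error');
    }));
} catch (Exception $e) {
    $caught = $e->getMessage();
}

echo $value, "\n", 'Caught exception: ', $caught, "\n";
```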
=== Await with cancellation ===
The ''await'' function can accept a second argument ''$cancellation'', which is an ''Awaitable'' object.
This object can be a ''Coroutine'', or another object that implements the ''Awaitable'' interface.
The ''$cancellation'' argument limits the waiting time for the first argument.
As soon as ''$cancellation'' is triggered, waiting is interrupted with an ''AwaitCancelledException''.
function readFileContents(string $fileName): string
{
$result = file_get_contents($fileName);
if($result === false) {
throw new Exception("Error reading $fileName");
}
return $result;
}
try {
// Wait for the coroutine to finish or for the cancellation to occur
echo await(spawn(readFileContents(...), 'file1.txt'), Async\timeout(2000));
} catch (Async\AwaitCancelledException $e) {
echo "Caught exception: ", $e->getMessage();
}
==== Edge Behavior ====
The use of ''spawn''/''await''/''suspend'' is allowed in almost any part of a PHP program.
This is possible because the PHP script entry point forms the **main execution thread**,
which is also considered a coroutine.
As a result, operations like ''suspend'' and ''currentCoroutine()'' will behave the same way as in other cases.
If only **one coroutine** exists in the system, calling ''suspend'' will immediately return control.
The ''register_shutdown_function'' handler operates in synchronous mode,
after asynchronous handlers have already been destroyed.
Therefore, the ''register_shutdown_function'' code should not use the concurrency API.
The ''suspend'' keyword will have no effect, and the ''spawn'' operation will not be executed at all.
==== Coroutine Scope ====
> **Coroutine Scope**: the space associated with coroutines created using the ''spawn'' expression.
=== Motivation ===
Sometimes it is necessary to gain control not only over a currently running coroutine,
but also over all coroutines that will be launched within a new one, without having direct access to them.
This could be the case for web server code that handles requests in separate coroutines
and does not know how many additional coroutines will be launched,
or a JobExecutor that wants to manage the lifecycle of running jobs.
Without such control, the application code loses the ability to resist runtime errors,
which increases the risk of a complete service failure.
This is why the **Coroutine Scope** pattern is of critical importance in the context of ensuring reliability.
The main use cases for ''Scope'' are:
1. Controlling the lifetime of coroutines created within a single scope (**Point of responsibility**)
2. Handling errors from all coroutines within the scope
3. Binding the lifetime of the scope's coroutines to the lifetime of a **PHP object**
4. Creating a hierarchy of scopes to manage coroutines in a structured way
Binding Scope to objects is a good practice that has proven effective in **Kotlin**.
By allowing coroutines to be tied to an object (this could be a ''Screen'' or a ''ViewModel''),
it is possible to avoid the error where coroutines outlive the object that manages them.
For frameworks, it can be useful to be able to control all coroutines created within a ''Scope'',
to apply context-dependent constraints to them.
=== Scope propagation ===
By default, all coroutines are associated with the **Global Coroutine Scope**:
spawn('file_get_contents', 'file1.txt'); // <- global scope
function readFileContents(string $file): string|false {
return file_get_contents($file); // <- global scope
}
function mainTask(): void { // <- global scope
spawn('readFileContents', 'file1.txt'); // <- global scope
}
spawn('mainTask'); // <- global scope
If an application never creates **custom Scopes**, its behavior is similar to coroutines in ''Go'':
* Coroutines are not explicitly linked to each other.
* The lifetime of coroutines is not limited.
The function ''spawnWith'' creates a **new coroutine** bound to the specified scope.
Scope is propagated between coroutines.
If a coroutine is launched within a specific Scope, that Scope is considered the current one.
Functions such as ''spawn()'' create a coroutine within the current Scope.
Coroutines created during the execution of this **new coroutine** will become **sibling tasks**:
use Async\Scope;
$scope = new Scope();
$scope->spawn(function() { // <- new scope
echo "Sibling task 1\n";
spawn(function() { // <- $scope is current scope
echo "Sibling task 2\n";
spawn(function() { // <- $scope is current scope
echo "Sibling task 3\n";
});
});
});
$scope->awaitCompletion(Async\signal(SIGTERM));
**Expected output:**
Sibling task 1
Sibling task 2
Sibling task 3
**Structure:**
main()                       ← defines a $scope
└── $scope = new Scope()
    ├── task1()              ← runs in the $scope
    ├── task2()              ← runs in the $scope
    └── task3()              ← runs in the $scope
Thus, the function ''spawnWith()'' or method ''Scope::spawn'' creates a new branch of sibling coroutines,
where the new coroutine exists at the same level as all subsequent ones.
The code that owns a ''Scope'' object becomes the **Point of responsibility**
for all coroutines executed within that Scope.
> A good practice is to ensure that a Scope object has **only ONE owner**.
> Passing ''$scope'' as a parameter to other functions or assigning it to multiple objects
> is a **potentially dangerous** operation that can lead to complex bugs.
> If you need to interact with the ''Scope'' in different parts of the program,
> use ''Async\ScopeProvider'' containers or other appropriate mechanisms.
=== Scope waiting ===
> **Warning:** In general, it is strongly discouraged to wait on a ''Scope''; instead, prefer using a ''CoroutineGroup''.
The ''Scope'' class does not implement the ''Awaitable'' interface,
and therefore cannot be used in an ''await'' expression.
Awaiting a ''Scope'' is a potentially **dangerous operation** that should be performed consciously, not accidentally.
There are several use cases where waiting for a ''Scope'' might be necessary:
* Structured concurrency: when a parent awaits the completion of all child coroutines.
* Waiting for Scope tasks to complete the cancellation process.
The structured concurrency pattern with waiting for all child coroutines can be useful
for applications whose lifetime is explicitly limited by external conditions.
For example, the user might stop a console application.
To support awaiting tasks in a controlled manner, ''Scope'' provides two specific methods:
- ''public function awaitCompletion(Awaitable $cancellation): void {}''
- ''public function awaitAfterCancellation(?callable $errorHandler = null, ?Awaitable $cancellation = null): void {}''
The ''awaitCompletion'' method blocks the execution flow until all tasks within the scope are completed.
The ''awaitAfterCancellation'' method does the same but is intended to be called only after the scope has been cancelled.
use Async\Scope;
$scope = new Scope();
$scope->spawn(function() {
echo "Sibling task 1\n";
spawn(function() {
echo "Sibling task 2\n";
spawn(function() {
echo "Sibling task 3\n";
});
});
});
$scope->awaitCompletion(Async\signal(SIGTERM));
**Expected output:**
Sibling task 1
Sibling task 2
Sibling task 3
The ''Scope'' awaiting methods do not capture any task results,
so they cannot be used to await return values.
> ℹ️ **Note:** If you need to retrieve the result of a group of tasks, use the ''Async\CoroutineGroup'' class.
The ''awaitCompletion'' method can only be used with an explicitly defined cancellation token.
This requirement helps prevent indefinite waiting.
Awaiting the ''$scope'' object also allows handling exceptions from coroutines within the ''$scope'':
use Async\Scope;
$scope = new Scope();
$scope->spawn(function() {
spawn(function() {
spawn(function() {
throw new Exception("Error occurred");
});
});
});
try {
$scope->awaitCompletion(Async\signal(SIGTERM));
} catch (Exception $exception) {
echo $exception->getMessage()."\n";
}
**Expected output:**
Error occurred
Calling the ''awaitCompletion()'' method after the ''Scope'' has been cancelled
will immediately throw a cancellation exception.
$scope = new Scope();
try {
$scope->spawn(task1(...));
$scope->spawn(task2(...));
$scope->cancel();
// Wait all tasks
$scope->awaitCompletion(Async\signal(SIGTERM));
} catch (Exception $exception) {
echo "Caught exception: ",$exception->getMessage()."\n";
}
**Expected output:**
Caught exception: cancelled at ...
In this example, ''$scope->awaitCompletion(Async\signal(SIGTERM));'' will immediately throw an exception.
If you need to wait for the ''Scope'' to complete after it has been cancelled,
use the special method ''awaitAfterCancellation'', which is designed for this case.
$scope = new Scope();
spawn(function() use ($scope) {
try {
$scope->awaitCompletion(Async\signal(SIGTERM));
} catch (\Async\CancellationException $exception) {
$scope->awaitAfterCancellation();
echo "Caught exception: ",$exception->getMessage()."\n";
}
});
$scope->spawn(function() use ($scope) {
$scope->cancel();
try {
Async\delay(1000);
} finally {
sleep(1);
echo "Finally\n";
}
});
**Expected output:**
Finally
Caught exception: cancelled at ...
In this example, the line ''Finally'' will be printed first because ''$scope->awaitAfterCancellation()''
waits for all coroutines inside the Scope to complete.
The ''awaitAfterCancellation'' method is used in scenarios where final resource cleanup is required
after all child tasks are guaranteed to have finished execution.
There is also a risk of indefinite waiting, so it is **recommended** to explicitly specify a timeout.
=== Scope Hierarchy ===
A hierarchy can be a convenient way to describe an application as a set of dependent tasks:
* Parent tasks are connected to child tasks and are responsible for their execution time.
* Tasks on the same hierarchy level are independent of each other.
* Parent tasks should control their child tasks.
* Child tasks MUST NOT control or wait for their parent tasks.
* It is correct if tasks at the same hierarchy level are only connected to tasks of the immediate child level.
WebServer
├── Request Worker
│   ├── request1 task
│   │   ├── request1 subtask A
│   │   └── request1 subtask B
│   └── request2 task
│       ├── request2 subtask A
│       └── request2 subtask B
The work of a web server can be represented as a hierarchy of task groups that are interconnected.
The ''Request Worker'' is a task responsible for handling incoming requests. There can be multiple requests.
Each request may spawn subtasks. On the same level, all requests form a group of request-tasks.
''Scope'' is well suited to implementing this concept:
WebServer
├── Request Worker
│   ├── request1 Scope
│   │   ├── request1 subtask A
│   │   │   └── subtask A Scope
│   │   │       ├── sub-subtask A1
│   │   │       └── sub-subtask A2
│   │   └── request1 subtask B
│   └── request2 Scope
│       ├── request2 subtask A
│       └── request2 subtask B
│           └── subtask B Scope
│               └── sub-subtask B1
A new child ''Scope'' can be created using a special constructor:
''Scope::inherit()''.
It returns a new ''Scope'' object that acts as a child.
A coroutine created within the child ''Scope'' can also be considered
a child relative to the coroutines in the parent ''Scope''.
**An example:**
use Async\Scope;
use Async\CancellationException;
function connectionChecker($socket, callable $cancelToken): void
{
while (true) {
if(feof($socket)) {
$cancelToken("The connection was closed by user");
return;
}
Async\delay(1000); // throw CancellationException if canceled
}
}
function connectionLimiter(callable $cancelToken): void
{
Async\delay(10000);
$cancelToken("The request processing limit has been reached.");
}
function connectionHandler($socket): void
{
// Note that a parent Scope can stop the execution of all coroutines
// belonging to a child Scope at any moment.
$scope = Scope::inherit();
//
// Passing ''$scope'' via ''use'' into a single coroutine is equivalent to the logic:
// the lifetime of ''$scope'' equals the lifetime of the coroutine.
// In this way, we create a coroutine-closure that acts as a Point of Responsibility.
// This code is one example of how to implement Points of Responsibility.
//
$scope->spawn(function() use ($socket, $scope) {
$limiterScope = Scope::inherit(); // child scope for connectionLimiter and connectionChecker
// We do not provide direct access to the Scope object in other functions, because this is an antipattern!
$cancelToken = fn(string $message) => $scope->cancel(new CancellationException($message));
// Limiter coroutine
$limiterScope->spawn(connectionLimiter(...), $cancelToken);
// A separate coroutine checks that the socket is still active.
$limiterScope->spawn(connectionChecker(...), $socket, $cancelToken);
try {
sendResponse($socket, dispatchRequest(parseRequest($socket)));
} catch (\Exception $exception) {
fwrite($socket, "HTTP/1.1 500 Internal Server Error\r\n\r\n");
} finally {
fclose($socket);
// Explicitly cancel all coroutines in the child scope
$scope->cancel();
}
});
}
function socketServer(): void
{
// Main Server $scope
$scope = new Scope();
// Child coroutine that listens for a shutdown signal
//
// Note that we are passing ''$scope'' to another function!
// This is acceptable here because the code is within a single visual block,
// and the risk of an error due to oversight is minimal.
$scope->spawn(function() use ($scope) {
try {
// Note: The ''signal'' function is not part of this RFC,
// but it may be implemented in the standard library in the future.
// This example shows how such a function could be used.
// The ''signal'' function returns a trigger ''Awaitable''.
await(Async\signal(SIGINT));
} finally {
$scope->cancel(new CancellationException('Server shutdown'));
}
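});
// Listening socket. The address is illustrative only.
$serverSocket = stream_socket_server('tcp://127.0.0.1:8080');
try {
// The main coroutine that listens for incoming connections
// The server runs as long as this coroutine is running.
await($scope->spawn(function() use ($serverSocket) {
while ($socket = stream_socket_accept($serverSocket, 0)) {
connectionHandler($socket);
}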
}));
} catch (\Throwable $exception) {
echo "Server error: ", $exception->getMessage(), "\n";
} finally {
echo "Server should be stopped...\n";
// Graceful exit
try {
$scope->cancel();
// Await for all coroutines to finish but not more than 5 seconds
$scope->awaitAfterCancellation(cancellation: Async\timeout(5000));
echo "Server stopped\n";
} catch (\Throwable $exception) {
// Force exit
echo "Server error: ", $exception->getMessage(), "\n";
throw $exception;
}
}
}
Let's examine how this example works.
1. ''socketServer'' creates a new ''Scope'' for coroutines that will handle all connections.
2. Each new connection is processed using ''connectionHandler()'' in a separate ''Scope'',
which is inherited from the main one.
3. ''connectionHandler'' creates a new ''Scope'' for the ''connectionLimiter'' and ''connectionChecker'' coroutines.
4. ''connectionHandler'' creates a coroutine, ''connectionLimiter()'', to limit the processing time of the request.
5. ''connectionHandler'' creates another coroutine, ''connectionChecker()'', to monitor the connection's activity.
As soon as the client disconnects, ''connectionChecker'' will cancel all coroutines related to the request.
6. If the main ''Scope'' is closed, all coroutines handling requests will also be canceled.
GLOBAL <- globalScope
│
├── socketListen (Scope) <- rootScope
│   │
│   ├── connectionHandler (Scope) <- request scope1
│   │   ├── connectionLimiter (Coroutine) <- $limiterScope
│   │   └── connectionChecker (Coroutine) <- $limiterScope
│   │
│   ├── connectionHandler (Scope) <- request scope2
│   │   ├── connectionLimiter (Coroutine) <- $limiterScope
│   │   └── connectionChecker (Coroutine) <- $limiterScope
│   │
The ''connectionHandler'' doesn't worry if the lifetimes of the ''connectionLimiter'' or ''connectionChecker''
coroutines exceed the lifetime of the main coroutine handling the request,
because it is guaranteed to call ''$scope->cancel()'' when the main coroutine finishes.
''$limiterScope'' is used to explicitly define a child-group of coroutines
that should be cancelled when the request is completed. This approach minimizes errors.
On the other hand, if the server receives a shutdown signal,
all child ''Scopes'' will be cancelled because the main ''Scope'' will be cancelled as well.
Note that the coroutine waiting on ''await Async\signal(SIGINT)'' will not remain hanging in memory
if the server shuts down in another way, because ''$scope'' will be explicitly closed in the ''finally'' block.
=== Scope cancellation ===
The ''cancel'' method cancels all child coroutines and all child ''Scopes'' of the current ''Scope'':
use Async\Scope;
$scope = new Scope();
$scope->spawn(function() {
spawn(function() {
Async\delay(1000);
echo "Task 1\n";
});
spawn(function() {
Async\delay(2000);
echo "Task 2\n";
});
});
$scope->cancel();
**Expected output:** none, because both tasks are cancelled before either delay elapses.
=== Scope disposal ===
**Coroutine Scope** has several resource cleanup strategies
that can be triggered either explicitly, on demand,
or implicitly when the ''Scope'' object loses its last reference.
There are three available strategies for ''Scope'' termination:
| **Method** | **Behavior** |
| ''disposeSafely'' | Marks unfinished coroutines as zombies; does not cancel them |
| ''dispose'' | Cancels coroutines and issues a warning |
| ''disposeAfterTimeout'' | Issues a warning, then cancels coroutines after a delay |
The main goal of all three methods is to terminate the execution of coroutines
that belong to the ''Scope'' or its child Scopes.
However, each method approaches this task slightly differently.
The ''disposeSafely'' method is used by default in the destructor of the ''Async\Scope'' class.
Its key feature is transitioning coroutines into a **zombie coroutine** state.
A **zombie coroutine** continues execution but is tracked by the system differently than regular coroutines.
(See section: [[#zombie_coroutine_policy|Zombie coroutine policy]]).
A warning is issued when a **zombie coroutine** is detected.
use Async\Scope;
$scope = new Scope();
await($scope->spawn(function() {
spawn(function() {
Async\delay(1000);
echo "Task 1\n";
});
spawn(function() {
Async\delay(2000);
echo "Task 2\n";
});
echo "Root task\n";
}));
$scope->disposeSafely();
**Expected output:**
Root task
Warning: Coroutine is zombie at ... in Scope disposed at ...
Warning: Coroutine is zombie at ... in Scope disposed at ...
Task 1
Task 2
The ''$scope'' variable is released immediately after the coroutine ''Root task'' completes execution,
so the child coroutine ''Task 1'' does not have time to execute
before the ''disposeSafely'' method is called.
''disposeSafely'' detects this and signals it with a warning but allows the coroutine to complete.
The ''Scope::dispose'' method differs from ''Scope::disposeSafely'' in that it does not leave **zombie coroutines**.
It cancels **all coroutines**.
When coroutines are detected as unfinished, a warning is issued.
**Example:**
use Async\Scope;
$scope = new Scope();
await($scope->spawn(function() {
spawn(function() {
Async\delay(1000);
echo "Task 1\n";
});
spawn(function() {
Async\delay(2000);
echo "Task 2\n";
});
echo "Root task\n";
}));
$scope->dispose();
**Expected output:**
Warning: Coroutine is zombie at ... in Scope disposed at ...
Warning: Coroutine is zombie at ... in Scope disposed at ...
Warning: Coroutine is zombie at ... in Scope disposed at ...
The ''disposeAfterTimeout'' method is a delayed version of the ''disposeSafely'' method.
The ''$timeout'' parameter must be greater than zero but less than 10 minutes.
use Async\Scope;
class Service
{
private Scope $scope;
public function __construct() {
$this->scope = new Scope();
}
public function __destruct() {
$this->scope->disposeAfterTimeout(5000);
}
public function run(): void {
$this->scope->spawn(function() {
spawn(function() {
Async\delay(1000);
echo "Task 2\n";
Async\delay(5000);
echo "Task 2 next line never executed\n";
});
echo "Task 1\n";
});
}
}
$service = new Service();
$service->run();
sleep(1);
unset($service);
**Expected output:**
Task 1
Warning: Coroutine is zombie at ... in Scope disposed at ...
Task 2
The ''dispose*()'' methods can be called multiple times, which is not considered an error.
If the ''Scope::cancel()'' method is called with a parameter after the ''Scope'' has already been cancelled,
**PHP** will emit a warning indicating that the call will be ignored.
=== Scope cancellation/disposal order ===
If a ''Scope'' has child ''Scopes'', the coroutines in the child ''Scopes'' will be canceled or disposed first,
followed by those in the parent, from the bottom up in the hierarchy.
This approach increases the likelihood that resources will be released correctly.
However, it does not guarantee this,
since the exact order of coroutines in the execution queue cannot be determined with 100% certainty.
During the release of child ''Scopes'',
the same cleanup strategy is used that was applied to the parent ''Scope''.
If the ''disposeSafely'' method is called, the child Scopes will also be released using the ''disposeSafely'' strategy.
If the ''dispose'' method is used, the child Scopes will use the same method for cleanup.
The ''disposeAfterTimeout'' method will delay the execution of ''disposeSafely'' for the specified time.
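The bottom-up order and the inherited cleanup strategy can be illustrated with a small toy model in plain PHP. This is only a sketch: ''ToyScope'' and its ''release()'' method are hypothetical stand-ins invented for illustration, not the proposed ''Async\Scope'' class, and the sketch models only the traversal order, not coroutine scheduling.

```php
<?php
// Toy model only: ToyScope is a hypothetical stand-in, not Async\Scope.
final class ToyScope
{
    /** @var ToyScope[] */
    private array $children = [];

    public function __construct(public readonly string $name) {}

    // Create a child scope, loosely mirroring Scope::inherit().
    public function inherit(string $name): ToyScope
    {
        $child = new ToyScope($name);
        $this->children[] = $child;
        return $child;
    }

    // Release the children first, then this scope, reusing the same strategy.
    public function release(string $strategy, array &$log): void
    {
        foreach ($this->children as $child) {
            $child->release($strategy, $log);
        }
        $log[] = "{$strategy}: {$this->name}";
    }
}

$server  = new ToyScope('server');
$request = $server->inherit('request');
$request->inherit('limiter');

$log = [];
$server->release('disposeSafely', $log);
echo implode("\n", $log), "\n";
// Prints:
// disposeSafely: limiter
// disposeSafely: request
// disposeSafely: server
```

The deepest scope is released first and the root last, and every level receives the strategy that was requested on the root.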
=== Spawn with disposed scope ===
When the ''cancel()'' or ''dispose()'' method is called, the ''Scope'' is marked as closed.
Attempting to launch a coroutine with this Scope will result in a fatal exception.
$scope = new Scope();
$scope->spawn(function() {
echo "Task 1\n";
});
$scope->cancel();
$scope->spawn(function() { // <- AsyncException: Coroutine scope is closed
echo "Task 2\n";
});
==== Error detection ====
Detecting erroneous situations when using coroutines is an important part of analyzing an application's reliability.
The following scenarios are considered potentially erroneous:
1. A coroutine belongs to a global scope and is not awaited by anyone (a **zombie coroutine**).
2. The root scope has been destroyed (its destructor was called), but no one awaited
it or ensured that its resources were explicitly cleaned up (e.g., by calling ''$scope->cancel()'' or ''$scope->dispose()'').
3. Tasks were not cancelled using the ''cancel()'' method, but through a call to ''dispose()''.
This indicates that the programmer did not intend to cancel the execution of the coroutine,
yet it happened because the scope was destroyed.
4. An attempt to await a coroutine from within itself.
5. Awaiting ''$scope'' from within itself or from one of its child scopes.
6. Stuck tasks in the cancellation state.
7. Using a ''CoroutineGroup'' with result capturing enabled without ever awaiting it.
8. Deadlocks caused by circular dependencies between coroutines.
**PHP** will respond to such situations by issuing **warnings**, including debug information about the involved coroutines.
Developers are expected to write code in a way that avoids triggering these warnings.
An attempt to use the expression ''await($coroutine)'' from within the same coroutine throws an exception.
$coroutine = null;
$coroutine = spawn(function() use (&$coroutine) {
await($coroutine); // <- AsyncException: A coroutine cannot await itself. Coroutine spawned at ...
});
Calling ''Scope::awaitCompletion()'' from a coroutine that belongs to the same ''$scope'' or
to one of its child scopes will throw a fatal exception.
This condition makes it impossible to perform the ''$globalScope->awaitCompletion'' call.
$scope = new Scope();
$scope->spawn(function() use ($scope) {
$scope->awaitCompletion(Async\timeout(1000)); // <- AsyncException: Awaiting a scope from within itself or
// its child scope would cause a deadlock. Scope created at ...
});
=== Error mitigation strategies ===
The only way to create **zombie coroutines** is by using the ''spawn'' expression in the ''globalScope''.
However, if the initial code explicitly creates a scope and treats it as the application's entry point,
the initializing code gains full control, because ''spawn'' will no longer
be able to create a coroutine in ''globalScope'', thus preventing the application from hanging beyond the entry point.
There's still a way to use global variables and ''new Scope'' to launch a coroutine that runs unchecked:
$GLOBALS['my'] = new Scope();
$GLOBALS['my']->spawn(function() { ... });
But such code can't be considered an accidental mistake.
To avoid accidentally hanging coroutines whose lifetimes were not correctly limited, follow these rules:
* Use **separate Scopes** for different coroutines. This is the best practice,
as it allows explicitly defining lifetime dependencies between Scopes.
* Use ''Scope::dispose()''. The ''dispose()'' method cancels coroutine execution and issues a warning.
* Don't mix semantically different coroutines within the same ''Scope''.
* Avoid building hierarchies between ''Scopes'' with complex interdependencies.
* Do not use cyclic dependencies between ''Scopes''.
* The principle of single point of responsibility and ''Scope'' ownership.
Do not pass the ''Scope'' object to different coroutine functions (unless the action happens in a closure).
Do not store ''Scope'' objects in different places.
Violating this rule can lead to manipulations with ''Scope'',
which may cause a deadlock or disrupt the application's logic.
* Child coroutines should not wait for their parents.
Child Scopes should not wait for their parents.
namespace ProcessPool;
use Async\Scope;
use Async\CoroutineGroup;
final class ProcessPool
{
private Scope $watcherScope;
private Scope $jobsScope;
private CoroutineGroup $pool;
/**
* List of pipes for each process.
* @var array
*/
private array $pipes = [];
/**
* Map of process descriptors: pid => bool
* If the value is true, the process is free.
* @var array
*/
private array $descriptors = [];
public function __construct(readonly public string $entryPoint, readonly public int $max, readonly public int $min)
{
// Define the coroutine scopes for the pool, watcher, and jobs
$this->watcherScope = new Scope();
$this->jobsScope = new Scope();
$this->pool = new CoroutineGroup(captureResults: false);
}
public function __destruct()
{
$this->watcherScope->dispose();
$this->pool->dispose();
$this->jobsScope->dispose();
}
public function start(): void
{
$this->watcherScope->spawn($this->processWatcher(...));
for ($i = 0; $i < $this->min; $i++) {
$this->pool->spawn($this->startProcess(...));
}
}
public function stop(): void
{
$this->watcherScope->cancel();
$this->pool->cancel();
$this->jobsScope->cancel();
}
private function processWatcher(): void
{
while (true) {
try {
await($this->pool);
} catch (StopProcessException $exception) {
echo "Process was stopped with message: {$exception->getMessage()}\n";
if($exception->getCode() !== 0 || count($this->descriptors) < $this->min) {
$this->pool->spawn($this->startProcess(...));
}
}
}
}
}
The example above demonstrates how splitting coroutines into
Scopes helps manage their interaction and reduces the likelihood of errors.
Here, the coroutine in ''watcherScope'' monitors the tasks in ''pool''.
When a process finishes, the watcher detects this event and starts a new process if necessary.
The monitoring logic is completely separated from the process startup logic.
The lifetime of ''watcherScope'' matches that of ''pool'', but is no longer than the lifetime of the watcher itself.
The overall lifetime of all coroutines in the ''ProcessPool'' is determined by the lifetime of the ''ProcessPool''
object or by the moment the ''stop()'' method is explicitly called.
==== Zombie coroutine policy ====
Coroutines whose lifetime extends beyond the boundaries of their parent ''Scope''
are handled according to a separate **policy**.
This policy aims to strike a balance between uncontrolled resource leaks and the need to abruptly
terminate coroutines, which could lead to data integrity violations.
If there are no active coroutines left in the execution queue and no events to wait for,
the application is considered complete.
**Zombie coroutines** differ from regular ones in that they are not counted as active.
Once the application is considered finished,
zombie coroutines are given a time limit within which they must complete execution.
If this limit is exceeded, all zombie coroutines are canceled.
The delay time for handling zombie coroutines can be configured using
a directive in the ''php.ini'' file: ''async.zombie_coroutine_timeout'', which is set to two seconds by default.
If a coroutine is created within a user-defined ''Scope'', the programmer
can set a custom timeout for that specific ''Scope'' using the ''Scope::disposeAfterTimeout(int $ms)'' method.
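The grace-period rule can be sketched in plain PHP. The helper ''settleZombies()'' is hypothetical and exists only for illustration; it is not part of the proposed API and does not schedule anything. It only shows which zombie coroutines would be allowed to finish and which would be cancelled, assuming each one is described by the time it still needs once the application is considered complete.

```php
<?php
// Toy model only: settleZombies() is a hypothetical helper, not part of the RFC.
// $remainingMs maps a zombie coroutine label to the time it still needs (ms).
function settleZombies(array $remainingMs, int $timeoutMs): array
{
    $outcome = [];
    foreach ($remainingMs as $label => $needed) {
        // A zombie that finishes within the time limit completes normally;
        // one that would exceed the limit is cancelled.
        $outcome[$label] = $needed <= $timeoutMs ? 'completed' : 'cancelled';
    }
    return $outcome;
}

// 2000 ms mirrors the two-second default mentioned above.
$result = settleZombies(['logger' => 500, 'uploader' => 3500], 2000);
print_r($result);
```

Here ''logger'' finishes inside the limit, while ''uploader'' would exceed it and is cancelled.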
==== CoroutineGroup ====
> ''Async\CoroutineGroup'' is a container for controlling a group of coroutines.
function mergeFiles(string ...$files): string
{
$coroutineGroup = new Async\CoroutineGroup(captureResults: true);
foreach ($files as $file) {
$coroutineGroup->spawn(file_get_contents(...), $file);
}
return implode("\n", await($coroutineGroup));
}
echo await(spawn(mergeFiles(...), 'file1.txt', 'file2.txt', 'file3.txt'));
=== Motivation ===
''CoroutineGroup'' is an explicit and safe way to manage a group of tasks.
Using the ''Scope'' class and the ''spawn'' expression, you can create groups of coroutines.
However, the code that creates ''$scope'' and/or awaits it might not be aware of which coroutines will be added.
Moreover, the wait strategy of ''$scope'' can lead to resource leaks
if a programmer mistakenly uses the ''spawn'' expression
and adds a coroutine to the ''Scope'' that lives indefinitely.
The ''CoroutineGroup'' class is an explicit pattern for managing a group of coroutines.
Unlike ''Scope'', tasks cannot be added to it accidentally.
In a ''CoroutineGroup'', a task can only be added **explicitly**, using the ''spawn with'' expression.
A ''CoroutineGroup'' is not propagated through the execution context by child coroutines.
And unlike ''Scope'', ''CoroutineGroup'' can capture the results of tasks, which makes it convenient for awaiting results.
If ''Scope'' is used to create a shared space for coroutines,
then ''CoroutineGroup'' is intended for explicit control over child tasks.
In this role, ''CoroutineGroup'' serves as a complement to the logic of ''Scope''.
=== CoroutineGroup usage ===
The ''CoroutineGroup'' constructor accepts several parameters:
1. ''$scope'': the ''Scope'' in which the tasks will be executed.
If this parameter is not provided, a new, separate ''Scope'' will be created.
2. ''$captureResults'': an option to capture the results of the tasks.
3. ''$bounded'': an option to dispose of all tasks in the ''Scope'' (via ''Scope::dispose'') when the ''CoroutineGroup'' is disposed.
Once a ''$coroutineGroup'' is created, it can be used in a ''spawn with $coroutineGroup'' expression,
which has an additional effect for task groups: a coroutine is created within the ''CoroutineGroup'''s ''$scope'',
and the coroutine is added to the task group.
A ''CoroutineGroup'' holds a reference to the ''Scope'' in which the tasks will be executed.
If this is the only reference to the ''Scope'', the ''CoroutineGroup'' will automatically call ''Scope::dispose()''
as soon as the ''CoroutineGroup::dispose'' or ''CoroutineGroup::cancel'' method is invoked.
The expression ''spawn with $coroutineGroup'' creates a coroutine in the ''$scope'' specified in the ''CoroutineGroup'',
and additionally adds the task to the group. If child coroutines create other coroutines using the ''spawn'' expression,
they will be added to the ''CoroutineGroup'''s scope, but not to the task group itself.
use Async\Scope;
use Async\CoroutineGroup;
function task1() {
spawn('subtask');
}
$scope = new Scope();
$coroutineGroup1 = new CoroutineGroup($scope);
$coroutineGroup2 = new CoroutineGroup($scope);
$coroutineGroup1->spawn(task1(...));
$coroutineGroup2->spawn(task2(...));
$scope->awaitCompletion(Async\signal(SIGTERM));
**Structure:**
main() ← defines a $scope
├── $scope = new Scope()
│   ├── task1() ← runs in the $scope
│   ├── subtask() ← runs in the $scope
│   └── task2() ← runs in the $scope
├── $coroutineGroup1 = new CoroutineGroup($scope)
│   └── task1() ← runs in the $scope
└── $coroutineGroup2 = new CoroutineGroup($scope)
    └── task2() ← runs in the $scope
The tasks ''task1()'' and ''task2()'' belong to different groups but are in the same ''Scope''.
The coroutine ''subtask()'', which was launched from ''task1()'', does not belong to any group.
If ''$scope'' is disposed, all task groups will be cancelled.
However, cancelling a task group will not cancel tasks in the ''Scope''
if the reference count to ''$scope'' is greater than one.
=== Await CoroutineGroup ===
The ''CoroutineGroup'' class implements the ''Awaitable'' interface,
so it can be used with the ''await'' expression.
The ''await($coroutineGroup)'' expression captures both the results of execution
and any exceptions that occur in the tasks.
If the constructor option ''captureResults: true'' is specified,
then the ''await($coroutineGroup)'' expression will return the results of all tasks that were added to the group.
If the results are no longer needed, the ''CoroutineGroup::disposeResults()'' method should be used to discard them.
function processInBatches(array $files, int $limit): array
{
$allResults = [];
$coroutineGroup = new Async\CoroutineGroup(captureResults: true);
$count = 0;
foreach ($files as $file) {
$coroutineGroup->spawn(file_get_contents(...), $file);
if (++$count >= $limit) {
$allResults = [...$allResults, ...await($coroutineGroup)];
$coroutineGroup->disposeResults();
$count = 0;
}
}
$allResults = [...$allResults, ...await($coroutineGroup)];
return $allResults;
}
$results = await(spawn(processInBatches(...), ['file1.txt', 'file2.txt', 'file3.txt', 'file4.txt'], limit: 2));
echo implode("\n", $results);
The ''$coroutineGroup'' object can be used in an ''await'' expression multiple times.
If the ''captureResults'' mode is not enabled, the ''await'' expression will always return ''NULL''.
> Be careful when capturing coroutine results, as this may cause memory leaks or keep large amounts of data in memory.
> Plan the waiting process wisely, and use the ''CoroutineGroup::disposeResults()'' method.
=== CoroutineGroup ''dispose'' ===
When a ''CoroutineGroup'' is disposed,
all tasks belonging to it will be cancelled using ''cancel'',
without issuing any warnings. No tasks will turn into **zombie coroutines**.
This behavior is consistent with ''Scope::cancel()''.
The reason for this behavior lies in the fact that ''CoroutineGroup'' only keeps track of explicitly added tasks.
If a task group is being disposed, it means the user clearly understands
that all coroutines launched within it should also be terminated.
=== CoroutineGroup and Scope ===
''CoroutineGroup'' is designed to complement the behavior of ''Scope'' where needed.
Although a single ''Scope'' can have multiple ''CoroutineGroup''s, in most cases
it is reasonable to create a ''CoroutineGroup'' along with a unique ''Scope'' that belongs only to it.
This leads to clear and memorable behavior: **the lifetime of a ''CoroutineGroup'' equals the lifetime of its ''Scope''**.
If this rule is followed, then an exception in a coroutine will lead to the cancellation of the ''Scope'',
which is required before the cancellation operation, and will trigger ''CoroutineGroup::dispose''.
On the other hand, releasing a ''CoroutineGroup'' object automatically leads to the disposal of its ''Scope''.
This architecture helps reduce the likelihood of resource leakage errors.
The ''CoroutineGroup'' class allows for implementing a pattern in which tasks are divided into two groups:
* explicit (or target) tasks, whose results are needed
* implicit (or secondary) tasks, which are created within explicit tasks and whose results are not needed.
**Explicit tasks** belong directly to the ''CoroutineGroup'' and are created using the ''spawn with $coroutineGroup'' expression.
All other tasks are considered as **secondary**.
This separation helps produce code that manages resources more efficiently
than code that waits for the completion of all child coroutines within a ''Scope''.
The following code demonstrates this idea:
use Async\Scope;
use Async\CoroutineGroup;
function targetTask(int $i): void
{
spawn(function() {
// subtask should be added to the same scope
});
}
$coroutineGroup = new CoroutineGroup(scope: new Scope(), captureResults: true);
for($i = 0; $i < 10; $i++) {
$coroutineGroup->spawn(targetTask(...), $i);
}
// wait for only the tasks that were added to the CoroutineGroup
$results = await($coroutineGroup);
The expression ''await($coroutineGroup)'' will wait only for the completion of the target tasks
that were explicitly added to the ''CoroutineGroup''.
The result of ''await($coroutineGroup)'' will include the outcomes of all coroutines from the ''CoroutineGroup'',
but not of other **implicit** coroutines that were created during the execution of ''targetTask()''.
Once ''$coroutineGroup'' is destroyed, the ''Scope'' it references will also be disposed of,
which means all other **implicit** coroutines will be cancelled using one of
the three strategies (see the corresponding section).
The reverse is also true: if the ''Scope'' is disposed, the associated ''CoroutineGroup''s will be disposed as well.
$scope = new Async\Scope();
$coroutineGroup = new Async\CoroutineGroup(scope: $scope, captureResults: false);
$coroutineGroup->spawn(function() {
// this task will be added to the task group
Async\delay(1000);
echo "This line will be executed\n";
});
sleep(1);
$scope->dispose();
**Expected output:**
There are no warnings about **zombie coroutines** in the output
because the task was canceled using ''$coroutineGroup->dispose()''.
However, if the ''Scope'' contains other coroutines that were created outside the ''CoroutineGroup'',
they will follow the general rules. In the case of the ''Scope::disposeSafely()'' strategy,
a warning will be issued if unfinished tasks are detected, as they would become **zombie coroutines**.
=== CoroutineGroup Race ===
The ''CoroutineGroup'' class allows you to wait for the first task to complete using the ''race()'' method.
use Async\CoroutineGroup;
function fetchFirstSuccessful(string ...$apiHosts): string
{
$coroutineGroup = new Async\CoroutineGroup(captureResults: false);
foreach ($apiHosts as $host) {
$coroutineGroup->spawn(function() use ($host) {
$response = file_get_contents($host);
if($response === false) {
throw new Exception("Failed to fetch data from $host");
}
return $response;
});
}
return await($coroutineGroup->race(ignoreErrors: true));
}
The ''CoroutineGroup::race()'' method returns an ''Awaitable'' trigger
that can be used multiple times to obtain the first completed task.
The ''race()'' trigger clears the internal result storage after completion,
so you won't be able to retrieve the same result twice.
If you need to get the first available result, use the ''firstResult()'' method.
The ''CoroutineGroup::firstResult()'' trigger returns the first available result.
Even if it is called repeatedly, the result
will remain the same until the ''CoroutineGroup::disposeResults()'' method discards the previous values.
The ''ignoreErrors'' parameter specifies the error ignoring mode.
If it is set to ''true'', exceptions from tasks will be ignored, and the ''race()''/''firstResult()''
triggers will return the first successful task.
The ''CoroutineGroup::getErrors()'' method will return an array of exceptions.
=== CoroutineGroup hierarchy ===
You can combine ''CoroutineGroup'' with ''Scope::inherit()'' to create a task group within a child ''Scope'',
thereby forming a hierarchy between groups:
use Async\CoroutineGroup;
use Async\Scope;
$coroutineGroupParent = new CoroutineGroup(captureResults: false);
$coroutineGroupParent->spawn(function() {
$coroutineGroupChild = new CoroutineGroup(Scope::inherit(), captureResults: false);
$coroutineGroupChild->spawn(function() {
// this task will be added to the child task group
});
// wait for the child task group to finish
await($coroutineGroupChild);
});
await($coroutineGroupParent);
**Structure:**
main()
└── $coroutineGroupParent = new CoroutineGroup() <- parent task group scope
    └── $coroutineGroupChild = new CoroutineGroup(Scope::inherit()) <- child task group scope
Since each ''CoroutineGroup'' is associated with its own ''Scope'',
and ''Scope'' instances are connected through parent-child relationships,
cancelling a parent ''CoroutineGroup'' will automatically cancel the entire hierarchy.
=== CoroutineGroup cancellation ===
The ''CoroutineGroup'' class allows you to cancel all tasks in the group using the ''CoroutineGroup::cancel()'' method.
This method behaves the same way as ''CoroutineGroup::dispose'',
with the only difference being that it allows you to pass a specific exception.
use Async\CoroutineGroup;
$coroutineGroup = new Async\CoroutineGroup(captureResults: false);
$coroutineGroup->spawn(function() {
try {
suspend();
} catch (Throwable $throwable) {
echo "Task was cancelled: ", $throwable->getMessage(), "\n";
}
});
// pass control to the task
suspend();
$coroutineGroup->cancel(new \Async\CancellationException('Custom cancellation message'));
**Expected output:**
Task was cancelled: Custom cancellation message
=== CoroutineGroup error handling ===
''CoroutineGroup'' does not introduce additional logic for handling coroutine exceptions.
When a developer uses the expression ''await($coroutineGroup)'', they are capturing the exceptions
of all tasks contained within ''$coroutineGroup''.
In other words, ''await($coroutineGroup)'' is equivalent to simultaneously using ''await($coroutine)'' for each task.
If no one awaits ''$coroutineGroup'', the exception handling follows the general ''Flow'',
and the error will propagate to the ''Scope''.
An additional method ''CoroutineGroup::all(bool $ignoreErrors = false, $nullOnFail = false): Awaitable {}'' provides a trigger
that fires when all tasks in the group have completed.
At the same time, it captures any errors, which can be retrieved using ''CoroutineGroup::getErrors()''.
// Returns an array of all tasks with their results ignoring errors
return $coroutineGroup->all(ignoreErrors: true);
The trigger ''CoroutineGroup::all()'' returns an array of results with numeric indices,
where each index corresponds to the ordinal number of the task.
If a task completed with an error, its numeric index is missing from the array.
Using the option ''$nullOnFail'', you can specify that the results of failed
tasks should be filled with ''NULL'' instead.
$coroutineGroup = new Async\CoroutineGroup(captureResults: true);
$coroutineGroup->spawn(function() {return 'result 1';});
$coroutineGroup->spawn(function() {throw new Exception('Error');});
var_dump(await($coroutineGroup->all(ignoreErrors: true, nullOnFail: true)));
**Expected output:**
array(2) {
[0]=>
string(8) "result 1"
[1]=>
NULL
}
The method ''CoroutineGroup::getErrors()'' returns an array with numeric indices and exceptions,
where each index corresponds to the ordinal number of the task.
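The indexing rules above can be modelled in plain PHP without any of the proposed API. The following is only a sketch under stated assumptions: ''collectGroupResults()'' is a hypothetical helper (not part of this RFC) that runs ordinary closures one after another and fills the two arrays the way ''all()'' and ''getErrors()'' are described, keyed by each task's ordinal number.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical model of the result/error indexing described above.
 * Tasks are plain closures executed sequentially; no coroutines are involved.
 *
 * @param list<callable(): mixed> $tasks
 * @return array{0: array<int, mixed>, 1: array<int, Throwable>}
 */
function collectGroupResults(array $tasks, bool $nullOnFail = false): array
{
    $results = [];
    $errors = [];

    foreach (array_values($tasks) as $index => $task) {
        try {
            $results[$index] = $task();
        } catch (Throwable $e) {
            // The error keeps the ordinal number of the task that failed.
            $errors[$index] = $e;
            if ($nullOnFail) {
                $results[$index] = null;
            }
        }
    }

    return [$results, $errors];
}

$tasks = [
    fn() => 'result 1',
    fn() => throw new Exception('Error'),
    fn() => 'result 3',
];

[$sparse, $errors] = collectGroupResults($tasks);
[$filled] = collectGroupResults($tasks, nullOnFail: true);

var_dump(array_keys($sparse)); // keys 0 and 2: index 1 is missing
var_dump(array_keys($filled)); // keys 0, 1 and 2: index 1 holds NULL
var_dump(array_keys($errors)); // key 1 only
```

In this model the error array and the result array share the same ordinal keys, so a failed task can be matched to its position without extra bookkeeping.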
> **Note:** The method ''CoroutineGroup::disposeResults'' clears all results and errors at the moment it is called.
> Coroutines then reset their ordinal indices starting from zero.
=== CoroutineGroup scope exception handling ===
What happens when a coroutine that belongs to a ''Scope''
but does not belong to a ''CoroutineGroup'' throws an exception?
1. If the exception is not handled, it will propagate to the ''Scope''.
2. If a ''Scope'' has no exception handler, it invokes the ''dispose()'' strategy,
which cancels all coroutines, including any ''CoroutineGroup'' associated with the ''Scope''.
In this case, the ''await'' point of the ''CoroutineGroup'' will receive a ''CancellationException''.
**Example:**
use Async\CoroutineGroup;
$coroutineGroup = new Async\CoroutineGroup(captureResults: false);
$coroutineGroup->spawn(function() {
spawn(function() { // <- subcoroutine in the same scope
throw new Exception('Error in coroutine');
});
sleep(1);
});
try {
await($coroutineGroup);
} catch (Async\CancellationException $exception) {
echo "Caught exception: ", $exception->getMessage(), "\n";
}
**Expected output:**
Caught exception: CoroutineGroup was cancelled at ...
If you need to handle this type of exception, use the ''Scope::setExceptionHandler''
method before calling ''await($coroutineGroup)'':
use Async\CoroutineGroup;
$scope = new Async\Scope();
$coroutineGroup = new Async\CoroutineGroup(scope: $scope, captureResults: false);
$scope->setExceptionHandler(function (Async\Scope $scope, Async\Coroutine $coroutine, Throwable $e) {
echo "Caught exception: {$e->getMessage()}\n in coroutine: {$coroutine->getSpawnLocation()}\n";
});
$coroutineGroup->spawn(function() {
spawn(function() { // <- subcoroutine in the same scope
throw new Exception('Error in coroutine');
});
sleep(1);
});
await($coroutineGroup);
Please see [Error Handling](#error-handling) for more details.
=== CoroutineGroup vs Scope ===
| Feature | CoroutineGroup | Scope |
| **Purpose** | Manages a group of explicitly added tasks | Manages lifetime and hierarchy of all child tasks |
| **Task Addition** | Only via ''$coroutineGroup->spawn()'' | Any coroutine in current scope is added implicitly |
| **Result Capturing** | Can capture task results (optional) | Does not capture results |
| **Implements Awaitable** | Yes, can be used with ''await'' | No, must use ''awaitCompletion()'' |
| **Used for Structured Concurrency** | Yes, in grouped execution | Yes, in hierarchy and parent-child relationships |
| **Cancelling Behavior** | Cancels only its own tasks | Cancels all tasks in the scope and its child Scopes |
| **Automatic Disposal** | Disposes its scope if it owns it | ''disposeSafely'', ''dispose'', ''cancel'' |
| **Usage Recommendation** | Prefer for result-driven parallel logic | Prefer for lifecycle and hierarchical control |
=== Structured concurrency ===
**Structured concurrency** allows organizing coroutines into a group or hierarchy
to manage their lifetime or exception handling.
The parent task is required to take responsibility for its child tasks and
must not complete before the children have finished their execution.
To implement structured concurrency, it is recommended to use the ''CoroutineGroup'' class.
The following code implements this idea:
use Async\Scope;
function copyFile(string $sourceFile, string $targetFile): void
{
$source = fopen($sourceFile, 'r');
$target = fopen($targetFile, 'w');
$buffer = null;
try {
// Child scope
$tasks = new \Async\CoroutineGroup(Scope::inherit());
// Read data from the source file
$tasks->spawn(function() use (&$buffer, $source) {
while (!feof($source)) {
if ($buffer === null) {
$chunk = fread($source, 1024);
$buffer = $chunk !== false && $chunk !== '' ? $chunk : null;
}
suspend();
}
$buffer = '';
});
// Write data to the target file
$tasks->spawn(function() use (&$buffer, $target) {
while (true) {
if (is_string($buffer)) {
if ($buffer === '') {
break; // End of a file
}
fwrite($target, $buffer);
$buffer = null;
}
suspend();
}
echo "Copy complete.\n";
});
await($tasks);
} finally {
fclose($source);
fclose($target);
}
}
$copyTasks = new \Async\CoroutineGroup;
$copyTasks->spawn(copyFile(...), 'source.txt', 'target.txt');
$copyTasks->spawn(copyFile(...), 'source2.txt', 'target2.txt');
await($copyTasks);
The example creates two task groups: a parent and a child.
The parent task group handles the copy operations directly, while the child tasks perform file reading and writing.
File descriptors will not be closed until all child copy tasks have completed.
The main code will not finish until all copy operations are completed.
==== Async combinators ====
Async combinators are functions that combine multiple awaitable objects into complex execution patterns.
The API provides both error-propagating and error-collecting variants for different use cases.
=== Basic Combinators ===
| Function | Description |
| ''await(Awaitable $awaitable, ?Awaitable $cancellation = null)'' | Waits for a single awaitable to complete |
| ''awaitAny(iterable $triggers, ?Awaitable $cancellation = null)'' | Returns result of the first completed awaitable |
| ''awaitAll(iterable $triggers, ?Awaitable $cancellation = null)'' | Waits for all awaitables to complete, returns array of results |
=== Error-Collecting Combinators ===
| Function | Description |
| ''awaitFirstSuccess(iterable $triggers, ?Awaitable $cancellation = null)'' | Returns first successful result, collects errors from failures |
| ''awaitAllWithErrors(iterable $triggers, ?Awaitable $cancellation = null)'' | Waits for all, returns both results and errors separately |
=== Counted Combinators ===
| Function | Description |
| ''awaitAnyOf(int $count, iterable $triggers, ?Awaitable $cancellation = null)'' | Waits for specified number of awaitables to complete |
| ''awaitAnyOfWithErrors(int $count, iterable $triggers, ?Awaitable $cancellation = null)'' | Same as ''awaitAnyOf'' but collects errors |
=== Error Propagation vs Error Collection ===
**Error-Propagating Functions** (''awaitAny'', ''awaitAll'', ''awaitAnyOf''):
* Stop execution on first exception
* Throw the exception immediately
* Cancel remaining coroutines
**Error-Collecting Functions** (''awaitFirstSuccess'', ''awaitAllWithErrors'', ''awaitAnyOfWithErrors''):
* Continue execution despite exceptions
* Return results and errors separately as ''[results, errors]''
* Allow partial success scenarios
=== awaitAll() - Parallel Execution ===
$results = awaitAll([
spawn('fetchUserData'),
spawn('fetchUserSettings'),
spawn('fetchUserPreferences')
]);
// $results = [userData, userSettings, userPreferences]
// Order matches input order
=== awaitAny() - Race ===
// Returns result from fastest source
$data = awaitAny([
spawn('fetchFromCache'),
spawn('fetchFromDatabase'),
spawn('fetchFromAPI')
]);
=== awaitFirstSuccess() - Fault Tolerance ===
[$result, $errors] = awaitFirstSuccess([
spawn('callUnreliableAPI1'),
spawn('callUnreliableAPI2'),
spawn('callUnreliableAPI3')
]);
if ($result !== null) {
echo "Got result: $result\n";
} else {
echo "All calls failed. Errors: " . count($errors) . "\n";
}
=== awaitAllWithErrors() - Complete Data Collection ===
[$results, $errors] = awaitAllWithErrors([
spawn('processFile', 'file1.txt'),
spawn('processFile', 'file2.txt'),
spawn('processFile', 'corrupted.txt') // might fail
]);
// $results = ["result1", "result2", null]
// $errors = [2 => Exception("corrupted file")]
=== awaitAnyOf() - Partial Completion ===
// Wait for first 3 downloads to complete
$completedDownloads = awaitAnyOf(3, [
spawn('downloadFile', 'file1.zip'),
spawn('downloadFile', 'file2.zip'),
spawn('downloadFile', 'file3.zip'),
spawn('downloadFile', 'file4.zip'),
spawn('downloadFile', 'file5.zip')
]);
// Returns associative array: [1 => "file2.zip", 0 => "file1.zip", 4 => "file5.zip"]
// Keys preserve original array indices
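The shape of that return value (first N finishers, in completion order, under their original keys) can be reproduced in plain PHP. The sketch below is an illustration only: ''firstCompleted()'' is a hypothetical helper, and the completion times are made-up numbers standing in for real concurrent downloads.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical model of "first N to complete, original keys preserved".
 *
 * @param array<int|string, mixed> $values        value each task would return
 * @param array<int|string, int>   $finishTimesMs simulated completion time per key
 * @return array<int|string, mixed> the first $count values in completion order
 */
function firstCompleted(int $count, array $values, array $finishTimesMs): array
{
    asort($finishTimesMs); // earliest first; keys are kept
    $winners = array_slice(array_keys($finishTimesMs), 0, $count);

    $completed = [];
    foreach ($winners as $key) {
        $completed[$key] = $values[$key];
    }

    return $completed;
}

$files = ['file1.zip', 'file2.zip', 'file3.zip', 'file4.zip', 'file5.zip'];
$finishTimesMs = [20, 10, 50, 40, 30]; // made-up timings for the illustration

$completed = firstCompleted(3, $files, $finishTimesMs);
print_r($completed); // keys appear in completion order: 1, 0, 4
```

Because the keys are the original positions, the caller can tell which inputs finished even though the array is ordered by completion.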
=== Cancellation Support ===
All combinators support cancellation through the optional ''$cancellation'' parameter:
try {
$results = awaitAll($tasks, Async\timeout(30000)); // 30 second timeout
} catch (Async\AwaitCancelledException $e) {
echo "Operation timed out\n";
}
=== Advanced Patterns ===
**Retry with Multiple Strategies:**
function fetchWithRetry(string $url): string
{
for ($attempt = 1; $attempt <= 3; $attempt++) {
[$result, $errors] = awaitFirstSuccess([
spawn('fetchFromCDN', $url),
spawn('fetchFromOrigin', $url),
spawn('fetchFromBackup', $url)
]);
if ($result !== null) {
return $result;
}
echo "Attempt $attempt failed, retrying...\n";
Async\delay(1000 * $attempt); // linear backoff: 1 s, 2 s, 3 s
}
throw new Exception("All retry attempts failed");
}
**Batch Processing with Concurrency Control:**
function processBatch(array $items, int $batchSize = 10): array
{
$allResults = [];
for ($i = 0; $i < count($items); $i += $batchSize) {
$batch = array_slice($items, $i, $batchSize);
$tasks = array_map(fn($item) => spawn('processItem', $item), $batch);
$batchResults = awaitAll($tasks);
$allResults = array_merge($allResults, $batchResults);
}
return $allResults;
}
==== Timer functions ====
The standard async library includes two functions similar to ''usleep()'':
* ''Async\delay(int $ms): void''
* ''Async\timeout(int $ms): Awaitable''
The ''delay'' function suspends the execution of a coroutine for the specified number of milliseconds.
Unlike ''usleep'', the ''delay'' function will throw a cancellation exception if the coroutine is cancelled.
The ''timeout'' function is similar to ''delay'', but it returns an ''Awaitable'' object:
use function Async\timeout;
use function Async\delay;
try {
Async\delay(1000); // suspends the coroutine for 1 second
// Try to fetch data from the URL within 1 second
echo await(spawn('file_get_contents', 'https://php.net/'), Async\timeout(1000));
} catch (\Async\AwaitCancelledException) {
echo "Operation was cancelled by timeout\n";
} catch (\Async\CancellationException) {
echo "Operation was cancelled by user\n";
}
==== Error Handling ====
An uncaught exception in a coroutine follows this flow:
1. If the coroutine is awaited using ''await()'',
the exception is propagated to the awaiting points.
If multiple points are awaiting, each receives the exact same exception object (it is not cloned).
2. The exception is passed to the ''Scope''.
3. If the ''Scope'' has an exception handler defined, it will be invoked.
4. If the ''Scope'' does not have an exception handler, the ''cancel()'' method is called,
canceling all coroutines in this scope, including all child scopes.
5. If the ''Scope'' has responsibility points, i.e., callers of ''Scope::awaitCompletion'',
all responsibility points receive the exception.
6. Otherwise, the exception is passed to the parent scope if it is defined.
7. If there is no parent scope, the exception falls into ''globalScope'',
where the same rules apply as for a regular scope.
{{ :rfc:true_async:exception_flow.svg |}}
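The sketch below is one possible reading of steps 2 to 7, written as plain PHP so it can be run without the proposed API. It is not the RFC's implementation: ''ModelScope'' and ''routeException()'' are hypothetical names, step 1 (await points) is left out, and handlers that throw are not modelled.

```php
<?php
declare(strict_types=1);

/** Hypothetical stand-in for a scope; not the Async\Scope class. */
final class ModelScope
{
    public bool $cancelled = false;

    public function __construct(
        public readonly string $name,
        public readonly ?ModelScope $parent = null,
        public readonly ?Closure $handler = null, // models setExceptionHandler()
        public readonly int $awaiters = 0,        // models awaitCompletion() callers
    ) {}
}

/** Walks the scope chain and reports where the exception stops. */
function routeException(Throwable $e, ModelScope $scope): string
{
    for ($current = $scope; $current !== null; $current = $current->parent) {
        if ($current->handler !== null) {
            ($current->handler)($e);      // step 3: the handler takes over
            return "handled by {$current->name}";
        }

        $current->cancelled = true;       // step 4: no handler, cancel the scope

        if ($current->awaiters > 0) {     // step 5: responsibility points
            return "delivered to awaiters of {$current->name}";
        }
        // step 6: otherwise continue with the parent scope
    }

    return 'reached globalScope';         // step 7
}

$seen = [];
$supervisor = new ModelScope('supervisor', handler: function (Throwable $e) use (&$seen): void {
    $seen[] = $e->getMessage();
});
$worker = new ModelScope('worker', parent: $supervisor);

echo routeException(new Exception('Task 1'), $worker), "\n"; // handled by supervisor
```

In this model the worker scope is cancelled while the supervisor, which owns a handler, stays alive.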
**Example:**
use Async\Scope;
$scope = new Scope();
$scope->spawn(function() {
throw new Exception("Task 1");
});
$exception1 = null;
$exception2 = null;
$scope2 = new Scope();
$scope2->spawn(function() use ($scope, &$exception1) {
try {
$scope->awaitCompletion(Async\signal(SIGTERM));
} catch (Exception $e) {
$exception1 = $e;
echo "Caught exception1: {$e->getMessage()}\n";
}
});
$scope2->spawn(function() use ($scope, &$exception2) {
try {
$scope->awaitCompletion(Async\signal(SIGTERM));
} catch (Exception $e) {
$exception2 = $e;
echo "Caught exception2: {$e->getMessage()}\n";
}
});
$scope2->awaitCompletion(Async\signal(SIGTERM));
echo $exception1 === $exception2 ? "The same exception\n" : "Different exceptions\n";
If an exception reaches ''globalScope'' and is not handled in any way,
it triggers **Graceful Shutdown Mode**, which will terminate the entire application.
The ''Scope'' class allows defining an exception handler that can prevent exception propagation.
For this purpose, two methods are used:
- **''setExceptionHandler''**: triggers for any exceptions thrown within this **Scope**.
- **''setChildScopeExceptionHandler''**: triggers for exceptions from **child Scopes**.
> The methods ''setExceptionHandler'' and ''setChildScopeExceptionHandler'' cannot be used with the ''globalScope''.
> If an attempt is made to do so, an exception will be thrown.
**Example:**
$scope = new Scope();
$scope->setExceptionHandler(function (Async\Scope $scope, Async\Coroutine $coroutine, Throwable $e) {
echo "Caught exception: {$e->getMessage()}\n in coroutine: {$coroutine->getSpawnLocation()}\n";
});
$scope->spawn(function() {
throw new Exception("Task 1");
});
$scope->awaitCompletion(Async\signal(SIGTERM));
Using these handlers,
you can implement the **Supervisor** pattern, i.e.,
a **Scope** that will not be canceled when an exception occurs in coroutines.
> If the ''setExceptionHandler'' or ''setChildScopeExceptionHandler'' handlers throw an exception,
> it will be propagated to the **parent Scope** or the **global Scope**.
The **''setChildScopeExceptionHandler''** method allows handling exceptions only from **child Scopes**,
which can be useful for implementing an algorithm where the **main Scope** runs core tasks,
while **child Scopes** handle additional ones.
For example:
use Async\Scope;
use Async\Coroutine;
final class Service
{
private Scope $scope;
public function __construct()
{
$this->scope = new Scope();
$this->scope->setChildScopeExceptionHandler(
static function (Scope $scope, Coroutine $coroutine, \Throwable $exception): void {
echo "An exception occurred: {$exception->getMessage()} in Coroutine {$coroutine->getSpawnLocation()}\n";
});
}
public function start(): void
{
$this->scope->spawn($this->run(...));
}
public function stop(): void
{
$this->scope->cancel();
}
private function run(): void
{
while (($socket = $this->service->receive()) !== null) {
$scope = Scope::inherit($this->scope);
// supervisor pattern
$scope->spawn($this->handleRequest(...), $socket)->onFinally(
static function () use ($scope) {
$scope->disposeSafely();
}
);
}
}
}
''$this->scope'' listens for new connections on the server socket.
Canceling ''$this->scope'' means shutting down the entire service.
Each new connection is handled in a separate **Scope**, which is inherited from ''$this->scope''.
If an exception occurs in a coroutine created within a **child Scope**,
it will be passed to the ''setChildScopeExceptionHandler'' handler and will not affect
the operation of the service as a whole.
{{ :rfc:true_async:supervisor.svg |}}
Please see also [CoroutineGroup scope exception handling](#CoroutineGroup-scope-exception-handling)
and [CoroutineGroup error handling](#CoroutineGroup-error-handling) sections.
=== Responsibility points ===
A **responsibility point** is code that explicitly waits for the completion of a coroutine or a ''Scope'':
$scope = new Scope();
$scope->spawn(function() {
throw new Exception("Task 1");
});
try {
$scope->awaitCompletion(Async\signal(SIGTERM));
} catch (\Throwable $e) {
echo "Caught exception: {$e->getMessage()}\n";
}
=== Exception Handling ===
The ''Scope'' class provides a method for handling exceptions:
$scope = new Scope();
$scope->spawn(function() {
throw new Exception("Task 1");
});
$scope->setExceptionHandler(function (Async\Scope $scope, Async\Coroutine $coroutine, Throwable $e) {
echo "Caught exception: {$e->getMessage()}\n";
});
$scope->awaitCompletion(Async\signal(SIGTERM));
An exception handler has the right to suppress the exception.
However, if the exception handler throws another exception,
the exception propagation algorithm will continue.
==== onFinally ====
The ''onFinally'' method allows defining a callback function that will be invoked when a coroutine or scope completes.
This method can be considered a direct analog of ''defer'' in Go.
> ⚠️ **Important:** All ''onFinally'' handlers are executed concurrently in separate coroutines.
> This ensures that slow handlers do not block the completion of other handlers or the main execution flow.
=== Scope::onFinally ===
For ''Scope'', the callback receives the completed scope as a parameter:
$scope = new Scope();
$scope->spawn(function() {
throw new Exception("Task 1");
});
$scope->onFinally(function (Scope $completedScope) {
echo "Scope " . spl_object_id($completedScope) . " completed\n";
});
$scope->awaitCompletion(Async\signal(SIGTERM));
=== Coroutine::onFinally ===
For ''Coroutine'', the callback receives the completed coroutine as a parameter:
function task(): void
{
throw new Exception("Task 1");
}
$coroutine = spawn('task');
$coroutine->onFinally(function (Coroutine $completedCoroutine) {
echo "Coroutine " . spl_object_id($completedCoroutine) . " completed\n";
});
The ''onFinally'' semantics are most commonly used to release resources,
serving as a shorter alternative to ''try-finally'' blocks:
function task(): void
{
$file = fopen('file.txt', 'r');
onFinally(fn() => fclose($file));
throw new Exception("Task 1");
}
spawn('task');
==== Cancellation ====
The cancellation operation is available for coroutines and scopes
using the ''cancel()'' method:
function task(): void {}
$coroutine = spawn(task(...));
// cancel the coroutine
$coroutine->cancel(new Async\CancellationException('Task was cancelled'));
The cancellation operation is implemented as follows:
1. If a coroutine has not started, it will never start.
2. If a coroutine is suspended, its execution will resume with an exception.
3. If a coroutine has already completed, nothing happens.
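The three cases can be tried out today with the existing ''Fiber'' API, whose ''Fiber::throw()'' resumes a suspended fiber with an exception. The sketch below is only an analogy and does not show how this RFC implements cancellation: ''CancelRequest'', ''cancelFiber()'' and ''makeTask()'' are hypothetical names. Since the RFC does not allow mixing Fibers with ''Async\spawn'', the code is meant for a plain PHP 8.1+ script.

```php
<?php
declare(strict_types=1);

/** Hypothetical stand-in for Async\CancellationException. */
final class CancelRequest extends Error {}

/** Applies the three rules to a Fiber and reports which one matched. */
function cancelFiber(Fiber $fiber, CancelRequest $reason): string
{
    if (!$fiber->isStarted()) {
        return 'never started';          // rule 1: the caller simply never starts it
    }
    if ($fiber->isSuspended()) {
        $fiber->throw($reason);          // rule 2: resume with an exception
        return 'resumed with exception';
    }
    // rule 3: nothing happens once the fiber has finished
    return $fiber->isTerminated() ? 'already completed' : 'running';
}

/** Creates a task that suspends once and records how it ended. */
function makeTask(array &$log): Fiber
{
    return new Fiber(function () use (&$log): void {
        try {
            Fiber::suspend();
            $log[] = 'finished normally';
        } catch (CancelRequest $e) {
            $log[] = 'cancelled: ' . $e->getMessage();
        }
    });
}

$log = [];

$pending = makeTask($log);
$first = cancelFiber($pending, new CancelRequest('stop'));     // never started

$suspended = makeTask($log);
$suspended->start();                                           // runs until Fiber::suspend()
$second = cancelFiber($suspended, new CancelRequest('stop'));  // resumed with exception
$third = cancelFiber($suspended, new CancelRequest('again'));  // already completed

echo implode("\n", [$first, $second, $third, ...$log]), "\n";
```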
The ''CancellationException'', if unhandled within a coroutine, is automatically suppressed after the coroutine completes.
> ⚠️ **Warning:** You should not attempt to suppress a ''CancellationException'',
> as doing so may cause application malfunctions.
$scope = new Scope();
$scope->spawn(function() {
sleep(1);
echo "Task 1\n";
});
$scope->cancel(new Async\CancellationException('Task was cancelled'));
Canceling a ''Scope'' triggers the cancellation of all coroutines
within that ''Scope'' and all child ''Scopes'' in hierarchical order.
> **Note:** ''CancellationException'' can be extended by the user
> to add metadata that can be used for debugging purposes.
==== CancellationException handling ====
In the context of coroutines, it is not recommended to use ''catch \Throwable'' or ''catch CancellationException''.
Since ''CancellationException'' does not extend the ''\Exception'' class,
using ''catch \Exception'' is a safe way to handle exceptions,
and the ''finally'' block is the recommended way to execute finalizing code.
try {
$coroutine = spawn(function() {
sleep(1);
throw new \Exception("Task 1");
});
spawn(function() use ($coroutine) {
$coroutine->cancel();
});
try {
await($coroutine);
} catch (\Exception $exception) {
// recommended way to handle exceptions
echo "Caught exception: {$exception->getMessage()}\n";
}
} finally {
echo "The end\n";
}
Expected output:
The end
try {
$coroutine = spawn(function() {
sleep(1);
throw new \Exception("Task 1");
});
spawn(function() use ($coroutine) {
$coroutine->cancel();
});
try {
await($coroutine);
} catch (Async\CancellationException $exception) {
// not recommended way to handle exceptions
echo "Caught CancellationException\n";
throw $exception;
}
} finally {
echo "The end\n";
}
Expected output:
Caught CancellationException
The end
=== CancellationException propagation ===
The ''CancellationException'' affects **PHP** standard library functions differently.
If it is thrown inside one of these functions that previously did not throw exceptions,
the PHP function will terminate with an error.
In other words, the ''cancel()'' mechanism does not alter the existing function contract.
PHP standard library functions behave as if the operation had failed.
Additionally, the ''CancellationException'' will not appear in ''error_get_last()'',
but it may trigger an ''E_WARNING'' to maintain compatibility with expected behavior
for functions like ''fwrite'' (if such behavior is specified in the documentation).
==== Critical section ====
Sometimes it's necessary to execute a **critical section** of code
that must not be cancelled via ''CancellationException''.
For example, this could be a sequence of write operations or a transaction.
For this purpose, the ''Async\protect'' function is used,
which allows executing a closure in a non-cancellable (silent) mode.
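function task(): void
{
$file = fopen('file.txt', 'a');
// the write below cannot be interrupted by a cancellation
Async\protect(fn() => fwrite($file, "Critical data\n"));
fclose($file);
}
spawn(task(...));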
If a ''CancellationException'' was sent to a coroutine during ''protect()'',
the exception will be thrown immediately after the execution of ''protect()'' completes.
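The deferred delivery described above can be modelled in plain PHP. The following sketch assumes a simplified, single-threaded picture and is not the RFC's implementation: ''ModelCoroutine'' is a hypothetical class whose ''cancel()'' either throws at once or, while a critical section runs, stores the exception so that ''protect()'' can rethrow it afterwards.

```php
<?php
declare(strict_types=1);

/** Hypothetical model of a coroutine's cancellation state; not an RFC class. */
final class ModelCoroutine
{
    private bool $protected = false;
    private ?Throwable $pending = null;

    /** Delivers the cancellation now, or stores it while a critical section runs. */
    public function cancel(Throwable $reason): void
    {
        if ($this->protected) {
            $this->pending = $reason;
            return;
        }
        throw $reason;
    }

    /** Runs $critical to completion, then rethrows a cancellation that arrived meanwhile. */
    public function protect(Closure $critical): void
    {
        $this->protected = true;
        try {
            $critical();
        } finally {
            $this->protected = false;
        }

        if ($this->pending !== null) {
            $reason = $this->pending;
            $this->pending = null;
            throw $reason;
        }
    }
}

$coroutine = new ModelCoroutine();
$written = [];

try {
    $coroutine->protect(function () use ($coroutine, &$written): void {
        $written[] = 'first write';
        $coroutine->cancel(new RuntimeException('cancelled')); // arrives mid-section
        $written[] = 'second write';                           // still executed
    });
} catch (RuntimeException $e) {
    $written[] = 'cancellation delivered after protect()';
}

print_r($written);
```

Both writes complete before the cancellation surfaces, which is the property a transaction or a multi-step write relies on.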
The use of loops or unsafe operations inside a critical section can be checked by static analyzers.
==== Cancellation policy ====
This **RFC** intentionally does not define rules for tracking the execution time of cancelled coroutines.
The reason is that cancellation operations may be long-running (for example,
rollback strategies) and may require blocking the function being cancelled.
Intentionally stopping coroutines that are in the cancellation state is a dangerous operation
that can lead to data loss. To avoid overcomplicating this **RFC**,
it is proposed to delegate the responsibility for such logic to the ''Scheduler'' implementation.
=== exit and die keywords ===
The ''exit''/''die'' keywords called within a coroutine result in the immediate termination of the application.
Unlike the ''cancel()'' operation, they do not allow for proper resource cleanup.
==== Graceful Shutdown ====
When an **unhandled exception** occurs in a **Coroutine**,
the **Graceful Shutdown** mode is initiated.
Its goal is to safely terminate the application.
**Graceful Shutdown** cancels all coroutines in ''globalScope'',
then continues execution without restrictions, allowing the application to shut down naturally.
**Graceful Shutdown** does not prevent the creation of new coroutines or close connection descriptors.
However, if another unhandled exception is thrown during the **Graceful Shutdown** process,
the second phase is triggered.
**Second Phase of Graceful Shutdown**
- All **Event Loop descriptors** are closed.
- All **timers** are destroyed.
- Any remaining coroutines that were not yet canceled will be **forcibly canceled**.
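The two phases can be summarised as a small state machine. The sketch below is only a model of the transitions described above: the ''ShutdownPhase'' enum and ''onUnhandledException()'' are hypothetical names, and the actual cleanup work (cancelling coroutines, closing descriptors, destroying timers) is reduced to comments.

```php
<?php
declare(strict_types=1);

/** Hypothetical phases; the RFC names only "Graceful Shutdown" and its second phase. */
enum ShutdownPhase
{
    case Running;
    case Graceful; // coroutines in globalScope are cancelled, execution continues
    case Forced;   // descriptors closed, timers destroyed, remaining coroutines cancelled
}

/** Returns the phase entered when an unhandled exception is observed. */
function onUnhandledException(ShutdownPhase $phase): ShutdownPhase
{
    return match ($phase) {
        ShutdownPhase::Running => ShutdownPhase::Graceful,
        ShutdownPhase::Graceful, ShutdownPhase::Forced => ShutdownPhase::Forced,
    };
}

$phase = ShutdownPhase::Running;
$phase = onUnhandledException($phase); // first unhandled exception
$phase = onUnhandledException($phase); // another one during shutdown
echo $phase->name, "\n";               // Forced
```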
The further shutdown logic may depend on the specific implementation of the **Scheduler** component,
which can be an external system and is beyond the scope of this **RFC**.
The **Graceful Shutdown** mode can also be triggered using the function:
Async\gracefulShutdown(?CancellationException $cancellationException = null): void {}
from anywhere in the application.
=== Deadlocks ===
A situation may arise where there are no active **Coroutines** in the execution queue
and no active handlers in the event loop.
This condition is called a **Deadlock**, and it represents a serious logical error.
When a **Deadlock** is detected, the application enters **Graceful Shutdown** mode
and generates warnings containing information about which **Coroutines** are in a waiting state
and the exact lines of code where they were suspended.
==== Tools ====
The ''Coroutine'' class implements methods for inspecting the state of a coroutine.
| Method | Description |
| **''getSpawnFileAndLine():array''** | Returns an array of two elements: the file name and the line number where the coroutine was spawned. |
| **''getSpawnLocation():string''** | Returns a string representation of the location where the coroutine was spawned, typically in the format ''"file:line"''. |
| **''getSuspendFileAndLine():array''** | Returns an array of two elements: the file name and the line number where the coroutine was last suspended. If the coroutine has not been suspended, it may return an empty string and ''0''. |
| **''getSuspendLocation():string''** | Returns a string representation of the location where the coroutine was last suspended, typically in the format ''"file:line"''. If the coroutine has not been suspended, it may return an empty string. |
| **''isSuspended():bool''** | Returns ''true'' if the coroutine has been suspended, otherwise ''false''. |
| **''isCancelled():bool''** | Returns ''true'' if the coroutine has been cancelled, otherwise ''false''. |
| **''getTrace():array''** | Returns the stack trace of the coroutine. |
The ''Coroutine::getAwaitingInfo()'' method returns an array with debugging information
about what the coroutine is waiting for, if it is in a waiting state.
The format of this array depends on the implementation of the **Scheduler** and the **Reactor**.
The ''Async\Scope::getChildScopes()'' method returns an array of all child scopes of the current scope.
The method ''Async\Scope::getCoroutines()'' returns a list of coroutines that are registered within the specified ''Scope''.
The ''Async\getCoroutines()'' function returns an array of all coroutines in the application.
==== Prototypes ====
* [Async functions](https://github.com/EdmondDantes/php-true-async-rfc/tree/main/examples/Async/Async.php)
* [Coroutine](https://github.com/EdmondDantes/php-true-async-rfc/tree/main/examples/Async/Coroutine.php)
* [Coroutine Context](https://github.com/EdmondDantes/php-true-async-rfc/tree/main/examples/Async/Context.php)
* [Coroutine Scope](https://github.com/EdmondDantes/php-true-async-rfc/tree/main/examples/Async/Scope.php)
* [Task Group](https://github.com/EdmondDantes/php-true-async-rfc/tree/main/examples/Async/CoroutineGroup.php)
===== Backward Incompatible Changes =====
Simultaneous use of the **True Async API** and the **Fiber API** is not possible.
- If ''new Fiber()'' is called first, the ''Async\spawn'' function will fail with an error.
- If ''Async\spawn'' is called first, any attempt to create a **Fiber** will result in an error.
===== Proposed PHP Version(s) =====
PHP 8.6+
===== RFC Impact =====
==== To SAPIs ====
The **True Async** module activates the reactor within the context of ''php_request_startup()''/''php_request_shutdown()''
request processing. Therefore, using concurrency is reasonable only for long-running scenarios implemented via CLI.
It is expected that **True Async** will enable the integration of built-in **web servers** into PHP,
which will be embedded into the reactorβs event loop.
==== To Existing Extensions ====
No changes are expected in existing extensions.
==== To Opcache ====
Opcache is not affected.
==== New Constants ====
No new constants are introduced.
==== php.ini Defaults ====
* ''async.zombie_coroutine_timeout'': default 5 seconds
===== Open Issues =====
None.
===== Unaffected PHP Functionality =====
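* Fiber API. Its behavior is unchanged, although it cannot be used simultaneously with the **True Async API** (see Backward Incompatible Changes).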
===== Future Scope =====
This **RFC** assumes the subsequent development of additional RFCs:
* Async\Context RFC - Coroutine and Scope Context Management
* RFC for Non-Blocking Versions of PHP Built-in Functions, including the Socket and Curl extensions
* RFC for Special Async Syntax
* RFC Context Manager (like in Python)
* RFC for an equivalent of Go's ''defer'' statement.
* Special try cancellation block RFC.
* An RFC for a set of standard primitives, such as ''Future'', ''Channel'', etc.
* And possibly others...
This RFC leaves room for later extensions that together would form
a complete toolkit for working with concurrent logic.
It proposes development in the following areas:
* **Support for Pipe**
* **Development of new and revision of existing extensions**
* **Refactoring of input-output code** to improve performance and better integration with the Event Loop
* Functions for **collecting metrics**
=== Integration with Pipe ===
The ''Future->map()->catch()->finally()'' call chain is rightly criticized for excessive complexity and difficulty of comprehension.
**Pipe** (not a UNIX-style pipe) can solve this problem and provide a more intuitive
interface for describing sequences of asynchronous function calls.
=== Refactoring of the Input-Output Module ===
Input-output modules such as **PHP Streams** can be redesigned with **asynchronous capabilities**
in mind and better optimized for operation in this environment.
It would also be appropriate to add support for ''pipe'' in such a way
that it can be used through the ''fopen()'' family of functions regardless of the operating system.
This would make the API more consistent.
===== Proposed Voting Choices =====
Yes or no vote. 2/3 required to pass.
* Yes
* No
===== Patches and Tests =====
* Current implementation: https://github.com/true-async
===== References =====
Links to external references, discussions or RFCs
* **First Discussion** - https://externals.io/message/126402
* **Second Discussion** - https://externals.io/message/126537
* **Third Discussion** - https://externals.io/message/127120
* **TrueAsync API RFC** - https://github.com/true-async/php-true-async-rfc/blob/main/true-async-api-rfc.md
Additional links:
* [[https://github.com/EdmondDantes/php-true-async-rfc/blob/main/comparison.md|Comparison of concurrency models in programming languages]]
* [[https://alejandromp.com/development/blog/the-importance-of-cooperative-cancellation/|Cooperative cancellation]]
* [[https://pure.tudelft.nl/ws/portalfiles/portal/222760871/LIPIcs.ECOOP.2024.8.pdf|Understanding Concurrency Bugs in Real-World Programs with Kotlin Coroutines]]
* [[https://dl.acm.org/doi/10.1145/3297858.3304069|Understanding Real-World Concurrency Bugs in Go]]
* [[https://arxiv.org/abs/1901.03575|Static Analysis for Asynchronous JavaScript Programs]]
The following can be considered as competing solutions to the current implementation:
* **Swoole** (https://github.com/swoole/swoole-src): a C++ library that implements a full feature set for concurrent programming. Both the advantage and the disadvantage of **Swoole** is that it is a standalone solution that does not directly affect the language itself.
* The **Swow** (https://github.com/swow/) project is a C library that provides a good lightweight API while not affecting the language itself and not requiring changes to PHP.
* Examples of code: https://github.com/EdmondDantes/php-true-async-rfc/tree/main/examples
===== Rejected Features =====
Keep this updated with features that were discussed on the mail lists.