API reference¶
pika follows semver. pika is currently at a 0.X version which means that minor versions may break the API. pika gives no guarantees about ABI stability. The ABI may change even in patch versions.
The API reference is a work in progress. While the reference is being expanded, the More resources section contains useful links to high-level overviews and low-level API descriptions of std::execution.
The following headers are part of the public API. Any other headers are internal implementation details.
These headers are part of the public API, but are currently undocumented.
pika/barrier.hpp
pika/condition_variable.hpp
pika/latch.hpp
pika/mpi.hpp
pika/mutex.hpp
pika/runtime.hpp
pika/semaphore.hpp
pika/thread.hpp
All functionality in a namespace containing detail and all macros prefixed with PIKA_DETAIL are implementation details and may change without warning at any time. All functionality in a namespace containing experimental may change without warning at any time. However, the intention is to stabilize those APIs over time.
Runtime management (pika/init.hpp)¶
The pika/init.hpp header provides functionality to manage the pika runtime.
#include <pika/execution.hpp>
#include <pika/init.hpp>
#include <fmt/printf.h>
#include <utility>
int main(int argc, char* argv[])
{
namespace ex = pika::execution::experimental;
namespace tt = pika::this_thread::experimental;
pika::start(argc, argv);
// The pika runtime is now active and we can schedule work on the default
// thread pool
auto s = ex::schedule(ex::thread_pool_scheduler{}) |
ex::then([]() { fmt::print("Hello from the pika runtime\n"); });
tt::sync_wait(std::move(s));
pika::finalize();
pika::stop();
return 0;
}
-
void pika::start(int argc, char const *const *argv, init_params const &params = init_params())¶
Start the runtime.
No task is created on the runtime.
-
int pika::stop()¶
Stop the runtime.
Waits until pika::finalize() has been called and there is no more activity on the runtime. See pika::wait(). The runtime can be started again after calling pika::stop(). Must be called from outside the runtime.
- Returns:¶
the return value of the callable passed to pika::start(int, char const* const*, init_params const&), if any. If none was passed, returns 0.
- Pre:¶
the runtime is initialized
- Pre:¶
the calling thread is not a pika task
- Post:¶
the runtime is not initialized
-
void pika::finalize()¶
Signal the runtime that it may be stopped.
Until pika::finalize() has been called, pika::stop() will not return. This function exists to distinguish between the runtime being idle but still expecting work to be scheduled on it and the runtime being idle and ready to be shutdown. Unlike pika::stop(), pika::finalize() can be called from within or outside the runtime.
- Pre:¶
the runtime is initialized
-
void pika::wait()¶
Wait for the runtime to be idle.
Waits until the runtime is idle. This includes tasks scheduled on the thread pools as well as non-tasks such as CUDA kernels submitted through pika facilities. Can be called from within the runtime, in which case the calling task is ignored when determining idleness.
-
void pika::resume()¶
Resume the runtime.
Resumes the runtime by waking all worker threads on all thread pools.
-
void pika::suspend()¶
Suspend the runtime.
Waits until the runtime is idle and suspends worker threads on all thread pools. Work can be scheduled on the runtime even when it is suspended, but no progress will be made.
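The following minimal sketch (not taken from the pika documentation; the scheduled work and ordering are illustrative) shows suspend() and resume() used together from the main thread:
#include <pika/execution.hpp>
#include <pika/init.hpp>
#include <utility>
int main(int argc, char* argv[])
{
    namespace ex = pika::execution::experimental;
    namespace tt = pika::this_thread::experimental;
    pika::start(argc, argv);
    tt::sync_wait(ex::schedule(ex::thread_pool_scheduler{}));
    // Called from the main thread, outside the runtime: waits until the
    // runtime is idle and puts the worker threads to sleep
    pika::suspend();
    // Work may be scheduled while the runtime is suspended, but it makes no
    // progress until pika::resume() wakes the worker threads
    auto s = ex::schedule(ex::thread_pool_scheduler{}) | ex::then([] {});
    pika::resume();
    tt::sync_wait(std::move(s));
    pika::finalize();
    pika::stop();
    return 0;
}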
-
bool pika::is_runtime_initialized() noexcept¶
Returns true when the runtime is initialized, false otherwise.
Returns true between calls of pika::start(int, char const* const*, init_params const&) and pika::stop(), otherwise false.
Added in version 0.22.0.
-
struct init_params¶
std::execution support (pika/execution.hpp)¶
The pika/execution.hpp header provides functionality related to std::execution.
std::execution functionality, including extensions provided by pika, is defined in the pika::execution::experimental namespace. When the CMake option PIKA_WITH_STDEXEC is enabled, pika pulls the stdexec namespace into pika::execution::experimental.
See Relation to std::execution and stdexec and More resources for more details on how pika relates to std::execution and for more resources on learning about std::execution. Documentation in those resources for sender functionality that has been added to the C++ standard applies to both pika's and stdexec's implementations.
Documented below are sender adaptors not available in stdexec or not proposed for standardization.
All sender adaptors are customization point objects (CPOs).
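Like other sender adaptor CPOs, the adaptors below can be partially applied and piped, or called directly with the sender as the first argument. A brief sketch, using drop_value (documented below) as an arbitrary example:
namespace ex = pika::execution::experimental;
// Partially applied and piped into a sender...
auto piped = ex::just(42) | ex::drop_value();
// ...or called directly with the sender as the first argument; the two
// senders are equivalent
auto direct = ex::drop_value(ex::just(42));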
-
constexpr drop_value_t pika::execution::experimental::drop_value = {}¶
Ignores all values sent by the predecessor sender, sending none itself.
Sender adaptor that takes any sender and returns a new sender that sends no values.
Added in version 0.6.0.
#include <pika/execution.hpp>
#include <pika/init.hpp>
#include <fmt/printf.h>
#include <tuple>
#include <utility>
struct custom_type
{
};
int main(int argc, char* argv[])
{
namespace ex = pika::execution::experimental;
namespace tt = pika::this_thread::experimental;
pika::start(argc, argv);
ex::thread_pool_scheduler sched{};
auto s = ex::just(42, custom_type{}, std::tuple("hello")) |
ex::drop_value() |
// No matter what is sent to drop_value, it won't be sent from
// drop_value
ex::then([] { fmt::print("I got nothing...\n"); });
tt::sync_wait(std::move(s));
pika::finalize();
pika::stop();
return 0;
}
-
constexpr drop_operation_state_t pika::execution::experimental::drop_operation_state = {}¶
Releases the operation state of the adaptor before signaling a connected receiver.
Sender adaptor that takes any sender and returns a sender. Values received as references from the predecessor sender will be copied before being passed on to successor senders. Other values are passed on unchanged.
The operation state of previous senders can hold on to allocated memory or values longer than necessary, which can prevent other algorithms from using those resources. drop_operation_state can be used to explicitly release the operation state, and thus associated resources, of previous senders.
Added in version 0.19.0.
#include <pika/execution.hpp>
#include <pika/init.hpp>
#include <cassert>
#include <memory>
#include <utility>
int main(int argc, char* argv[])
{
namespace ex = pika::execution::experimental;
namespace tt = pika::this_thread::experimental;
pika::start(argc, argv);
ex::thread_pool_scheduler sched{};
auto sp = std::make_shared<int>(42);
std::weak_ptr<int> sp_weak = sp;
auto s = ex::just(std::move(sp)) |
ex::then([&](auto&&) { assert(sp_weak.use_count() == 1); }) |
// Even though the shared_ptr is no longer in use, it may be kept alive
// by the operation state
ex::then([&]() {
assert(sp_weak.use_count() == 1);
return 42;
}) |
ex::drop_operation_state() |
// Once drop_operation_state has been used, the shared_ptr is guaranteed
// to be released. Values are passed through the adaptor.
ex::then([&]([[maybe_unused]] int x) {
assert(sp_weak.use_count() == 0);
assert(x == 42);
});
tt::sync_wait(std::move(s));
pika::finalize();
pika::stop();
return 0;
}
-
constexpr require_started_t pika::execution::experimental::require_started = {}¶
Diagnoses senders that have not been started by terminating on destruction. Forwards the values of the predecessor sender.
Sender adaptor that takes any sender and returns a new sender that sends the same values as the predecessor sender.
The destructor terminates if the sender has not been connected or if the operation state has not been started. The operation state of a require_started sender is allowed to not be started if this has been explicitly requested with the discard member function.
Added in version 0.21.0.
#include <pika/execution.hpp>
#include <pika/init.hpp>
#include <fmt/printf.h>
#include <cassert>
#include <cstdlib>
#include <exception>
#include <utility>
int main(int argc, char* argv[])
{
namespace ex = pika::execution::experimental;
namespace tt = pika::this_thread::experimental;
pika::start(argc, argv);
ex::thread_pool_scheduler sched{};
{
// require_started forwards values received from the predecessor sender
auto s = ex::just(42) | ex::require_started() |
ex::then([]([[maybe_unused]] auto&& i) { assert(i == 42); });
tt::sync_wait(std::move(s));
}
{
// The termination is ignored with discard, the sender is from the
// user's perspective rightfully not used
auto s = ex::just() | ex::require_started();
s.discard();
}
// The termination below is expected; install a terminate handler that
// exits cleanly instead of aborting
std::set_terminate([]() { std::exit(EXIT_SUCCESS); });
{
// The require_started sender terminates on destruction if it has not
// been used
auto s = ex::just() | ex::require_started();
}
// This line is never reached; the program exits through the terminate
// handler above
assert(false);
pika::finalize();
pika::stop();
return 0;
}
-
constexpr split_tuple_t pika::execution::experimental::split_tuple = {}¶
Splits a sender of a tuple into a tuple of senders.
Sender adaptor that takes a sender that sends a single, non-empty, tuple and returns a new tuple of the same size as the one sent by the input sender which contains one sender for each element in the input sender tuple. Each output sender signals completion whenever the input sender would have signalled completion. The predecessor sender must complete with exactly one tuple of at least one type.
Added in version 0.12.0.
#include <pika/execution.hpp>
#include <pika/init.hpp>
#include <fmt/printf.h>
#include <chrono>
#include <thread>
#include <tuple>
#include <utility>
int main(int argc, char* argv[])
{
namespace ex = pika::execution::experimental;
namespace tt = pika::this_thread::experimental;
pika::start(argc, argv);
ex::thread_pool_scheduler sched{};
// split_tuple can be used to process the result and its square through
// senders, without having to pass both around together
auto [snd, snd_squared] = ex::schedule(sched) |
ex::then([]() { return 42; }) |
ex::then([](int x) { return std::tuple(x, x * x); }) |
ex::split_tuple();
// snd and snd_squared will be ready at the same time, but can be used
// independently
auto snd_print = std::move(snd) | ex::continues_on(sched) |
ex::then([](int x) { fmt::print("x is {}\n", x); });
auto snd_process = std::move(snd_squared) | ex::continues_on(sched) |
ex::then([](int x_squared) {
fmt::print("Performing expensive operations on x * x\n");
std::this_thread::sleep_for(std::chrono::milliseconds(300));
return x_squared / 2;
});
auto x_squared_processed = tt::sync_wait(
ex::when_all(std::move(snd_print), std::move(snd_process)));
fmt::print("The final result is {}\n", x_squared_processed);
pika::finalize();
pika::stop();
return 0;
}
-
constexpr unpack_t pika::execution::experimental::unpack = {}¶
Transforms a sender of tuples into a sender of the elements of the tuples.
Sender adaptor that takes a sender of a tuple-like and returns a sender where the tuple-like has been unpacked into its elements, similarly to std::apply. Each completion signature must send exactly one tuple-like, not zero or more than one. The predecessor sender can have any number of completion signatures for the value channel, each sending a single tuple-like. The adaptor does not unpack tuple-likes recursively. Any type that supports the tuple protocol can be used with the adaptor.
Added in version 0.17.0.
#include <pika/execution.hpp>
#include <pika/init.hpp>
#include <fmt/printf.h>
#include <string>
#include <tuple>
#include <utility>
int main(int argc, char* argv[])
{
namespace ex = pika::execution::experimental;
namespace tt = pika::this_thread::experimental;
pika::start(argc, argv);
ex::thread_pool_scheduler sched{};
auto tuple_sender = ex::just(std::tuple(std::string("hello!"), 42)) |
ex::continues_on(sched);
auto process_data = [](auto message, auto answer) {
fmt::print("{}\nthe answer is: {}\n", message, answer);
};
// With the unpack adaptor, process_data does not have to know that the data
// was originally sent as a tuple
auto unpack_sender = tuple_sender | ex::unpack() | ex::then(process_data);
// We can manually recreate the behaviour of the unpack adaptor by using
// std::apply. This is equivalent to the above.
auto apply_sender = tuple_sender | ex::then([&](auto tuple_of_data) {
return std::apply(process_data, std::move(tuple_of_data));
});
tt::sync_wait(
ex::when_all(std::move(unpack_sender), std::move(apply_sender)));
pika::finalize();
pika::stop();
return 0;
}
-
constexpr when_all_vector_t pika::execution::experimental::when_all_vector = {}¶
Returns a sender that completes when all senders in the input vector have completed.
Sender adaptor that takes a vector of senders and returns a sender that sends a vector of the values sent by the input senders. The vector sent has the same size as the input vector. An empty vector of senders completes immediately on start. When the input vector of senders contains senders that send no value the output sender sends no value instead of a vector. The senders in the input vector must send at most a single type.
Added in version 0.2.0.
#include <pika/execution.hpp>
#include <pika/init.hpp>
#include <fmt/printf.h>
#include <fmt/ranges.h>
#include <cstddef>
#include <cstdlib>
#include <utility>
#include <vector>
std::size_t get_n() { return 13; }
std::size_t calculate(std::size_t i) { return (std::rand() % 4) * i * i; }
int main(int argc, char* argv[])
{
namespace ex = pika::execution::experimental;
namespace tt = pika::this_thread::experimental;
pika::start(argc, argv);
ex::thread_pool_scheduler sched{};
// when_all_vector is like when_all, but for a dynamic number of senders
// through a vector of senders
auto const n = get_n();
std::vector<ex::unique_any_sender<std::size_t>> snds;
snds.reserve(n);
for (std::size_t i = 0; i < n; ++i)
{
snds.push_back(
ex::just(i) | ex::continues_on(sched) | ex::then(calculate));
}
auto snds_print = ex::when_all_vector(std::move(snds)) |
ex::then([](std::vector<std::size_t> results) {
fmt::print("Results are: {}\n", fmt::join(results, ", "));
});
tt::sync_wait(std::move(snds_print));
// when_all_vector will send no value on completion if the input vector
// contains senders sending no value
std::vector<ex::unique_any_sender<>> snds_nothing;
snds_nothing.reserve(n);
for (std::size_t i = 0; i < n; ++i)
{
snds_nothing.push_back(ex::just(i) | ex::continues_on(sched) |
ex::then([](auto i) { fmt::print("{}: {}\n", i, calculate(i)); }));
}
auto snds_nothing_done = ex::when_all_vector(std::move(snds_nothing)) |
ex::then([]() { fmt::print("Done printing all results\n"); });
tt::sync_wait(std::move(snds_nothing_done));
pika::finalize();
pika::stop();
return 0;
}
-
template<typename ...Ts>
class unique_any_sender¶ Type-erased move-only sender.
This class wraps senders that send types Ts in the value channel. This wrapper class does not support arbitrary completion signatures: it requires exactly one value completion signature and one error completion signature. The value completion signature must send the types Ts. The error completion signature must send a std::exception_ptr. The wrapped sender may additionally have a stopped completion signature.
The unique_any_sender requires wrapped senders to be move-constructible and connectable with r-value references to the sender. The unique_any_sender itself must also be connected with an r-value reference (i.e. moved when passing into sender adaptors or consumers).
Sending references in the completion signatures is not supported.
An empty unique_any_sender throws when connected to a receiver.
Public Functions¶
-
unique_any_sender() = default¶
Default-construct an empty unique_any_sender.
-
template<typename Sender, typename = std::enable_if_t<!std::is_same_v<std::decay_t<Sender>, unique_any_sender>>>
inline unique_any_sender(Sender &&sender)¶
Construct a unique_any_sender containing sender.
-
template<typename Sender, typename = std::enable_if_t<!std::is_same_v<std::decay_t<Sender>, unique_any_sender>>>
inline unique_any_sender &operator=(Sender &&sender)¶
Assign sender to the unique_any_sender.
-
inline unique_any_sender(any_sender<Ts...> &&other)¶
Construct a unique_any_sender from an any_sender.
-
inline unique_any_sender &operator=(any_sender<Ts...> &&other)¶
Assign an any_sender to a unique_any_sender.
-
template<typename Sender>
inline void reset(Sender &&sender)¶
Assign sender to the unique_any_sender.
-
inline void reset()¶
Empty the unique_any_sender.
-
inline bool empty() const noexcept¶
Check if the unique_any_sender is empty.
- Returns:¶
True if the unique_any_sender is empty, i.e. default-constructed or moved-from.
-
inline explicit operator bool() const noexcept¶
Check if the unique_any_sender is non-empty.
See empty().
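A brief sketch (illustrative only, not from the pika documentation) of the empty state and reset():
namespace ex = pika::execution::experimental;
ex::unique_any_sender<int> s;    // default-constructed, i.e. empty
assert(s.empty() && !bool(s));
s = ex::just(1);                 // now wraps a sender
assert(!s.empty());
s.reset();                       // empty again; connecting it would throw
assert(s.empty());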
-
template<typename ...Ts>
class any_sender¶ Type-erased copyable sender.
See unique_any_sender for an overview. Compared to unique_any_sender, the any_sender requires the wrapped senders to be l-value reference connectable and copyable. The any_sender itself is also l-value reference connectable and copyable. Otherwise it behaves the same as unique_any_sender.
A unique_any_sender can be constructed from an any_sender, but not vice versa.
Public Functions¶
-
any_sender() = default¶
Default-construct an empty any_sender.
-
template<typename Sender, typename = std::enable_if_t<!std::is_same_v<std::decay_t<Sender>, any_sender>>>
inline any_sender(Sender &&sender)¶
Construct an any_sender containing sender.
-
template<typename Sender, typename = std::enable_if_t<!std::is_same_v<std::decay_t<Sender>, any_sender>>>
inline any_sender &operator=(Sender &&sender)¶
Assign sender to the any_sender.
-
template<typename Sender>
inline void reset(Sender &&sender)¶
Assign sender to the any_sender.
-
inline void reset()¶
Empty the any_sender.
-
inline bool empty() const noexcept¶
Check if the any_sender is empty.
- Returns:¶
True if the any_sender is empty, i.e. default-constructed or moved-from.
-
inline explicit operator bool() const noexcept¶
Check if the any_sender is non-empty.
See empty().
-
template<typename Sender, typename = std::enable_if_t<is_sender_v<Sender>>>
auto pika::execution::experimental::make_unique_any_sender(Sender &&sender)¶
Helper function to construct a unique_any_sender.
The template parameters for unique_any_sender are inferred from the value types sent by the given sender.
-
template<typename Sender, typename = std::enable_if_t<is_sender_v<Sender>>>
auto pika::execution::experimental::make_any_sender(Sender &&sender)¶
Helper function to construct an any_sender.
The template parameters for any_sender are inferred from the value types sent by the given sender.
#include <pika/execution.hpp>
#include <pika/init.hpp>
#include <fmt/printf.h>
#include <chrono>
#include <cstddef>
#include <string_view>
#include <thread>
#include <utility>
void print_answer(std::string_view message,
pika::execution::experimental::unique_any_sender<int>&& sender)
{
auto const answer =
pika::this_thread::experimental::sync_wait(std::move(sender));
fmt::print("{}: {}\n", message, answer);
}
int main(int argc, char* argv[])
{
namespace ex = pika::execution::experimental;
namespace tt = pika::this_thread::experimental;
pika::start(argc, argv);
ex::thread_pool_scheduler sched{};
ex::unique_any_sender<int> sender;
// Whether the sender is a simple just-sender...
sender = ex::just(42);
print_answer("Quick answer", std::move(sender));
// ... or a more complicated sender, we can put them both into the same
// unique_any_sender as long as they send the same types.
sender = ex::schedule(sched) | ex::then([]() {
std::this_thread::sleep_for(std::chrono::seconds(3));
return 42;
});
print_answer("Slow answer", std::move(sender));
// If we try to use the sender again it will throw an exception
try
{
// NOLINTNEXTLINE(bugprone-use-after-move)
tt::sync_wait(std::move(sender));
}
catch (std::exception const& e)
{
fmt::print("Caught exception: {}\n", e.what());
}
// We can also use a type-erased sender to chain work. The type of the
// sender remains the same each iteration thanks to the type-erasure, but
// the work it represents grows.
//
// However, note that using a specialized algorithm like repeat_n from
// stdexec may be more efficient.
ex::unique_any_sender<int> chain{ex::just(0)};
for (std::size_t i = 0; i < 42; ++i)
{
chain = std::move(chain) | ex::continues_on(sched) |
ex::then([](int x) { return x + 1; });
}
print_answer("Final answer", std::move(chain));
pika::finalize();
pika::stop();
return 0;
}
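Since any_sender is additionally copyable, the same type-erased sender can be connected and waited for more than once. A minimal sketch (not from the pika documentation):
#include <pika/execution.hpp>
#include <pika/init.hpp>
#include <cassert>
#include <utility>
int main(int argc, char* argv[])
{
    namespace ex = pika::execution::experimental;
    namespace tt = pika::this_thread::experimental;
    pika::start(argc, argv);
    // An any_sender can be copied, and each copy can be waited for
    // independently
    ex::any_sender<int> sender{ex::just(42)};
    auto copy = sender;
    [[maybe_unused]] auto first = tt::sync_wait(std::move(copy));
    assert(first == 42);
    // The original remains valid after copying
    [[maybe_unused]] auto second = tt::sync_wait(std::move(sender));
    assert(second == 42);
    pika::finalize();
    pika::stop();
    return 0;
}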
Asynchronous read-write mutex (pika/async_rw_mutex.hpp)¶
This header provides access to a sender-based asynchronous mutex, allowing both shared and exclusive access to a wrapped value. The functionality is in the namespace pika::execution::experimental.
Unlike typical mutexes, this one provides access exactly in the order that it is requested in synchronous code. This allows writing algorithms that mostly look like synchronous code, but can run asynchronously. This mutex is used extensively in DLA-Future, where it forms the basis for asynchronous access to blocks of distributed matrices.
#include <pika/async_rw_mutex.hpp>
#include <pika/execution.hpp>
#include <pika/init.hpp>
#include <fmt/printf.h>
#include <type_traits>
#include <utility>
int main(int argc, char* argv[])
{
namespace ex = pika::execution::experimental;
namespace tt = pika::this_thread::experimental;
pika::start(argc, argv);
ex::thread_pool_scheduler sched{};
{
// Below we will access the value protected by the mutex with the
// following implied dependency graph:
//
// ┌──► ro_access1 ──┐
// rw_access1 ──┼──► ro_access2 ──┼──► rw_access2
// └──► ro_access3 ──┘
//
// Note that the senders themselves don't depend on each other
// explicitly as above, but the senders provided by the mutex enforce
// the given order. Because of this enforced order, it is possible to
// create deadlocks with the mutex. For example, starting and waiting
// for the sender of ro_access1 without ever starting the sender of
// rw_access1 would lead to a deadlock. Similarly, it is not sufficient
// to only start the last sender accessed from the mutex. It will not
// automatically start the senders of previous accesses.
ex::async_rw_mutex<int> m{0};
// This read-write access is guaranteed to not run concurrently with any
// other accesses. It will also run first since we requested the sender
// first from the mutex.
auto rw_access1 =
m.readwrite() | ex::continues_on(sched) | ex::then([](auto w) {
w.get() = 13;
fmt::print("updated value to {}\n", w.get());
});
// These read-only accesses can only read the value, but they can run
// concurrently. They'll see the write from the access above.
auto ro_access1 =
m.read() | ex::continues_on(sched) | ex::then([](auto w) {
static_assert(std::is_const_v<
std::remove_reference_t<decltype(w.get())>>);
fmt::print("value is now {}\n", w.get());
});
auto ro_access2 =
m.read() | ex::continues_on(sched) | ex::then([](auto w) {
static_assert(std::is_const_v<
std::remove_reference_t<decltype(w.get())>>);
fmt::print("value is {} here as well\n", w.get());
});
auto ro_access3 =
m.read() | ex::continues_on(sched) | ex::then([](auto w) {
static_assert(std::is_const_v<
std::remove_reference_t<decltype(w.get())>>);
fmt::print("and {} here too\n", w.get());
});
// This read-write access will run once all the above read-only accesses
// are done.
auto rw_access2 =
m.readwrite() | ex::continues_on(sched) | ex::then([](auto w) {
w.get() = 42;
fmt::print("value is {} at the end\n", w.get());
});
// Start and wait for all the work to finish.
tt::sync_wait(ex::when_all(std::move(rw_access1), std::move(ro_access1),
std::move(ro_access2), std::move(ro_access3),
std::move(rw_access2)));
}
pika::finalize();
pika::stop();
return 0;
}
-
template<typename ReadWriteT, typename ReadT, typename Allocator>
class async_rw_mutex¶ Read-write mutex where access is granted to a value through senders.
The wrapped value is accessed through read and readwrite, both of which return senders which send a wrapped value on the value channel when the wrapped value is safe to read or write.
A read-write sender gives exclusive access to the wrapped value, while a read-only sender allows concurrent access to the value (with other read-only accesses).
When the wrapped type is void, the mutex acts as a simple mutex around some externally managed resource. The mutex still allows read-write and read-only access when the type is void. The read-write wrapper types are move-only. The read-only wrapper types are copyable.
The order in which senders signal a receiver is determined by the order in which the senders are retrieved from the mutex. Connecting and starting the senders is thread-safe.
The mutex is move-only.
Warning
Because access to the wrapped value is granted in the order that it is requested from the mutex, there is a risk of deadlocks if senders of later accesses are started and waited for without starting senders of earlier accesses.
Warning
Retrieving senders from the mutex is not thread-safe. The senders of the mutex are intended to be retrieved in synchronous code, while the access provided by the senders themselves is safe to use concurrently.
Public Types¶
-
using read_type = std::decay_t<ReadT> const¶
The type of read-only types accessed through the mutex.
-
using readwrite_type = std::decay_t<ReadWriteT>¶
The type of read-write types accessed through the mutex.
-
using read_access_type = async_rw_mutex_access_wrapper<readwrite_type, read_type, async_rw_mutex_access_type::read>¶
The wrapper type sent by read-only-access senders.
-
using readwrite_access_type = async_rw_mutex_access_wrapper<readwrite_type, read_type, async_rw_mutex_access_type::readwrite>¶
The wrapper type sent by read-write-access senders.
Public Functions¶
-
template<typename U, typename = std::enable_if_t<!std::is_same<std::decay_t<U>, async_rw_mutex>::value>>
inline explicit async_rw_mutex(U &&u, allocator_type const &alloc = {})¶
Construct a new mutex with the wrapped value initialized to u.
-
~async_rw_mutex() = default¶
Destroy the mutex.
The destructor does not wait or require that all accesses through senders have completed. The wrapped value is kept alive in a shared state managed by the senders, until the last access completes, or the destructor of the async_rw_mutex runs, whichever happens later.
-
inline sender<async_rw_mutex_access_type::read> read()¶
Access the wrapped value in read-only mode through a sender.
-
inline sender<async_rw_mutex_access_type::readwrite> readwrite()¶
Access the wrapped value in read-write mode through a sender.
-
enum class pika::execution::experimental::async_rw_mutex_access_type¶
The type of access provided by async_rw_mutex.
Values:
-
enumerator read¶
Read-only access.
-
enumerator readwrite¶
Read-write access.
-
template<typename ReadWriteT, typename ReadT, async_rw_mutex_access_type AccessType>
class async_rw_mutex_access_wrapper¶ A wrapper for values sent by senders from async_rw_mutex.
All values sent by senders accessed through async_rw_mutex are wrapped by this class. The wrapper has reference semantics to the wrapped object, and controls when subsequent access is given. When the destructor of the last or only wrapper runs, senders for subsequent accesses will signal their value channel.
When the access type is async_rw_mutex_access_type::readwrite the wrapper is move-only. When the access type is async_rw_mutex_access_type::read the wrapper is copyable.
-
template<typename ReadWriteT, typename ReadT>
class async_rw_mutex_access_wrapper<ReadWriteT, ReadT, async_rw_mutex_access_type::readwrite>¶ A wrapper for values sent by senders from async_rw_mutex with read-write access.
The wrapper is move-only.
Public Functions¶
-
inline ReadWriteT &get()¶
Access the wrapped type by reference.
-
template<typename ReadWriteT, typename ReadT>
class async_rw_mutex_access_wrapper<ReadWriteT, ReadT, async_rw_mutex_access_type::read>¶ A wrapper for values sent by senders from async_rw_mutex with read-only access.
The wrapper is copyable.
-
template<>
class async_rw_mutex_access_wrapper<void, void, async_rw_mutex_access_type::read>¶ A wrapper for read-only access granted by a void async_rw_mutex.
The wrapper is copyable.
-
template<>
class async_rw_mutex_access_wrapper<void, void, async_rw_mutex_access_type::readwrite>¶ A wrapper for read-write access granted by a void async_rw_mutex.
The wrapper is move-only.
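A minimal sketch (not from the pika documentation; the default construction of the void mutex is an assumption) of the void specialization: the mutex only orders accesses to an externally managed resource, and the wrappers sent by the senders carry no value:
#include <pika/async_rw_mutex.hpp>
#include <pika/execution.hpp>
#include <pika/init.hpp>
#include <utility>
int main(int argc, char* argv[])
{
    namespace ex = pika::execution::experimental;
    namespace tt = pika::this_thread::experimental;
    pika::start(argc, argv);
    int external_resource = 0;
    ex::async_rw_mutex<void> m{};
    // Exclusive access to the external resource first...
    auto w = m.readwrite() | ex::then([&](auto) { external_resource = 42; });
    // ...then read-only access, which may run concurrently with other
    // read-only accesses
    auto r = m.read() | ex::then([&](auto) { (void) external_resource; });
    tt::sync_wait(ex::when_all(std::move(w), std::move(r)));
    pika::finalize();
    pika::stop();
    return 0;
}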
CUDA/HIP support (pika/cuda.hpp)¶
The pika/cuda.hpp header provides functionality related to CUDA and HIP. All functionality is under the pika::cuda::experimental namespace, and class and function names contain cuda even when HIP support is enabled. CUDA and HIP functionality can be enabled with the CMake options PIKA_WITH_CUDA and PIKA_WITH_HIP, respectively, but they are mutually exclusive. In the following, whenever CUDA is mentioned, it refers to CUDA and HIP interchangeably.
Note
https://github.com/pika-org/pika/issues/116 tracks a potential renaming of the functionality to avoid using cuda even when HIP is enabled. If you have feedback on a rename or just want to follow along, please see that issue.
Note
pika uses whip internally for portability between CUDA and HIP. However, users of pika are not forced to use whip as whip only creates aliases for CUDA/HIP types and enumerations. whip is thus compatible with directly using the types and enumerations provided by CUDA/HIP. For cuBLAS, cuSOLVER, rocBLAS, and rocSOLVER support pika does not use a portability library, but simply uses the appropriate types depending on if CUDA or HIP support is enabled.
Warning
At the moment, nvcc cannot compile stdexec headers. Of the CUDA compilers, only nvc++ is able to compile stdexec headers. If you have stdexec support enabled in pika, either ensure that .cu files do not include stdexec headers, or use nvc++ to compile your application. However, nvc++ does not officially support compiling device code; use it at your own risk. For HIP there are no known restrictions.
The CUDA support in pika relies on four major components:
- A pool of CUDA streams as well as cuBLAS and cuSOLVER handles (pika::cuda::experimental::cuda_pool). These streams and handles are used in a round-robin fashion by various sender adaptors.
- A CUDA scheduler, in the std::execution sense (pika::cuda::experimental::cuda_scheduler). This uses the CUDA pool to schedule work on a GPU.
- Sender adaptors (pika::cuda::experimental::then_with_stream etc.). A few special-purpose sender adaptors, as well as customizations of a few std::execution adaptors, are provided to help schedule different types of work on a GPU.
- Polling of CUDA events integrated into the pika scheduling loop (pika::cuda::experimental::enable_user_polling). This integration is essential to avoid calling e.g. cudaStreamSynchronize on a pika task, which would block the underlying worker thread and thus block progress of other work.
The following example gives an overview of using the above CUDA functionalities in pika:
#include <pika/cuda.hpp>
#include <pika/execution.hpp>
#include <pika/init.hpp>
#include <whip.hpp>
#include <cstdio>
#include <utility>
__global__ void kernel()
{
printf(
"Hello from kernel! threadIdx.x: %d\n", static_cast<int>(threadIdx.x));
}
int main(int argc, char* argv[])
{
namespace cu = pika::cuda::experimental;
namespace ex = pika::execution::experimental;
namespace tt = pika::this_thread::experimental;
pika::start(argc, argv);
ex::thread_pool_scheduler cpu_sched{};
// Create a pool of CUDA streams and cuBLAS/SOLVER handles, and a scheduler
// that uses the pool.
cu::cuda_pool pool{};
cu::cuda_scheduler cuda_sched{pool};
{
// Enable polling of CUDA events on the default pool. This is required
// to allow the adaptors below to signal completion of kernels.
cu::enable_user_polling p{};
// The work created by the adaptors below will all be scheduled on the
// same stream from the pool since the work is sequential.
//
// Note that error checking is omitted below.
auto s = ex::just(42) | ex::continues_on(cuda_sched) |
// CUDA kernel through a lambda.
ex::then([](int x) { printf("Hello from the GPU! x: %d\n", x); }) |
// Explicitly launch a CUDA kernel with a stream (see
// https://github.com/eth-cscs/whip for details about whip)
cu::then_with_stream(
[](whip::stream_t stream) { kernel<<<1, 32, 0, stream>>>(); });
tt::sync_wait(std::move(s));
}
pika::finalize();
pika::stop();
return 0;
}
While pika::cuda::experimental::cuda_pool gives direct access to streams and handles, the recommended way to access them is through the pika::cuda::experimental::cuda_scheduler and the sender adaptors available below.
-
class cuda_scheduler¶
A scheduler for running work on a CUDA pool.
Provides access to scheduling work on a CUDA context represented by a cuda_pool. Models the std::execution scheduler concept.
Move and copy constructible. The scheduler has reference semantics with respect to the associated CUDA pool.
Equality comparable.
Note
The recommended way to access streams and handles from the cuda_pool is through the sender adaptors then_with_stream, then_with_cublas, and then_with_cusolver.
Public Functions¶
-
explicit cuda_scheduler(cuda_pool pool)¶
Constructs a new cuda_scheduler using the given cuda_pool.
-
cuda_stream const &get_next_stream()¶
Return the next available CUDA stream from the pool.
-
locked_cublas_handle get_cublas_handle(cuda_stream const &stream, cublasPointerMode_t pointer_mode)¶
Return the next available cuBLAS handle from the pool.
-
locked_cusolver_handle get_cusolver_handle(cuda_stream const &stream)¶
Return the next available cuSOLVER handle from the pool.
-
struct then_with_stream_t¶
The type of the then_with_stream sender adaptor.
-
constexpr then_with_stream_t pika::cuda::experimental::then_with_stream = {}¶
Sender adaptor which calls f with a CUDA stream.
When the predecessor sender completes, calls f with a CUDA stream as the last argument, after the other values sent by the predecessor sender. This adaptor can only be used when the completion scheduler is a cuda_scheduler. Other work may be scheduled concurrently on the stream passed to f. Values sent by the predecessor sender are passed as references to f and kept alive until the work submitted by f to the stream is completed. f may return as soon as work has been submitted; a connected receiver will be signaled only once the kernels submitted to the stream have completed.
#include <pika/cuda.hpp>
#include <pika/execution.hpp>
#include <pika/init.hpp>
#include <whip.hpp>
#include <cstddef>
#include <cstdio>
#include <utility>
__global__ void kernel(int* p, int offset)
{
printf(
"Hello from kernel! threadIdx.x: %d\n", static_cast<int>(threadIdx.x));
p[threadIdx.x] = threadIdx.x * threadIdx.x + offset;
}
int main(int argc, char* argv[])
{
namespace cu = pika::cuda::experimental;
namespace ex = pika::execution::experimental;
namespace tt = pika::this_thread::experimental;
pika::start(argc, argv);
ex::thread_pool_scheduler cpu_sched{};
cu::cuda_pool pool{};
cu::cuda_scheduler cuda_sched{pool};
{
cu::enable_user_polling p{};
constexpr std::size_t n = 32;
int* a = nullptr;
// whip::malloc_async wraps cudaMallocAsync/hipMallocAsync. Using the
// sender adaptors the allocation, work, and deallocation can all be
// scheduled onto the same stream.
auto s = ex::just(&a, n * sizeof(int)) | ex::continues_on(cuda_sched) |
cu::then_with_stream(whip::malloc_async) |
// The then_with_stream callable accepts values sent by the
// predecessor. They will be passed by reference before the stream.
// This allows e.g. whip::malloc_async to be used above with values
// sent by the just sender. The values are passed by reference and
// will be kept alive until the work done on the stream is done.
cu::then_with_stream(
[&a](
/* other values by reference here */ whip::stream_t
stream) {
kernel<<<1, n, 0, stream>>>(a, 17);
// Even though the function returns here, the sync_wait below
// will wait for the kernel to finish. Values returned are
// passed on to continuations.
return a;
}) |
cu::then_with_stream(whip::free_async);
tt::sync_wait(std::move(s));
}
pika::finalize();
pika::stop();
return 0;
}
-
struct then_with_cublas_t¶
The type of the then_with_cublas sender adaptor.
Public Functions¶
-
template<typename Sender, typename F>
inline constexpr auto operator()(Sender &&sender, F &&f, cublasPointerMode_t pointer_mode) const¶ Create a then_with_cublas sender.
-
template<typename F>
inline constexpr auto operator()(F &&f, cublasPointerMode_t pointer_mode) const¶ Partially bound sender. Expects a sender to be supplied later.
-
constexpr then_with_cublas_t pika::cuda::experimental::then_with_cublas = {}¶
Sender adaptor which calls f with a cuBLAS handle.
This sender is intended to be used to submit work using a cuBLAS handle. The stream associated with the handle may also be used to submit work. The handle is accessed through a locked_cublas_handle, and f should return as quickly as possible to avoid blocking other work from using the handle.
The behaviour of synchronization and lifetimes is the same as for then_with_stream, except that the handle is passed as the first argument to match the typical function signatures of cuBLAS functions.
#include <pika/cuda.hpp>
#include <pika/execution.hpp>
#include <pika/init.hpp>
#include <fmt/printf.h>
#include <whip.hpp>
#include <cstddef>
#include <tuple>
#include <utility>
#if defined(PIKA_HAVE_CUDA)
# include <cublas_v2.h>
using blas_handle_t = cublasHandle_t;
auto* blas_gemm = &cublasDgemm;
auto blas_pointer_mode = CUBLAS_POINTER_MODE_HOST;
auto blas_op_n = CUBLAS_OP_N;
#elif defined(PIKA_HAVE_HIP)
# include <rocblas/rocblas.h>
using blas_handle_t = rocblas_handle;
# define CUBLAS_POINTER_MODE_HOST rocblas_pointer_mode_host
auto* blas_gemm = &rocblas_dgemm;
auto blas_pointer_mode = rocblas_pointer_mode_host;
auto blas_op_n = rocblas_operation_none;
#endif
// Owning wrapper for GPU-allocated memory.
class gpu_data
{
double* p{nullptr};
std::size_t n{0};
public:
// Note that blocking functions such as cudaMalloc will block the underlying
// operating system thread instead of yielding the pika task. Consider using
// e.g. a pool of GPU memory to avoid blocking the thread for too long.
gpu_data(std::size_t n)
: n(n)
{
whip::malloc(&p, sizeof(double) * n);
}
gpu_data(gpu_data&& other) noexcept
: p(std::exchange(other.p, nullptr))
, n(std::exchange(other.n, 0))
{
}
gpu_data& operator=(gpu_data&& other) noexcept
{
p = std::exchange(other.p, nullptr);
n = std::exchange(other.n, 0);
return *this;
}
gpu_data(gpu_data const&) = delete;
gpu_data& operator=(gpu_data const&) = delete;
~gpu_data() { whip::free(p); }
std::size_t size() const { return n; }
double* get() const { return p; }
};
__global__ void init(double* a, double* b, double* c, std::size_t n)
{
std::size_t i = blockIdx.x * blockDim.x + threadIdx.x;
if (i < n)
{
a[i] = 1.0;
b[i] = 2.0;
c[i] = 3.0;
}
}
int main(int argc, char* argv[])
{
namespace cu = pika::cuda::experimental;
namespace ex = pika::execution::experimental;
namespace tt = pika::this_thread::experimental;
pika::start(argc, argv);
ex::thread_pool_scheduler cpu_sched{};
cu::cuda_pool pool{};
cu::cuda_scheduler cuda_sched{pool};
{
cu::enable_user_polling p{};
constexpr std::size_t n = 2048;
gpu_data a{n * n};
gpu_data b{n * n};
gpu_data c{n * n};
double alpha = 1.0;
double beta = 1.0;
auto s = ex::just(std::move(a), std::move(b), std::move(c)) |
ex::continues_on(cuda_sched) |
cu::then_with_stream(
[](auto& a, auto& b, auto& c, whip::stream_t stream) {
init<<<n * n / 256, 256, 0, stream>>>(
a.get(), b.get(), c.get(), n * n);
return std::make_tuple(
std::move(a), std::move(b), std::move(c));
}) |
ex::unpack() |
// a, b, and c will be kept alive by the then_with_cublas operation
// state at least until the GPU kernels complete. Values sent by
// the predecessor sender are passed as the last arguments after the
// handle.
cu::then_with_cublas(
[&](blas_handle_t handle, auto& a, auto& b, auto& c) {
blas_gemm(handle, blas_op_n, blas_op_n, n, n, n, &alpha,
a.get(), n, b.get(), n, &beta, c.get(), n);
},
blas_pointer_mode);
tt::sync_wait(std::move(s));
}
pika::finalize();
pika::stop();
return 0;
}
-
struct then_with_cusolver_t¶
The type of the then_with_cusolver sender adaptor.
-
constexpr then_with_cusolver_t pika::cuda::experimental::then_with_cusolver = {}¶
Sender adaptor which calls f with a cuSOLVER handle.
This sender is intended to be used to submit work using a cuSOLVER handle. The stream associated with the handle may also be used to submit work. The handle is accessed through a locked_cusolver_handle, and f should return as quickly as possible to avoid blocking other work from using the handle.
The behaviour of synchronization and lifetimes is the same as for then_with_stream, except that the handle is passed as the first argument to match the typical function signatures of cuSOLVER functions.
See pika::cuda::experimental::then_with_cublas for an example of what can be done with pika::cuda::experimental::then_with_cusolver. The interfaces are identical except for the type of handle passed to the callable.
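In addition, the following minimal sketch (reusing the PIKA_HAVE_CUDA/PIKA_HAVE_HIP macros from the example above; the actual cuSOLVER/rocSOLVER call is omitted) shows the shape of a then_with_cusolver callable:
#include <pika/cuda.hpp>
#include <pika/execution.hpp>
#include <pika/init.hpp>
#include <utility>
#if defined(PIKA_HAVE_CUDA)
# include <cusolverDn.h>
using solver_handle_t = cusolverDnHandle_t;
#elif defined(PIKA_HAVE_HIP)
# include <rocsolver/rocsolver.h>
using solver_handle_t = rocblas_handle;
#endif
int main(int argc, char* argv[])
{
    namespace cu = pika::cuda::experimental;
    namespace ex = pika::execution::experimental;
    namespace tt = pika::this_thread::experimental;
    pika::start(argc, argv);
    cu::cuda_pool pool{};
    cu::cuda_scheduler cuda_sched{pool};
    {
        cu::enable_user_polling p{};
        auto s = ex::schedule(cuda_sched) |
            cu::then_with_cusolver([]([[maybe_unused]] solver_handle_t handle) {
                // Submit cuSOLVER/rocSOLVER work using the handle here. The
                // callable should return quickly; the adaptor waits for the
                // work submitted to the associated stream to complete.
            });
        tt::sync_wait(std::move(s));
    }
    pika::finalize();
    pika::stop();
    return 0;
}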
-
class enable_user_polling¶
Enable CUDA polling on the given thread pool.
RAII helper class to enable and disable polling of CUDA events on the given pool. Enabling polling is a requirement to signal completion of work submitted to the cuda_scheduler.
There is no detection of whether polling is already enabled or disabled, or if enable_user_polling is nested. The constructor and destructor will unconditionally register and unregister polling, respectively.
Public Functions¶
-
inline enable_user_polling(std::string const &pool_name = "")¶
Start polling for CUDA events on the given thread pool.
-
inline ~enable_user_polling()¶
Stop polling for CUDA events.
The destructor will not wait for work submitted to a cuda_scheduler to complete. The user must ensure that work completes before disabling polling.
-
class cuda_pool¶
A pool of CUDA streams, used for scheduling work on a CUDA device.
The pool initializes a set of CUDA streams on construction and provides access to the streams in a round-robin fashion. The pool also gives access to cuBLAS and cuSOLVER handles.
The pool is movable and copyable with reference semantics. Copies of a pool still refer to the original pool of streams. A moved-from pool can’t be used, except to check if it is valid with valid().
The pool is equality comparable and formattable.
Note
The recommended way to access streams and handles from the cuda_pool is through sender adaptors using cuda_scheduler.
Public Functions¶
-
explicit cuda_pool(int device = 0, std::size_t num_normal_priority_streams = 32, std::size_t num_high_priority_streams = 32, unsigned int flags = 0, std::size_t num_cublas_handles = 16, std::size_t num_cusolver_handles = 16)¶
Construct a pool of CUDA streams and handles.
Note
The default values of num_normal_priority_streams, num_high_priority_streams, num_cublas_handles, and num_cusolver_handles have been chosen to easily allow saturating most GPUs without creating unnecessarily many streams. In individual situations more streams (e.g. launching many small kernels) or fewer streams (e.g. the GPU does not support more concurrency, or slows down when using too many streams) may be more appropriate. Each cuBLAS and cuSOLVER handle may require a significant amount of GPU memory, which is why the default values are lower than the number of streams. The default values have proven to work well e.g. in DLA-Future.
Warning
Up to and including version 0.30.X the number of streams parameters denoted the number of streams per worker thread. From 0.31.0 onwards the parameters denote the total number of streams to create in the pool. The default values were adjusted accordingly, but if you are not using the default values, please verify the values you are passing to the cuda_pool constructor are still reasonable with 0.31.0.
- Parameters:¶
- int device = 0¶
the CUDA device used for scheduling work
- std::size_t num_normal_priority_streams = 32¶
the number of normal priority streams
- std::size_t num_high_priority_streams = 32¶
the number of high priority streams
- unsigned int flags = 0¶
flags used to construct CUDA streams
- std::size_t num_cublas_handles = 16¶
the number of cuBLAS handles to create for the whole pool
- std::size_t num_cusolver_handles = 16¶
the number of cuSOLVER handles to create for the whole pool
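For example (a sketch with illustrative values only; since 0.31.0 the counts apply to the whole pool, not per worker thread):
#include <pika/cuda.hpp>
#include <pika/init.hpp>
int main(int argc, char* argv[])
{
    namespace cu = pika::cuda::experimental;
    pika::start(argc, argv);
    {
        // Device 0, 64 normal and 64 high priority streams in total,
        // default stream flags, 8 cuBLAS and 8 cuSOLVER handles
        cu::cuda_pool pool{0, 64, 64, 0, 8, 8};
        cu::cuda_scheduler sched{pool};
    }
    pika::finalize();
    pika::stop();
    return 0;
}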
-
bool valid() const noexcept¶
Check if the pool is valid.
- Returns:¶
true if the pool refers to a valid pool, false otherwise (e.g. if the pool has been moved out from).
-
cuda_stream const &get_next_stream(pika::execution::thread_priority priority = pika::execution::thread_priority::normal)¶
Get a reference to the next CUDA stream.
Note
The recommended way to access a stream is through a cuda_scheduler.
-
locked_cublas_handle get_cublas_handle(cuda_stream const &stream, cublasPointerMode_t pointer_mode)¶
Get a locked cuBLAS handle.
Note
The recommended way to access a handle is through a cuda_scheduler.
- Parameters:¶
- cuda_stream const &stream¶
the CUDA stream to use with the cuBLAS handle.
- Returns:¶
a locked cuBLAS handle, which is released for reuse on destruction.
-
locked_cusolver_handle get_cusolver_handle(cuda_stream const &stream)¶
Get a locked cuSOLVER handle.
Note
The recommended way to access a handle is through a cuda_scheduler.
- Parameters:¶
- cuda_stream const &stream¶
the CUDA stream to use with the cuSOLVER handle.
- Returns:¶
a locked cuSOLVER handle, which is released for reuse on destruction.
-
class cuda_stream¶
RAII wrapper for a CUDA stream.
An RAII wrapper for a CUDA stream which creates a stream on construction and destroys it on destruction. It is movable and copyable. A moved-from stream holds the default stream. A copied stream uses the properties from the given stream and creates a new stream.
Equality comparable and formattable.
When accessing the underlying stream, whip is used for compatibility with CUDA and HIP.
Note
The recommended way to access a stream is through sender adaptors using cuda_scheduler.
Public Functions¶
-
explicit cuda_stream(int device = 0, pika::execution::thread_priority priority = pika::execution::thread_priority::default_, unsigned int flags = 0)¶
Construct a new stream with the given device and priority.
- Parameters:¶
- int device = 0¶
The device to create the stream on.
- pika::execution::thread_priority priority = pika::execution::thread_priority::default_¶
The priority of the stream. The mapping from thread_priority to CUDA stream priorities is undefined, except that the order is preserved, allowing for different thread_priority to map to the same CUDA priority.
- unsigned int flags = 0¶
Flags to pass to the CUDA stream creation.
-
whip::stream_t get() const noexcept¶
Get the underlying stream.
The stream is still owned by the cuda_stream and must not be manually released.
-
int get_device() const noexcept¶
Get the device of the stream.
-
pika::execution::thread_priority get_priority() const noexcept¶
Get the priority of the stream.
-
unsigned int get_flags() const noexcept¶
Get the flags of the stream.
-
class cublas_handle¶
RAII wrapper for a cuBLAS handle.
An RAII wrapper for a cuBLAS handle which creates a handle on construction and destroys it on destruction.
The wrapper is movable and copyable. A moved-from handle cannot be used other than to check for validity with valid(). A copied handle uses the properties from the given handle and creates a new handle.
Equality comparable and formattable.
Note
The recommended way to access a handle is through a cuda_scheduler.
Public Functions¶
-
cublas_handle()¶
Constructs a new cuBLAS handle with the default stream.
-
explicit cublas_handle(cuda_stream const &stream)¶
Constructs a new cuBLAS handle with the given stream.
-
bool valid() const noexcept¶
Check if the handle is valid.
- Returns:¶
true if the handle refers to a valid handle, false otherwise (e.g. if the handle has been moved out from, or it has been default-constructed)
-
cublasHandle_t get() const noexcept¶
Get the underlying cuBLAS handle.
-
int get_device() const noexcept¶
Get the device associated with the stream of the cuBLAS handle.
-
whip::stream_t get_stream() const noexcept¶
Get the stream associated with the cuBLAS handle.
-
void set_stream(cuda_stream const &stream)¶
Set the stream associated with the cuBLAS handle.
-
void set_pointer_mode(cublasPointerMode_t pointer_mode)¶
Set the cuBLAS pointer mode of the handle.
-
class locked_cublas_handle¶
A locked cuBLAS handle.
A handle that provides thread-safe access to a cublas_handle. The locked handle is immovable.
Note
The recommended way to access a handle is through sender adaptors using cuda_scheduler.
Public Functions¶
-
cublas_handle const &get() noexcept¶
Access the underlying cuBLAS handle.
- Returns:¶
a reference to the cublas_handle. The returned handle is not thread-safe and must be used within the lifetime of the locked_cublas_handle.
-
class cusolver_handle¶
RAII wrapper for a cuSOLVER handle.
An RAII wrapper for a cuSOLVER handle which creates a handle on construction and destroys it on destruction.
The wrapper is movable and copyable. A moved-from handle cannot be used other than to check for validity with valid(). A copied handle uses the properties from the given handle and creates a new handle.
Equality comparable and formattable.
Note
The recommended way to access a handle is through sender adaptors using cuda_scheduler.
Public Functions¶
-
cusolver_handle()¶
Constructs a new cuSOLVER handle with the default stream.
-
explicit cusolver_handle(cuda_stream const &stream)¶
Constructs a new cuSOLVER handle with the given stream.
-
bool valid() const noexcept¶
Check if the handle is valid.
- Returns:¶
true if the handle refers to a valid handle, false otherwise (e.g. if the handle has been moved out from, or it has been default-constructed)
-
cusolverDnHandle_t get() const noexcept¶
Get the underlying cuSOLVER handle.
-
int get_device() const noexcept¶
Get the device associated with the cuSOLVER handle.
-
whip::stream_t get_stream() const noexcept¶
Get the stream associated with the cuSOLVER handle.
-
void set_stream(cuda_stream const &stream)¶
Set the stream associated with the cuSOLVER handle.
-
class locked_cusolver_handle¶
A locked cuSOLVER handle.
A handle that provides thread-safe access to a cusolver_handle. The locked handle is immovable.
Note
The recommended way to access a handle is through sender adaptors using cuda_scheduler.
Public Functions¶
-
cusolver_handle const &get() noexcept¶
Access the underlying cuSOLVER handle.
- Returns:¶
a reference to the cusolver_handle. The returned handle is not thread-safe and must be used within the lifetime of the locked_cusolver_handle.