diff --git a/.codespellignore b/.codespellignore new file mode 100644 index 00000000..2fddd5ed --- /dev/null +++ b/.codespellignore @@ -0,0 +1,5 @@ +cancelled +copyable +pullrequest +snd +statics diff --git a/.codespellrc b/.codespellrc index 7d233d00..31a9163a 100644 --- a/.codespellrc +++ b/.codespellrc @@ -3,4 +3,4 @@ builtin = clear,rare,en-GB_to_en-US,names,informal,code check-hidden = skip = ./.git,./build/*,./stagedir/*,./docs/html/*,./docs/latex/*,*.log,.*.swp,*~,*.bak,Makefile quiet-level = 2 -# ignore-words = .ignore-words +ignore-words = .codespellignore diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 0c82ca08..e0cbb9cf 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -1,19 +1,15 @@ -// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -// For format details, see https://aka.ms/devcontainer.json. For config options, see the -// README at: https://github.com/devcontainers/templates/tree/main/src/cpp - { - "name": "Beman Project Generic Devcontainer", - "build": { - "dockerfile": "Dockerfile" - }, - "postCreateCommand": "bash .devcontainer/postcreate.sh", - "customizations": { - "vscode": { - "extensions": [ - "ms-vscode.cpptools", - "ms-vscode.cmake-tools" - ] - } - } + "name": "Beman Project Generic Devcontainer", + "build": { + "dockerfile": "Dockerfile" + }, + "postCreateCommand": "bash .devcontainer/postcreate.sh", + "customizations": { + "vscode": { + "extensions": [ + "ms-vscode.cpptools", + "ms-vscode.cmake-tools" + ] + } + } } diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 066bcefa..49fa500f 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -2,4 +2,3 @@ # Codeowners for reviews on PRs * @dietmarkuehl @camio @neatudarius - diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml new file mode 100644 index 00000000..cc129492 --- /dev/null +++ b/.github/workflows/pre-commit.yml @@ -0,0 +1,43 @@ +name: Lint Check (pre-commit) + +on: + pull_request: + push: + +jobs: + pre-commit: + runs-on: ubuntu-latest + name: pre-commit + permissions: + contents: read + checks: write + issues: write + pull-requests: write + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: 3.13 + + - name: Get Changed Files + id: changed-files + uses: tj-actions/changed-files@v45 + + # See: + # https://github.com/tj-actions/changed-files?tab=readme-ov-file#using-local-git-directory- + - uses: pre-commit/action@v3.0.1 + with: + extra_args: --files ${{ steps.changed-files.outputs.all_changed_files }} + continue-on-error: true + + - name: suggester / pre-commit + if: ${{ github.event_name == 'pull_request' }} + uses: reviewdog/action-suggester@v1 + with: + tool_name: pre-commit + level: warning + reviewdog_flags: "-fail-level=error" diff --git a/.gitignore b/.gitignore index 9eef479b..df999a8e 100644 --- a/.gitignore +++ b/.gitignore @@ -45,4 +45,4 @@ CMakeCache.txt CMakeFiles/ docs/html -docs/latex \ No newline at end of file +docs/latex diff --git a/.markdownlint.yaml b/.markdownlint.yaml new file mode 100644 index 00000000..81f5fcd7 --- /dev/null +++ b/.markdownlint.yaml @@ -0,0 +1,9 @@ +# MD033/no-inline-html : Inline HTML : https://github.com/DavidAnson/markdownlint/blob/v0.35.0/doc/md033.md +# Disable inline html linter is needed for
+MD033: false + +# MD013/line-length : Line length : https://github.com/DavidAnson/markdownlint/blob/v0.35.0/doc/md013.md +# Conforms to .clang-format ColumnLimit +# Update the comment in .clang-format if we no-longer tie these two column limits. +MD013: + line_length: 119 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 00000000..bffa58ed --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,42 @@ +# See https://pre-commit.com for more information +# See https://pre-commit.com/hooks.html for more hooks +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v5.0.0 + hooks: + - id: trailing-whitespace + - id: end-of-file-fixer + - id: check-json + - id: check-yaml + exclude: ^\.clang-(format|tidy)$ + - id: check-added-large-files + + # This brings in a portable version of clang-format. + # See also: https://github.com/ssciwr/clang-format-wheel + - repo: https://github.com/pre-commit/mirrors-clang-format + rev: v19.1.4 + hooks: + - id: clang-format + types_or: [c++, c, json] + exclude: docs/TODO.json + + # TODO: CMake linting and formatting + # - repo: https://github.com/BlankSpruce/gersemi + # rev: 0.17.1 + # hooks: + # - id: gersemi + # name: CMake linting + + # TODO: Markdown linting + # Config file: .markdownlint.yaml + # - repo: https://github.com/igorshubovych/markdownlint-cli + # rev: v0.43.0 + # hooks: + # - id: markdownlint + + - repo: https://github.com/codespell-project/codespell + rev: v2.3.0 + hooks: + - id: codespell + files: ^.*\.(cmake|cpp|hpp|txt|md|json|in|yaml|yml)$ + args: ["--ignore-words", ".codespellignore" ] diff --git a/CMakeLists.txt b/CMakeLists.txt index c6874da7..bafae52b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -12,17 +12,35 @@ if(${CMAKE_SOURCE_DIR} STREQUAL ${CMAKE_BINARY_DIR}) endif() set(TARGET_NAME execution26) -set(TARGET_NAMESPACE beman) # FIXME: not used in install(EXPORT ...) CK? +set(TARGET_NAMESPACE beman) set(TARGET_PREFIX ${TARGET_NAMESPACE}.${TARGET_NAME}) set(TARGET_LIBRARY ${PROJECT_NAME}) -set(TARGET_ALIAS ${TARGET_LIBRARY}::${TARGET_LIBRARY}) +set(TARGET_ALIAS ${TARGET_NAMESPACE}::${TARGET_NAME}) set(TARGET_PACKAGE_NAME ${PROJECT_NAME}-config) set(TARGETS_EXPORT_NAME ${PROJECT_NAME}-targets) +option( + BEMAN_EXECUTION26_ENABLE_TESTING + "Enable building tests and test infrastructure. Values: { ON, OFF }." + ${PROJECT_IS_TOP_LEVEL} +) + +option( + BEMAN_EXECUTION26_BUILD_EXAMPLES + "Enable building examples. Values: { ON, OFF }." + ${PROJECT_IS_TOP_LEVEL} +) + +option( + BEMAN_EXECUTION26_ENABLE_INSTALL + "Install the project components. Values: { ON, OFF }." + ${PROJECT_IS_TOP_LEVEL} +) + include(GNUInstallDirs) set(INSTALL_CONFIGDIR ${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}) -if(CMAKE_BUILD_TYPE STREQUAL Debug) +if(NOT BEMAN_EXECUTION26_ENABLE_INSTALL OR CMAKE_SKIP_INSTALL_RULES) include(FetchContent) # Add project_options from https://github.com/aminya/project_options @@ -40,7 +58,7 @@ if(CMAKE_BUILD_TYPE STREQUAL Debug) # uncomment to enable the options. Some of them accept one or more inputs: project_options( PREFIX - ${PROJECT_NAME} + ${TARGET_NAME} ENABLE_CACHE # NO! # ENABLE_CLANG_TIDY # NO! ENABLE_VS_ANALYSIS @@ -73,13 +91,20 @@ endif() add_subdirectory(src/beman/execution26) -if(PROJECT_IS_TOP_LEVEL) +if(BEMAN_EXECUTION26_ENABLE_TESTING) enable_testing() add_subdirectory(tests/beman/execution26) +endif() + +if(BEMAN_EXECUTION26_BUILD_EXAMPLES) add_subdirectory(examples) endif() +if(NOT BEMAN_EXECUTION26_ENABLE_INSTALL OR CMAKE_SKIP_INSTALL_RULES) + return() +endif() + include(CMakePackageConfigHelpers) write_basic_package_version_file( diff --git a/Makefile b/Makefile index 02a714fa..d8949263 100644 --- a/Makefile +++ b/Makefile @@ -14,7 +14,7 @@ SANITIZERS := run # SANITIZERS += asan # TODO: tsan msan # endif -.PHONY: default release debug doc run update check ce todo distclean clean codespell clang-tidy build test install all format $(SANITIZERS) +.PHONY: default release debug doc run update check ce todo distclean clean codespell clang-tidy build test install all format unstage $(SANITIZERS) SYSROOT ?= TOOLCHAIN ?= @@ -79,7 +79,9 @@ doc: build: CC=$(CXX) cmake --fresh -G Ninja -S $(SOURCEDIR) -B $(BUILD) $(TOOLCHAIN) $(SYSROOT) \ - -DCMAKE_CXX_COMPILER=$(CXX) # XXX -DCMAKE_CXX_FLAGS="$(CXX_FLAGS) $(SAN_FLAGS)" + -D CMAKE_EXPORT_COMPILE_COMMANDS=1 \ + -D CMAKE_SKIP_INSTALL_RULES=1 \ + -D CMAKE_CXX_COMPILER=$(CXX) # XXX -D CMAKE_CXX_FLAGS="$(CXX_FLAGS) $(SAN_FLAGS)" cmake --build $(BUILD) # NOTE: without install! CK @@ -111,8 +113,8 @@ check: done | tsort > /dev/null build/$(SANITIZER)/compile_commands.json: $(SANITIZER) -clang-tidy: build/$(SANITIZER)/compile_commands.json - run-clang-tidy -p build/$(SANITIZER) tests examples +clang-tidy: $(BUILD)/compile_commands.json + run-clang-tidy -p $(BUILD) tests examples codespell: codespell -L statics,snd,copyable,cancelled @@ -128,6 +130,9 @@ clang-format: todo: bin/mk-todo.py +unstage: + git restore --staged tests/beman/execution26/CMakeLists.txt + .PHONY: clean-doc clean-doc: $(RM) -r docs/html docs/latex diff --git a/README.md b/README.md index 037dc173..510241c9 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,8 @@ SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception --> # beman.execution26: Building Block For Asynchronous Programs + + `beman.execution26` provides the basic vocabulary for asynchronous programming as well as important algorithms implemented in terms of this vocabulary. The key entities of the vocabulary are: @@ -33,7 +35,9 @@ e.g.: completed. - `bulk(...)` to executed execute work, potentially concurrently. -**Implements:** [`std::execution` (P2300)](http://wg21.link/p2300). +**Implements:** [`std::execution` (P2300R10)](http://wg21.link/P2300R10). + +**Status**: [Under development and not yet ready for production use.](https://github.com/bemanproject/beman/blob/main/docs/BEMAN_LIBRARY_MATURITY_MODEL.md#under-development-and-not-yet-ready-for-production-use) ## Help Welcome! diff --git a/bin/update-cmake-headers.py b/bin/update-cmake-headers.py index 69e0095a..7c53b0ce 100755 --- a/bin/update-cmake-headers.py +++ b/bin/update-cmake-headers.py @@ -29,7 +29,7 @@ def get_headers(dir): } file_set_re = re.compile(" *FILE_SET.*") -section_re = re.compile(" *\${TARGET_LIBRARY}_(?P
.*)_headers$") +section_re = re.compile(" *\${TARGET_NAME}_(?P
.*)_headers$") header_re = re.compile(" *\${PROJECT_SOURCE_DIR}/include/beman/.*/.*\.hpp") if len(sys.argv) != 2: diff --git a/cmake/CMakeLinuxPresets.json b/cmake/CMakeLinuxPresets.json index beb1c487..5deaebeb 100644 --- a/cmake/CMakeLinuxPresets.json +++ b/cmake/CMakeLinuxPresets.json @@ -20,8 +20,7 @@ "name": "release-base-Linux", "hidden": true, "cacheVariables": { - "CMAKE_BUILD_TYPE": "RelWithDebInfo", - "CMAKE_CXX_FLAGS": "-Wall -Wextra -Wpedantic -Wno-shadow -Wconversion -Wsign-conversion -Wcast-align -Wcast-qual -Woverloaded-virtual -Wformat=2 -Wno-error" + "CMAKE_BUILD_TYPE": "RelWithDebInfo" }, "condition": { "type": "notEquals", diff --git a/docs/TODO.md b/docs/TODO.md index 5ce79912..a161b0d6 100644 --- a/docs/TODO.md +++ b/docs/TODO.md @@ -1,4 +1,4 @@ -# ToDo +# ToDo | Section | Code | Test | Doc | Comment | | ------- |:----:|:----:|:---:| ------- | diff --git a/docs/dependency.txt b/docs/dependency.txt index e6fea924..b88965b1 100644 --- a/docs/dependency.txt +++ b/docs/dependency.txt @@ -515,4 +515,4 @@ when_all_with_variant into_variant when_all_with_variant when_all with_awaitable_senders as_awaitable write_env make_sender -write_env queryable \ No newline at end of file +write_env queryable diff --git a/docs/intro-examples.md b/docs/intro-examples.md new file mode 100644 index 00000000..adb23bcf --- /dev/null +++ b/docs/intro-examples.md @@ -0,0 +1,130 @@ + + +# Introduction by Example + +This page provides a series of examples showing how to use the +`std::execution` components. + +
+`"Hello, world"` - synchronous using asynchronous components + +Code: [`examples/intro-1-hello-world.cpp`]() + +The first example is a very complicated way to a version of `"Hello, +world"`: it uses components for dealing with asynchronous work to +synchronously produce the result. The intention is to show a basic +use of some involved components to build up a feeling of how things +work. + +The componentes for `std::execution` are declared in the header +``. This particular implementation implements the +cmponents in namespace `beman::execution26` declared in the header +``: + +```c++ +#include +#include +#include +#include + +namespace ex = ::beman::execution26; +using namespace std::string_literals; +``` + +Most of these declarations should be familiar. The namespace alias +`ex` is used to support easy migration to a different implementation, +in particular the standard name `std::execution` once it becomes +available with standard library implementations. The other examples +will have a similar start which is only mentioned in the explanation +to point out unusual parts like the use of custom components. + +All interesting work happens in the `main` function: + +```c++ +int main() +{ + auto[result] = ex::sync_wait( + ex::when_all( + ex::just("hello, "s), + ex::just("world"s) + ) + | ex::then([](auto s1, auto s2){ return s1 + s2; }) + ).value_or(std::tuple(""s)); + + std::cout << result << '\n'; +} +``` + +This code code be simpler even when using components from +`std::execution`. Showing a few more components is intended to +better reflect how an asynchronous program might look like. This +examples uses a _sender factory_ (`ex::just`), two _sender adaptors_ +(`ex::when_all` and `ex::then`), and finally a _sender consumer_ +(`ex::sync_wait`) to build up work and to execute it. The idea of +a _sender_ is that it represents work which can be composed with +algorithms into a unit of work which is eventually executed. + +Each work item can complete asynchronously at some later time, i.e., +calling it like a function and using a returned value isn't really +an option. Instead, when the work is started it does whatever is +needed to get the work completed and get a _completion signal_ +delivered. Delivering a completion signal consists of calling a +function on a suitable objects. The important part is that once +work is started it always delivers exactly one completion signal +which can indicate success, failure, or cancellation. Later examples +for creating senders will go into more details about the cancellation +signals. + +The components used in this example do all of that synchronously: + +- `ex::just("string"s)` completes immediately when started with + successful completion which includes the string passed as + argument. +- ex::when_all(_sender1_, _sender2_) starts the senders + passed as arguments. When all of the senders complete, it + produces its own completion. In the case of success all the + received values are passed to the completion signal. In case + of an error all outstanding work is cancelled and the first + error becomes `when_all`'s completion signal once all children + have completed. Similarly, in case of cancellation all children + get cancelled and once all complete `when_all` produces a + cancellation signal. In the example the two children each produces + one string as completion signal and `when_all` produces these two + strings as its completion signal. +- _sender_ | ex::then(_fun_) is equivalent to using + ex::then(_sender_, _fun_). The `ex::then` calls + the function _fun_ with its child sender completes + successful. The arguments to _fun_ are the values + received from the child completion signal. In the example, the + child is `when_all(...)` and it produces two strings which are + passed to _fun_. The completion signal of `ex::then` + is successful with the value returned from the call to + _fun_ (which may `void`) if the call returns + normally. If an exception is thrown `ex::then` completes with + an `std::exception_ptr` to the exception thrown. In the example + the completion is just a concatenation of the two strings. +- sync_wait(_sender_) starts its argument and then + blocks until the work completes although the thread calling + `sync_wait` may contribute to the completion of the work. The + function returns a an + std::optional<std::tuple<_results_...>>>. + If the child sender completes successfully the values from the + child's completion signal become the elements of the tuple. If + the child completes with an error, the error is thrown as an + exception. Otherwise, if the work gets cancelled, an empty + `std::optional<...>` is returned. In the example, the child + sends a string which gets wrapped into a `std::tuple` which in + turn gets wrapped into an `std::optional`. Thus, the somewhat + round-about way to get the result: first using + `value_or(std::tuple(""s))` to get the value from the `std::optional` + which is then decomposed from the `std::tuple` using structured + bindings. + +
+ +
+`"Hello, async"` - a simple asynchronous example + +Code: [`examples/intro-2-hello-async.cpp`]() + +
diff --git a/docs/overview.md b/docs/overview.md index 99d4384d..67fd7c07 100644 --- a/docs/overview.md +++ b/docs/overview.md @@ -20,7 +20,7 @@ When an asynchronous operation completes it _signals_ its completion by calling
environment The term _enviroment_ refers to the bag of properties associated with an _object_ by the call std::execution::get_env(_object_). By default the environment for objects is empty (std::execution::empty_env). In particular, environments associated with receivers are used to provide access to properties like the stop token, scheduler, or allocator associated with the receiver. The various properties associated with an object are accessed via queries. -
+
## Concepts This section lists the concepts from `std::execution`. @@ -46,7 +46,7 @@ struct example_state { using operation_state_concept = std::execution::operation_state_t; std::remove_cvref_t receiver; - + auto start() & noexcept { std::execution::set_value(std::move(this->receiver)); } @@ -54,7 +54,7 @@ struct example_state static_assert(std::execution::operation_state>); ``` - +
receiver<Receiver> @@ -90,7 +90,7 @@ struct example_receiver { using receiver_concept = std::execution::receiver_t; std::remove_cvref_t nested; - + auto get_env() const noexcept { return std::execution::get_env(this->nested); } @@ -112,7 +112,7 @@ struct example_receiver static_assert(std::execution::receiver>); ``` -
+
receiver_of<Receiver, Completions> @@ -129,7 +129,7 @@ The example defines a simple receiver a struct example_receiver { using receiver_concept = std::execution::receiver_t; - + auto set_value(int) && noexcept ->void {} auto set_stopped() && noexcept ->void {} }; @@ -141,7 +141,7 @@ static_assert(std::execution::receiver_of); -// providing a superset of signal models models receiver_of: +// providing a superset of signal models models receiver_of: static_assert(std::execution::receiver_of; - + int value{}; template auto connect(Receiver&& receiver) const -> state { @@ -238,7 +238,7 @@ To determine if _Receiver_ can receive all sends_stopped<Sender, Env = std::execution::empty_env> -The concept sends_stopped<Sender, Env> determines if _Sender_ may send a stopped completion signal. To do so, the concepts determines if std::execution::get_completion_signals(_sender_, _env_) contains the signatures std::execution::set_stopped_t(). +The concept sends_stopped<Sender, Env> determines if _Sender_ may send a stopped completion signal. To do so, the concepts determines if std::execution::get_completion_signals(_sender_, _env_) contains the signatures std::execution::set_stopped_t().
stoppable_token<_Token_> @@ -284,7 +284,7 @@ void compute(std::stoppable_token auto token) Example: inactive
This example shows how an operation_state can use the callback_type together with a _token_ to get notified when cancellation is requested. - + ```c++ template struct example_state @@ -328,7 +328,7 @@ struct example_state std::execution::set_value(std::move(this->receiver)); } } -}; +}; ```
diff --git a/docs/questions.md b/docs/questions.md index fd070514..7e120b04 100644 --- a/docs/questions.md +++ b/docs/questions.md @@ -49,4 +49,4 @@ likely observable. - [exec.when.all] uses on-stop-request without saying what it actually does. most likely on-stop-request is supposed to call stop_src.request_stop - [exec.when.all] p11.1 uses a non-existing conversion for tuple to implement its "tie" - actually, that one may be new in C++23! \ No newline at end of file + actually, that one may be new in C++23! diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 3a64e258..7d03f522 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -12,6 +12,9 @@ list( stop_token stopping allocator + intro-1-hello-world + intro-2-hello-async + intro-5-consumer doc-just doc-just_error doc-just_stopped @@ -21,5 +24,5 @@ foreach(EXAMPLE ${EXAMPLES}) set(EXAMPLE_TARGET ${TARGET_PREFIX}.examples.${EXAMPLE}) add_executable(${EXAMPLE_TARGET}) target_sources(${EXAMPLE_TARGET} PRIVATE ${EXAMPLE}.cpp) - target_link_libraries(${EXAMPLE_TARGET} PRIVATE ${TARGET_LIBRARY}) + target_link_libraries(${EXAMPLE_TARGET} PRIVATE ${TARGET_NAMESPACE}::${TARGET_NAME}) endforeach() diff --git a/examples/allocator.cpp b/examples/allocator.cpp index 483521c9..cd4655e7 100644 --- a/examples/allocator.cpp +++ b/examples/allocator.cpp @@ -12,7 +12,7 @@ namespace { template struct inline_resource : std::pmr::memory_resource { const char* name; - explicit inline_resource(const char* name) : name(name) {} + explicit inline_resource(const char* n) : name(n) {} std::byte buffer[Size]{}; // NOLINT(hicpp-avoid-c-arrays) std::byte* next{+this->buffer}; // NOLINT(hicpp-no-array-decay) @@ -39,13 +39,13 @@ struct allocator_aware_fun { template requires std::same_as, std::remove_cvref_t> - explicit allocator_aware_fun(F&& fun) : fun(std::forward(fun)) {} - allocator_aware_fun(const allocator_aware_fun& other, allocator_type allocator = {}) - : fun(other.fun), allocator(allocator) {} + explicit allocator_aware_fun(F&& f) : fun(std::forward(f)) {} + allocator_aware_fun(const allocator_aware_fun& other, allocator_type alloc = {}) + : fun(other.fun), allocator(alloc) {} allocator_aware_fun(allocator_aware_fun&& other) noexcept : fun(std::move(other.fun)), allocator(other.allocator) {} - allocator_aware_fun(allocator_aware_fun&& other, allocator_type allocator) - : fun(std::move(other.fun)), allocator(allocator) {} + allocator_aware_fun(allocator_aware_fun&& other, allocator_type alloc) + : fun(std::move(other.fun)), allocator(alloc) {} template auto operator()(Args&&... args) noexcept { diff --git a/examples/doc-just.cpp b/examples/doc-just.cpp index f62298ab..fc5f5aba 100644 --- a/examples/doc-just.cpp +++ b/examples/doc-just.cpp @@ -11,4 +11,4 @@ int main() { auto result = ex::sync_wait(ex::just(17, "hello"s, true)); assert(result); assert(*result == std::tuple(17, "hello"s, true)); -} \ No newline at end of file +} diff --git a/examples/doc-just_error.cpp b/examples/doc-just_error.cpp index 54771202..24e75b6c 100644 --- a/examples/doc-just_error.cpp +++ b/examples/doc-just_error.cpp @@ -6,12 +6,19 @@ #include namespace ex = beman::execution26; +namespace { +void use(auto&&...) {} +} // namespace + int main() { bool had_error{false}; auto result = ex::sync_wait(ex::just_error(std::error_code(17, std::system_category())) | ex::upon_error([&](std::error_code ec) { + use(ec); assert(ec.value() == 17); had_error = true; })); + use(result, had_error); + assert(result); assert(had_error); -} \ No newline at end of file +} diff --git a/examples/doc-just_stopped.cpp b/examples/doc-just_stopped.cpp index a7cc8a77..5f48cb2d 100644 --- a/examples/doc-just_stopped.cpp +++ b/examples/doc-just_stopped.cpp @@ -7,9 +7,15 @@ #include //-dk:TODO remove namespace ex = beman::execution26; +namespace { +void use(auto&&...) {} +} // namespace + int main() { bool stopped{false}; auto result = ex::sync_wait(ex::just_stopped() | ex::upon_stopped([&] { stopped = true; })); + use(result, stopped); + assert(result); assert(stopped); -} \ No newline at end of file +} diff --git a/examples/intro-1-hello-world.cpp b/examples/intro-1-hello-world.cpp new file mode 100644 index 00000000..82233e45 --- /dev/null +++ b/examples/intro-1-hello-world.cpp @@ -0,0 +1,27 @@ +// examples/intro-1-hello-world.cpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include +#include +#include +#include + +namespace ex = ::beman::execution26; +using namespace std::string_literals; + +// ---------------------------------------------------------------------------- +// Please see the explanation in docs/intro-examples.md for an explanation. + +int main() { + // clang-format off + auto [result] = ex::sync_wait( + ex::when_all( + ex::just("hello, "s), + ex::just("world"s) + ) | ex::then([](auto s1, auto s2) { return s1 + s2; }) + ).value_or(std::tuple(""s) + ); + // clang-format on + + std::cout << result << '\n'; +} diff --git a/examples/intro-2-hello-async.cpp b/examples/intro-2-hello-async.cpp new file mode 100644 index 00000000..f9621bb9 --- /dev/null +++ b/examples/intro-2-hello-async.cpp @@ -0,0 +1,39 @@ +// examples/intro-2-hello-async.cpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include +#include "intro-timer.hpp" +#include +#include +#include +#include + +namespace ex = ::beman::execution26; +using namespace std::string_literals; +using namespace std::chrono_literals; + +// ---------------------------------------------------------------------------- +// Please see the explanation in docs/intro-examples.md for an explanation. + +int main() { + std::cout << std::unitbuf; + intro::timer timer; + + // clang-format off + auto [result] = ex::sync_wait( + ex::when_all( + timer.run(), + ex::when_all( + timer.resume_after(3s) + | ex::then([] { std::cout << "h\n"; return std::string("hello"); }), + timer.resume_after(1s) + | ex::then([] { std::cout << ",\n"; return std::string(", "); }), + timer.resume_after(2s) + | ex::then([] { std::cout << "w\n"; return std::string("world"); }) + ) | ex::then([](auto s1, auto s2, auto s3) { return s1 + s2 + s3; }) + ) + ).value_or(std::tuple(std::string(""))); + // clang-format on + + std::cout << result << "\n"; +} diff --git a/examples/intro-5-consumer.cpp b/examples/intro-5-consumer.cpp new file mode 100644 index 00000000..2068b1f8 --- /dev/null +++ b/examples/intro-5-consumer.cpp @@ -0,0 +1,93 @@ +// examples/intro-1-hello-world.cpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include +#include +#include +#include +#include +#include + +namespace ex = ::beman::execution26; +using namespace std::string_literals; + +enum class success { one }; +enum class failure { fail_one }; + +struct expected_to_channel_t { + template + struct own_receiver { + using receiver_concept = ex::receiver_t; + Receiver* receiver; + template + auto set_value(std::expected&& exp) noexcept -> void { + if (exp) { + std::cout << "received an expected with value from child/upstream\n" << std::flush; + ex::set_value(std::move(*receiver), exp.value_or(Value{})); + std::cout << "set_value\n" << std::flush; + } else { + std::cout << "received an expected with error from child/upstream\n"; + ex::set_error(std::move(*receiver), exp.error_or(Error{})); + } + } + template + auto set_error(Error&& error) noexcept -> void { + std::cout << "received an error from child/upstream"; + ex::set_error(std::move(*receiver), std::forward(error)); + } + auto set_stopped() noexcept -> void { + std::cout << "received an cancelletion from child/upstream"; + ex::set_stopped(std::move(*receiver)); + } + }; + template + struct state { + using operation_state_concept = ex::operation_state_t; + using child_state_t = decltype(ex::connect(std::declval(), std::declval>())); + Receiver parent_receiver; + child_state_t child_state; + template + state(S&& child_sender, R&& parent_receiver) + : parent_receiver(std::forward(parent_receiver)), + child_state(ex::connect(std::forward(child_sender), own_receiver{&this->parent_receiver})) { + } + void start() & noexcept { + std::cout << "starting execpted_to_channel\n"; + ex::start(child_state); + } + }; + + template + struct sender { + using sender_concept = ex::sender_t; + // value_types_of -> set_value_t(std::expected) + // -> completion_signatures + // -> + error_type_of + // -> + sends_stopped -> set_stopped_t() + // -> unique + using completion_signatures = ex::completion_signatures; + CSender child_sender; + + template + state> connect(Receiver&& receiver) && { + return {std::move(this->child_sender), std::forward(receiver)}; + } + template + state> connect(Receiver&& receiver) const& { + return {this->child_sender, std::forward(receiver)}; + } + }; + + template + sender> operator()(CSender&& child_sender) const { + return {std::forward(child_sender)}; + } + auto operator()() const { return ex::detail::sender_adaptor{*this}; } +}; +inline constexpr expected_to_channel_t expected_to_channel{}; + +int main() { + ex::sync_wait(ex::just(std::expected(success::one)) | expected_to_channel() | + ex::then([](success) noexcept { std::cout << "success\n"; }) | + ex::upon_error([](failure) noexcept { std::cout << "fail\n"; })); +} diff --git a/examples/intro-timer.hpp b/examples/intro-timer.hpp new file mode 100644 index 00000000..b8d1b35a --- /dev/null +++ b/examples/intro-timer.hpp @@ -0,0 +1,134 @@ +// examples/intro-timer.hpp -*-C++-*- +// ---------------------------------------------------------------------------- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// ---------------------------------------------------------------------------- + +#ifndef INCLUDED_EXAMPLES_INTRO_TIMER +#define INCLUDED_EXAMPLES_INTRO_TIMER + +#include +#include +#include +#include + +// ---------------------------------------------------------------------------- + +namespace intro { +namespace ex = beman::execution26; +struct timer; +} // namespace intro + +// ---------------------------------------------------------------------------- + +struct intro::timer { + struct state_base { + virtual ~state_base() = default; + virtual void complete() = 0; + }; + template + struct state : state_base { + using operation_state_concept = ex::operation_state_t; + timer* self; + std::remove_cvref_t receiver; + std::chrono::milliseconds ms; + + template + state(timer* self, R&& receiver, std::chrono::milliseconds ms) + : state_base(), self(self), receiver(std::forward(receiver)), ms(ms) {} + void start() & noexcept { self->add(ms, this); } + void complete() override { ex::set_value(std::move(receiver)); } + }; + struct sender { + using sender_concept = ex::sender_t; + using completion_signatures = ex::completion_signatures; + timer* self; + std::chrono::milliseconds ms; + template + state connect(R&& r) { + return state(self, std::forward(r), ms); + } + }; + using time_point = std::chrono::system_clock::time_point; + using value_type = std::tuple; + std::priority_queue, std::greater<>> outstanding; + + void add(std::chrono::milliseconds ms, state_base* base) { + outstanding.emplace(std::chrono::system_clock::now() + ms, base); + } + bool run_one() { + if (outstanding.empty()) + return false; + auto [time, base] = outstanding.top(); + outstanding.pop(); + auto now{std::chrono::system_clock::now()}; + if (now < time) { + std::this_thread::sleep_for(time - now); + } + base->complete(); + return true; + } + + template + struct run_state { + struct recv { + using receiver_concept = ex::receiver_t; + run_state* self; + + auto set_value(auto&&...) noexcept -> void { this->self->run_one(); } + auto set_error(auto&&) noexcept -> void { this->self->run_one(); } + auto set_stopped() noexcept -> void { this->self->run_one(); } + }; + using operation_state_concept = ex::operation_state_t; + using scheduler_t = decltype(ex::get_delegation_scheduler(ex::get_env(std::declval()))); + static_assert(ex::receiver); + static_assert(ex::scheduler); + static_assert(ex::sender()))>); + using state_t = decltype(ex::connect(ex::schedule(std::declval()), std::declval())); + struct state_ctor { + state_t state; + template + state_ctor(S&& sender, R&& receiver) + : state(ex::connect(std::forward(sender), std::forward(receiver))) {} + }; + + timer* self; + Receiver receiver; + std::optional state{}; + + auto schedule_one() { + this->state.emplace(ex::schedule(ex::get_delegation_scheduler(ex::get_env(this->receiver))), recv{this}); + ex::start(this->state->state); + } + auto run_one() { + this->state.reset(); + if (this->self->run_one()) + this->schedule_one(); + else + ex::set_value(std::move(this->receiver)); + } + auto start() & noexcept -> void { this->schedule_one(); } + }; + struct run_sender { + using sender_concept = ex::sender_t; + using completion_signatures = ex::completion_signatures; + + timer* self; + + template + run_state> connect(Receiver&& receiver) { + return {self, std::forward(receiver)}; + } + }; + + auto run() { return run_sender{this}; } + + template + auto resume_after(std::chrono::duration d) { + auto ms(std::chrono::duration_cast(d)); + return sender{this, ms}; + } +}; + +// ---------------------------------------------------------------------------- + +#endif diff --git a/examples/playground.cpp b/examples/playground.cpp index eb8a343b..472d57a1 100644 --- a/examples/playground.cpp +++ b/examples/playground.cpp @@ -10,10 +10,9 @@ namespace ex = ::beman::execution26; // ---------------------------------------------------------------------------- -int main() -{ +int main() { auto [result] = ex::sync_wait(ex::when_all(ex::just(std::string("hello, ")), ex::just(std::string("world"))) | ex::then([](const auto& s1, const auto& s2) { return s1 + s2; })) .value_or(std::tuple(std::string("oops"))); std::cout << "result='" << result << "'\n"; -} \ No newline at end of file +} diff --git a/examples/sender-demo.cpp b/examples/sender-demo.cpp index a2894aa0..384c71aa 100644 --- a/examples/sender-demo.cpp +++ b/examples/sender-demo.cpp @@ -14,8 +14,8 @@ struct just_op_state { std::pmr::string value; template - just_op_state(R&& r, std::pmr::string&& value) - : rec(std::forward(r)), value(std::move(value), ex::get_allocator(ex::get_env(rec))) {} + just_op_state(R&& r, std::pmr::string&& val) + : rec(std::forward(r)), value(std::move(val), ex::get_allocator(ex::get_env(rec))) {} void start() & noexcept { ex::set_value(std::move(rec), std::move(value)); } }; @@ -46,18 +46,22 @@ static_assert(ex::sender>); static_assert(ex::sender_in>); int main() { - auto j = just_sender{std::pmr::string("value")}; - auto t = std::move(j) | ex::then([](const std::pmr::string& v) { return v + " then"; }); - auto w = ex::when_all(std::move(t)); - auto e = ex::detail::write_env(std::move(w), - ex::detail::make_env(ex::get_allocator, std::pmr::polymorphic_allocator<>())); - - std::cout << "before start\n"; - auto r = ex::sync_wait(std::move(e)); - if (r) { - auto [v] = *r; - std::cout << "produced='" << v << "'\n"; - } else - std::cout << "operation was cancelled\n"; - std::cout << "after start\n"; + try { + auto j = just_sender{std::pmr::string("value")}; + auto t = std::move(j) | ex::then([](const std::pmr::string& v) { return v + " then"; }); + auto w = ex::when_all(std::move(t)); + auto e = ex::detail::write_env(std::move(w), + ex::detail::make_env(ex::get_allocator, std::pmr::polymorphic_allocator<>())); + + std::cout << "before start\n"; + auto r = ex::sync_wait(std::move(e)); + if (r) { + auto [v] = *r; + std::cout << "produced='" << v << "'\n"; + } else + std::cout << "operation was cancelled\n"; + std::cout << "after start\n"; + } catch (const std::exception& ex) { + std::cout << "ERROR: " << ex.what() << "\n"; + } } diff --git a/examples/stop_token.cpp b/examples/stop_token.cpp index aa74aa41..9100e284 100644 --- a/examples/stop_token.cpp +++ b/examples/stop_token.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -46,8 +47,8 @@ namespace exec = beman::execution26; // - std::print isn't available everywhere, yet. Let's try a simple // placeholder. namespace { -::std::mutex io_lock; -void print(std::string_view text, auto&&...) { +::std::mutex io_lock; +void print(std::string_view text, auto&&...) { const std::lock_guard guard(io_lock); ::std::cout << text; } @@ -71,7 +72,7 @@ struct stop_callback_for_t { #ifdef __cpp_lib_latch template auto inactive(const Token& token) -> void { - ::std::latch latch(1); + ::std::latch latch(1); const stop_callback_for_t cb(token, [&latch] { latch.count_down(); }); latch.wait(); @@ -92,15 +93,20 @@ auto inactive(Token token) -> void { } // namespace auto main() -> int { - exec::stop_source source; - ::std::thread act([token = source.get_token()] { active(token); }); - ::std::thread inact([token = source.get_token()] { inactive(token); }); + try { - print("threads started\n"); - source.request_stop(); - print("threads cancelled\n"); + exec::stop_source source; + ::std::thread act([token = source.get_token()] { active(token); }); + ::std::thread inact([token = source.get_token()] { inactive(token); }); - act.join(); - inact.join(); - print("done\n"); + print("threads started\n"); + source.request_stop(); + print("threads cancelled\n"); + + act.join(); + inact.join(); + print("done\n"); + } catch (const std::exception& ex) { + std::cout << "ERROR: " << ex.what() << "\n"; + } } diff --git a/examples/stopping.cpp b/examples/stopping.cpp index d82151f5..4b43c3a2 100644 --- a/examples/stopping.cpp +++ b/examples/stopping.cpp @@ -22,7 +22,7 @@ namespace { struct env { ex::inplace_stop_token token; - env(ex::inplace_stop_token token) : token(token) {} // NOLINT(hicpp-explicit-conversions) + explicit env(ex::inplace_stop_token t) : token(t) {} // NOLINT(hicpp-explicit-conversions) auto query(const ex::get_stop_token_t&) const noexcept { return this->token; } }; @@ -38,7 +38,7 @@ struct inject_cancel_sender { std::remove_cvref_t inner_receiver; ex::inplace_stop_token token{}; - auto get_env() const noexcept -> env { return {this->token}; } + auto get_env() const noexcept -> env { return env(this->token); } template auto set_value(T&&... t) noexcept -> void { diff --git a/examples/when_all-cancel.cpp b/examples/when_all-cancel.cpp index 0fcb1105..f7daca8d 100644 --- a/examples/when_all-cancel.cpp +++ b/examples/when_all-cancel.cpp @@ -7,6 +7,7 @@ #include #include #include +#include namespace ex = beman::execution26; @@ -106,11 +107,15 @@ struct eager { std::optional inner_state; template - state(R&& r, S&& s) : outer_receiver(std::forward(r)), inner_state() { - inner_state.emplace(std::forward(s), receiver{this}); + state(R&& r, S&& s) + : outer_receiver(std::forward(r)), inner_state(std::in_place, std::forward(s), receiver{this}) {} + auto start() & noexcept -> void { + if (this->inner_state) { + ex::start((*this->inner_state).st); + } else { + assert(this->inner_state); + } } - // TODO on next line: bugprone-unchecked-optional-access - auto start() & noexcept -> void { ex::start((*this->inner_state).st); } }; template auto connect(Receiver&& receiver) { diff --git a/include/beman/execution26/conqueue.hpp b/include/beman/execution26/conqueue.hpp new file mode 100644 index 00000000..1bda4cfc --- /dev/null +++ b/include/beman/execution26/conqueue.hpp @@ -0,0 +1,18 @@ +// include/beman/execution26/conqueue.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_BEMAN_EXECUTION26_CONQUEUE +#define INCLUDED_BEMAN_EXECUTION26_CONQUEUE + +// ---------------------------------------------------------------------------- + +#include +#include +#include +#include +#include +#include + +// ---------------------------------------------------------------------------- + +#endif diff --git a/include/beman/execution26/detail/as_awaitable.hpp b/include/beman/execution26/detail/as_awaitable.hpp index 0366b386..5da05f5e 100644 --- a/include/beman/execution26/detail/as_awaitable.hpp +++ b/include/beman/execution26/detail/as_awaitable.hpp @@ -4,7 +4,6 @@ #ifndef INCLUDED_BEMAN_EXECUTION26_DETAIL_AS_AWAITABLE #define INCLUDED_BEMAN_EXECUTION26_DETAIL_AS_AWAITABLE -#include #include #include #include @@ -31,12 +30,11 @@ struct as_awaitable_t { "as_awaitable must return an awaitable"); return ::std::forward(expr).as_awaitable(promise); } else if constexpr (::beman::execution26::detail:: - is_awaitable) { + is_awaitable || + not::beman::execution26::detail::awaitable_sender) { return ::std::forward(expr); - } else if constexpr (::beman::execution26::detail::awaitable_sender) { - return ::beman::execution26::detail::sender_awaitable{::std::forward(expr), promise}; } else { - return ::std::forward(expr); + return ::beman::execution26::detail::sender_awaitable{::std::forward(expr), promise}; } } }; diff --git a/include/beman/execution26/detail/async_concurrent_queue.hpp b/include/beman/execution26/detail/async_concurrent_queue.hpp new file mode 100644 index 00000000..6b1b820c --- /dev/null +++ b/include/beman/execution26/detail/async_concurrent_queue.hpp @@ -0,0 +1,24 @@ +// include/beman/execution26/detail/async_concurrent_queue.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_BEMAN_EXECUTION26_DETAIL_ASYNC_CONCURRENT_QUEUE +#define INCLUDED_BEMAN_EXECUTION26_DETAIL_ASYNC_CONCURRENT_QUEUE + +#include +#include +#include +#include + +// ---------------------------------------------------------------------------- + +namespace beman::execution26::detail { +template +concept async_concurrent_queue = ::beman::execution26::detail::basic_concurrent_queue && requires(Q q, T&& t) { + { q.async_push(::std::forward(t)) } noexcept -> ::beman::execution26::detail::sender_of<>; + { q.async_pop() } noexcept -> ::beman::execution26::detail::sender_of; +}; +} // namespace beman::execution26::detail + +// ---------------------------------------------------------------------------- + +#endif diff --git a/include/beman/execution26/detail/atomic_intrusive_stack.hpp b/include/beman/execution26/detail/atomic_intrusive_stack.hpp index e579c4dc..445c1fb8 100644 --- a/include/beman/execution26/detail/atomic_intrusive_stack.hpp +++ b/include/beman/execution26/detail/atomic_intrusive_stack.hpp @@ -31,7 +31,7 @@ class atomic_intrusive_stack; //! //! @tparam Item The type of the item in the stack. //! @tparam Next The pointer to the next item in the stack. -template +template class atomic_intrusive_stack { public: atomic_intrusive_stack() = default; @@ -85,4 +85,4 @@ class atomic_intrusive_stack { } // namespace beman::execution26::detail -#endif \ No newline at end of file +#endif diff --git a/include/beman/execution26/detail/awaitable_sender.hpp b/include/beman/execution26/detail/awaitable_sender.hpp index 0eeb9004..3de4e804 100644 --- a/include/beman/execution26/detail/awaitable_sender.hpp +++ b/include/beman/execution26/detail/awaitable_sender.hpp @@ -19,4 +19,4 @@ concept awaitable_sender = }; } // namespace beman::execution26::detail -#endif \ No newline at end of file +#endif diff --git a/include/beman/execution26/detail/basic_concurrent_queue.hpp b/include/beman/execution26/detail/basic_concurrent_queue.hpp new file mode 100644 index 00000000..27a92e6f --- /dev/null +++ b/include/beman/execution26/detail/basic_concurrent_queue.hpp @@ -0,0 +1,30 @@ +// include/beman/execution26/detail/basic_concurrent_queue.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_BEMAN_EXECUTION26_DETAIL_BASIC_CONCURRENT_QUEUE +#define INCLUDED_BEMAN_EXECUTION26_DETAIL_BASIC_CONCURRENT_QUEUE + +#include +#include +#include +#include +#include + +// ---------------------------------------------------------------------------- + +namespace beman::execution26::detail { +template +concept basic_concurrent_queue = + ::std::move_constructible<::std::remove_cvref_t> && ::std::same_as<::std::decay_t, typename Q::value_type> && + requires(Q q, const Q qc, T&& t, ::std::error_code ec) { + { qc.is_closed() } noexcept -> ::std::same_as; + { q.close() } noexcept -> ::std::same_as; + { q.push(std::forward(t)) } -> ::std::same_as; + { q.push(std::forward(t), ec) } noexcept -> ::std::same_as; + { q.pop(ec) } -> ::std::same_as<::std::optional>; + { q.pop() } -> ::std::same_as; + }; +} // namespace beman::execution26::detail +// ---------------------------------------------------------------------------- + +#endif diff --git a/include/beman/execution26/detail/basic_operation.hpp b/include/beman/execution26/detail/basic_operation.hpp index 1921195c..00ab3633 100644 --- a/include/beman/execution26/detail/basic_operation.hpp +++ b/include/beman/execution26/detail/basic_operation.hpp @@ -26,8 +26,10 @@ namespace beman::execution26::detail { */ template requires ::beman::execution26::detail:: - valid_specialization<::beman::execution26::detail::state_type, Sender, Receiver> - struct basic_operation : ::beman::execution26::detail::basic_state { + //-dk:TODO why is the remove_cvref_t needed...? + valid_specialization<::beman::execution26::detail::state_type, std::remove_cvref_t, Receiver> +struct basic_operation : ::beman::execution26::detail::basic_state { + // static_assert(std::same_as>); friend struct ::beman::execution26::start_t; using operation_state_concept = ::beman::execution26::operation_state_t; using tag_t = ::beman::execution26::tag_of_t; @@ -35,9 +37,14 @@ template using inner_ops_t = ::beman::execution26::detail::connect_all_result; inner_ops_t inner_ops; - basic_operation(Sender&& sender, Receiver&& receiver) noexcept(true /*-dk:TODO*/) + basic_operation(Sender&& sender, Receiver&& rcvr) noexcept( + noexcept(::beman::execution26::detail::basic_state(::std::forward(sender), + ::std::move(rcvr))) && + noexcept(::beman::execution26::detail::connect_all(this, + ::std::forward(sender), + ::beman::execution26::detail::indices_for()))) : ::beman::execution26::detail::basic_state(::std::forward(sender), - ::std::move(receiver)), + ::std::move(rcvr)), // NOLINTBEGIN(bugprone-use-after-move,hicpp-invalid-access-moved) //-dk:TODO deal with moving the sender twice inner_ops(::beman::execution26::detail::connect_all( diff --git a/include/beman/execution26/detail/basic_sender.hpp b/include/beman/execution26/detail/basic_sender.hpp index 71776f3e..00e4ff33 100644 --- a/include/beman/execution26/detail/basic_sender.hpp +++ b/include/beman/execution26/detail/basic_sender.hpp @@ -33,10 +33,9 @@ struct basic_sender : ::beman::execution26::detail::product_type; auto get_env() const noexcept -> decltype(auto) { - auto data{::beman::execution26::detail::get_sender_data(*this)}; - return ::std::apply( - [&data](auto&&... c) { return ::beman::execution26::detail::impls_for::get_attrs(data.data, c...); }, - data.children); + auto&& d{this->template get<1>()}; + return sub_apply<2>( + [&d](auto&&... c) { return ::beman::execution26::detail::impls_for::get_attrs(d, c...); }, *this); } template @@ -44,30 +43,37 @@ struct basic_sender : ::beman::execution26::detail::product_type - auto connect(Receiver receiver) & noexcept(true /*-dk:TODO*/) + auto connect(Receiver receiver) & noexcept( + noexcept(::beman::execution26::detail::basic_operation{*this, ::std::move(receiver)})) -> ::beman::execution26::detail::basic_operation { return {*this, ::std::move(receiver)}; } template <::beman::execution26::receiver Receiver> - auto connect(Receiver receiver) const& noexcept(true /*-dk:TODO*/) + auto connect(Receiver receiver) const& noexcept(noexcept( + ::beman::execution26::detail::basic_operation{*this, ::std::move(receiver)})) -> ::beman::execution26::detail::basic_operation { return {*this, ::std::move(receiver)}; } template <::beman::execution26::receiver Receiver> - auto connect(Receiver receiver) && noexcept(true /*-dk:TODO*/) + auto connect(Receiver receiver) && noexcept( + noexcept(::beman::execution26::detail::basic_operation{::std::move(*this), + ::std::move(receiver)})) -> ::beman::execution26::detail::basic_operation { return {::std::move(*this), ::std::move(receiver)}; } #else template <::beman::execution26::detail::decays_to Self, ::beman::execution26::receiver Receiver> - auto connect(this Self&& self, Receiver receiver) noexcept(true /*-dk:TODO*/) + auto + connect(this Self&& self, + Receiver receiver) noexcept(noexcept(::beman::execution26::detail::basic_operation{ + ::std::forward(self), ::std::move(receiver)})) -> ::beman::execution26::detail::basic_operation { return {::std::forward(self), ::std::move(receiver)}; } #endif -#if __cpp_explicit_this_parameter < 202110L +#if __cpp_explicit_this_parameter < 302110L template auto get_completion_signatures(Env&&) && -> ::beman::execution26::detail::completion_signatures_for { @@ -75,7 +81,7 @@ struct basic_sender : ::beman::execution26::detail::product_type auto get_completion_signatures( - Env&&) const&& -> ::beman::execution26::detail::completion_signatures_for { + Env&&) const&& -> ::beman::execution26::detail::completion_signatures_for { return {}; } template @@ -85,7 +91,7 @@ struct basic_sender : ::beman::execution26::detail::product_type auto get_completion_signatures( - Env&&) const& -> ::beman::execution26::detail::completion_signatures_for { + Env&&) const& -> ::beman::execution26::detail::completion_signatures_for { return {}; } #else diff --git a/include/beman/execution26/detail/basic_state.hpp b/include/beman/execution26/detail/basic_state.hpp index 292f3090..83850a44 100644 --- a/include/beman/execution26/detail/basic_state.hpp +++ b/include/beman/execution26/detail/basic_state.hpp @@ -20,8 +20,8 @@ namespace beman::execution26::detail { */ template struct basic_state { - basic_state(Sender&& sender, Receiver&& receiver) noexcept(true) - : receiver(::std::move(receiver)), + basic_state(Sender&& sender, Receiver&& rcvr) noexcept(true) + : receiver(::std::move(rcvr)), state(::beman::execution26::detail::impls_for< ::beman::execution26::tag_of_t >::get_state( ::std::forward(sender), this->receiver)) {} diff --git a/include/beman/execution26/detail/bounded_queue.hpp b/include/beman/execution26/detail/bounded_queue.hpp new file mode 100644 index 00000000..00afe7da --- /dev/null +++ b/include/beman/execution26/detail/bounded_queue.hpp @@ -0,0 +1,602 @@ +// include/beman/execution26/detail/bounded_queue.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_BEMAN_EXECUTION26_DETAIL_BOUNDED_QUEUE +#define INCLUDED_BEMAN_EXECUTION26_DETAIL_BOUNDED_QUEUE + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// ---------------------------------------------------------------------------- + +namespace beman::execution26 { +//! @brief This class templates provides a thread-safe, bounded queue +template > +class bounded_queue; +} // namespace beman::execution26 + +// ---------------------------------------------------------------------------- + +template +class beman::execution26::bounded_queue : ::beman::execution26::detail::immovable { + public: + using value_type = ::std::remove_cvref_t; + using allocator_type = Allocator; + using allocator_traits = ::std::allocator_traits; + class push_sender; + class pop_sender; + + static_assert(::std::same_as); + + explicit bounded_queue(::std::size_t max, Allocator allocator = {}); + bounded_queue(bounded_queue&&) = delete; + bounded_queue(const bounded_queue&) = delete; + ~bounded_queue() { + for (; this->tail != this->head; ++this->tail) { + this->destroy(this->get(this->tail)); + } + array_allocator_traits::deallocate(this->array_allocator, this->elements, this->max); + } + auto operator=(bounded_queue&&) -> bounded_queue& = delete; + auto operator=(const bounded_queue&) -> bounded_queue& = delete; + + auto is_closed() const noexcept -> bool; + auto close() noexcept -> void; + + auto push(const T& value) -> void; + auto push(T&& value) -> void; + auto push(const T& value, ::std::error_code& ec) -> bool; + auto push(T&& value, ::std::error_code& ec) -> bool; + auto try_push(const T& value, ::std::error_code& ec) -> bool; + auto try_push(T&& value, ::std::error_code& ec) -> bool; + auto async_push(const T& value) -> push_sender; + auto async_push(T&& value) -> push_sender; + + auto pop() -> value_type; + auto pop(::std::error_code& ec) -> ::std::optional; + auto try_pop(::std::error_code& ec) -> ::std::optional; + auto async_pop() -> pop_sender; + + private: + struct push_base : ::beman::execution26::detail::virtual_immovable { + value_type value; + push_base* next{}; + + template <::beman::execution26::detail::decayed_same_as Val> + explicit push_base(Val&& val) : value(::std::forward(val)) {} + virtual auto complete() -> void = 0; + virtual auto complete(::beman::execution26::conqueue_errc) -> void = 0; + virtual auto complete(::std::exception_ptr) -> void = 0; + }; + struct pop_base : ::beman::execution26::detail::virtual_immovable { + pop_base* next{}; + + pop_base() = default; + virtual auto complete(value_type) -> void = 0; + virtual auto complete(::beman::execution26::conqueue_errc) -> void = 0; + virtual auto complete(::std::exception_ptr) -> void = 0; + }; + + struct blocking_push_state; + struct blocking_pop_state; + + union element_t { + element_t() = default; + element_t(element_t&&) = delete; + element_t(const element_t&) = delete; + template + explicit element_t(allocator_type& alloc, Args&&... args) { + allocator_traits::construct(alloc, &value, ::std::forward(args)...); + } + ~element_t() {} + auto operator=(element_t&&) -> element_t& = delete; + auto operator=(const element_t&) -> element_t& = delete; + + value_type value; + }; + + using array_allocator_type = allocator_traits::template rebind_alloc; + using array_allocator_traits = allocator_traits::template rebind_traits; + using push_sender_queue = ::beman::execution26::detail::intrusive_queue<&push_base::next>; + using pop_sender_queue = ::beman::execution26::detail::intrusive_queue<&pop_base::next>; + + template + auto construct(element_t* element, Args&&... args) -> void; + auto destroy(element_t* element) -> void; + auto get(::std::uint64_t index) noexcept -> element_t*; + template + auto has_space(Lock&) noexcept -> bool; + template + auto push_value(Lock&, V&&) -> void; + template + auto pop_value(Lock&) -> value_type; + + template + auto internal_push(Arg&& arg, ::beman::execution26::conqueue_errc& error) -> bool; + template + auto internal_try_push(Arg&& arg, ::std::error_code& ec) -> bool; + template + auto internal_pop(HandleResult); + + auto start_push(push_base& s) -> void; + auto start_pop(pop_base& s) -> void; + + auto pop_notify(auto& cerberus) -> void; + auto push_notify(auto& cerberus) -> void; + + allocator_type allocator; + array_allocator_type array_allocator; + mutable ::std::mutex mutex; + push_sender_queue push_queue; + pop_sender_queue pop_queue; + ::std::size_t max; + element_t* elements; + ::std::uint64_t head{}; // the next element to push to + ::std::uint64_t tail{}; // the next element to push from + bool closed{}; +}; + +template +struct beman::execution26::bounded_queue::blocking_push_state : push_base { + bool ready{false}; + ::std::variant<::std::monostate, ::beman::execution26::conqueue_errc, ::std::exception_ptr> result; + ::std::condition_variable condition; + bounded_queue& queue; + + template + explicit blocking_push_state(bounded_queue& que, Arg&& arg) : push_base(::std::forward(arg)), queue(que) {} + auto complete() -> void override { + ::std::lock_guard cerberus(queue.mutex); + { + this->ready = true; + } + this->condition.notify_one(); + } + auto complete(::beman::execution26::conqueue_errc err) -> void override { + result = err; + this->complete(); + } + auto complete(::std::exception_ptr ex) -> void override { + result = std::move(ex); + this->complete(); + } +}; +template +struct beman::execution26::bounded_queue::blocking_pop_state : pop_base { + bounded_queue& queue; + bool ready{false}; + ::std::variant<::std::monostate, value_type, ::beman::execution26::conqueue_errc, ::std::exception_ptr> result{}; + ::std::condition_variable condition{}; + + explicit blocking_pop_state(bounded_queue& que) : queue(que) {} + auto complete(value_type value) -> void override { + { + ::std::lock_guard cerberus(queue.mutex); + this->result = ::std::move(value); + this->ready = true; + } + this->condition.notify_one(); + } + auto complete(::beman::execution26::conqueue_errc err) -> void override { + { + ::std::lock_guard cerberus(queue.mutex); + result = err; + this->ready = true; + } + this->condition.notify_one(); + } + auto complete(::std::exception_ptr ex) -> void override { + { + ::std::lock_guard cerberus(queue.mutex); + result = std::move(ex); + this->ready = true; + } + this->condition.notify_one(); + } +}; + +template +class beman::execution26::bounded_queue::push_sender { + public: + template + struct state : push_base { + using operation_state_concept = ::beman::execution26::operation_state_t; + + bounded_queue& queue; + ::std::remove_cvref_t receiver; + + template + state(bounded_queue& que, T&& value, R&& rcvr) + : push_base(::std::move(value)), queue(que), receiver(::std::forward(rcvr)) {} + + auto start() & noexcept { this->queue.start_push(*this); } + auto complete() -> void override { ::beman::execution26::set_value(::std::move(this->receiver)); } + auto complete(::beman::execution26::conqueue_errc error) -> void override { + ::beman::execution26::set_error(::std::move(this->receiver), error); + } + auto complete(::std::exception_ptr ex) -> void override { + ::beman::execution26::set_error(::std::move(this->receiver), std::move(ex)); + } + }; + + using sender_concept = ::beman::execution26::sender_t; + //-dk:TODO remove exception_ptr if move-ctor can't throw + using completion_signatures = + ::beman::execution26::completion_signatures<::beman::execution26::set_value_t(), + ::beman::execution26::set_error_t( + ::beman::execution26::conqueue_errc), + ::beman::execution26::set_error_t(::std::exception_ptr), + ::beman::execution26::set_stopped_t()>; + + bounded_queue& queue; + ::std::remove_cvref_t value; + template + push_sender(bounded_queue& que, Val&& val) : queue(que), value(::std::forward(val)) {} + template <::beman::execution26::receiver Receiver> + auto connect(Receiver&& receiver) && -> state { + return state(queue, ::std::move(value), ::std::forward(receiver)); + } +}; + +template +class beman::execution26::bounded_queue::pop_sender { + public: + template + struct state : pop_base { + using operation_state_concept = ::beman::execution26::operation_state_t; + + bounded_queue& queue; + ::std::remove_cvref_t receiver; + + template + state(bounded_queue& que, R&& rcvr) : pop_base(), queue(que), receiver(::std::forward(rcvr)) {} + + auto start() & noexcept { this->queue.start_pop(*this); } + auto complete(T val) -> void override { + ::beman::execution26::set_value(::std::move(this->receiver), ::std::move(val)); + } + auto complete(::beman::execution26::conqueue_errc error) -> void override { + ::beman::execution26::set_error(::std::move(this->receiver), error); + } + auto complete(::std::exception_ptr ex) -> void override { + ::beman::execution26::set_error(::std::move(this->receiver), ::std::move(ex)); + } + }; + + using sender_concept = ::beman::execution26::sender_t; + using completion_signatures = ::beman::execution26::completion_signatures<::beman::execution26::set_value_t(T), + ::beman::execution26::set_error_t( + ::beman::execution26::conqueue_errc), + ::beman::execution26::set_stopped_t()>; + + bounded_queue& queue; + template <::beman::execution26::receiver Receiver> + auto connect(Receiver&& receiver) && -> state { + return state(queue, ::std::forward(receiver)); + } +}; + +template +beman::execution26::bounded_queue::bounded_queue(::std::size_t mx, Allocator alloc) + : allocator(alloc), + array_allocator(alloc), + max(mx), + elements(array_allocator_traits::allocate(this->array_allocator, this->max)) {} + +template +auto beman::execution26::bounded_queue::is_closed() const noexcept -> bool { + std::lock_guard cerberos(this->mutex); + return this->closed; +} + +template +auto beman::execution26::bounded_queue::close() noexcept -> void { + std::unique_lock cerberus(this->mutex); + this->closed = true; + push_sender_queue push_que(std::move(this->push_queue)); + pop_sender_queue pop_que(std::move(this->pop_queue)); + assert(cerberus.owns_lock()); + cerberus.unlock(); + + while (not push_que.empty()) { + push_que.pop()->complete(::beman::execution26::conqueue_errc::closed); + } + while (not pop_que.empty()) { + pop_que.pop()->complete(::beman::execution26::conqueue_errc::closed); + } +} + +template +auto beman::execution26::bounded_queue::push(const T& value) -> void { + ::beman::execution26::conqueue_errc error{}; + if (not this->internal_push(value, error)) { + throw ::beman::execution26::conqueue_error(error); + } +} + +template +auto beman::execution26::bounded_queue::push(T&& value) -> void { + ::beman::execution26::conqueue_errc error{}; + if (not this->internal_push(::std::move(value), error)) { + throw ::beman::execution26::conqueue_error(error); + } +} + +template +auto beman::execution26::bounded_queue::push(const T& value, ::std::error_code& ec) -> bool { + ::beman::execution26::conqueue_errc error{}; + if (not this->internal_push(value, error)) { + ec = make_error_code(error); + return false; + } + return true; +} + +template +auto beman::execution26::bounded_queue::push(T&& value, ::std::error_code& ec) -> bool { + ::beman::execution26::conqueue_errc error{}; + if (not this->internal_push(value, error)) { + ec = make_error_code(error); + return false; + } + return true; +} + +template +auto beman::execution26::bounded_queue::try_push(const T& value, ::std::error_code& ec) -> bool { + return this->internal_try_push(value, ec); +} + +template +auto beman::execution26::bounded_queue::try_push(T&& value, ::std::error_code& ec) -> bool { + return this->internal_try_push(::std::move(value), ec); +} + +template +auto beman::execution26::bounded_queue::async_push(const T& value) -> push_sender { + return push_sender(*this, value); +} + +template +auto beman::execution26::bounded_queue::async_push(T&& value) -> push_sender { + return push_sender(*this, ::std::move(value)); +} + +template +auto beman::execution26::bounded_queue::pop() -> value_type { + return this->internal_pop([](auto&& variant) { + switch (variant.index()) { + default: + throw ::std::runtime_error("unknown result in bounded_queue"); + case 1: + return ::std::move(::std::get<1>(variant)); + case 2: + throw ::beman::execution26::conqueue_error(::std::get<2>(variant)); + case 3: + ::std::rethrow_exception(::std::get<3>(variant)); + } + }); +} + +template +auto beman::execution26::bounded_queue::pop(::std::error_code& ec) -> ::std::optional { + return this->internal_pop([&ec](auto&& variant) { + switch (variant.index()) { + default: + throw ::std::runtime_error("unknown result in bounded_queue"); + case 1: + return ::std::optional(::std::move(::std::get<1>(variant))); + case 2: + ec = make_error_code(::std::get<2>(variant)); + return ::std::optional(); + case 3: + ::std::rethrow_exception(::std::get<3>(variant)); + } + }); +} + +template +auto beman::execution26::bounded_queue::try_pop(::std::error_code& ec) -> ::std::optional { + ::std::unique_lock cerberus(this->mutex); + if (this->closed) { + ec = make_error_code(::beman::execution26::conqueue_errc::closed); + return {}; + } + if (this->head == this->tail) { + ec = make_error_code(::beman::execution26::conqueue_errc::empty); + return {}; + } + return this->pop_value(cerberus); +} + +template +auto beman::execution26::bounded_queue::async_pop() -> pop_sender { + return pop_sender(*this); +} + +template +template +auto beman::execution26::bounded_queue::construct(element_t* element, Args&&... args) -> void { + array_allocator_traits::construct(this->array_allocator, element, this->allocator, ::std::forward(args)...); +} + +template +auto beman::execution26::bounded_queue::destroy(element_t* element) -> void { + allocator_traits::destroy(this->allocator, &element->value); + array_allocator_traits::destroy(this->array_allocator, element); +} + +template +auto beman::execution26::bounded_queue::get(::std::uint64_t index) noexcept -> element_t* { + return this->elements + (index % this->max); +} + +template +template +auto beman::execution26::bounded_queue::internal_pop(HandleResult handle_result) { + blocking_pop_state s(*this); + this->start_pop(s); + { + ::std::unique_lock cerberus(this->mutex); + s.condition.wait(cerberus, [&ready = s.ready] { return ready; }); + } + return handle_result(::std::move(s.result)); +} + +template +template +auto beman::execution26::bounded_queue::internal_push(Arg&& arg, + ::beman::execution26::conqueue_errc& error) + -> bool { + blocking_push_state s(*this, ::std::forward(arg)); + this->start_push(s); + { + ::std::unique_lock cerberus(this->mutex); + s.condition.wait(cerberus, [&ready = s.ready] { return ready; }); + } + switch (s.result.index()) { + case 0: + return true; + case 1: + error = ::std::get<1>(s.result); + break; + case 2: + std::rethrow_exception(::std::get<2>(s.result)); + } + return false; +} + +template +template +auto beman::execution26::bounded_queue::internal_try_push(Arg&& arg, ::std::error_code& ec) -> bool { + ::std::unique_lock cerberus(this->mutex); + if (this->closed) { + ec = make_error_code(::beman::execution26::conqueue_errc::closed); + return false; + } + if (not this->has_space(cerberus)) { + ec = make_error_code(::beman::execution26::conqueue_errc::full); + return false; + } + this->push_value(cerberus, ::std::forward(arg)); + return true; +} + +// NOLINTBEGIN(misc-no-recursion) +template +auto beman::execution26::bounded_queue::pop_notify(auto& cerberus) -> void { + assert(cerberus.owns_lock()); + if (not this->pop_queue.empty()) { + auto state{this->pop_queue.pop()}; + state->complete(this->pop_value(cerberus)); + } else { + cerberus.unlock(); + } +} +// NOLINTEND(misc-no-recursion) + +// NOLINTBEGIN(misc-no-recursion) +template +auto beman::execution26::bounded_queue::push_notify(auto& cerberus) -> void { + assert(cerberus.owns_lock()); + if (not this->push_queue.empty()) { + push_base* push(this->push_queue.pop()); + try { + this->push_value(cerberus, ::std::move(push->value)); + assert(not cerberus.owns_lock()); + } catch (...) { + cerberus.unlock(); + push->complete(::std::current_exception()); + assert(not cerberus.owns_lock()); + return; + } + push->complete(); + } else { + cerberus.unlock(); + } + assert(not cerberus.owns_lock()); +} +// NOLINTEND(misc-no-recursion) + +template +template +auto beman::execution26::bounded_queue::has_space(Lock&) noexcept -> bool { + return this->head - this->tail < this->max; +} + +// NOLINTBEGIN(misc-no-recursion) +template +template +auto beman::execution26::bounded_queue::push_value(Lock& cerberus, V&& value) -> void { + + this->construct(this->get(this->head), ::std::forward(value)); + ++this->head; + assert(cerberus.owns_lock()); + this->pop_notify(cerberus); + assert(not cerberus.owns_lock()); +} +// NOLINTEND(misc-no-recursion) + +// NOLINTBEGIN(misc-no-recursion) +template +template +auto beman::execution26::bounded_queue::pop_value(Lock& cerberus) -> value_type { + element_t* element(this->get(tail)); + value_type val(std::move(element->value)); + ++this->tail; + this->destroy(element); + this->push_notify(cerberus); + return val; +} +// NOLINTEND(misc-no-recursion) + +template +auto beman::execution26::bounded_queue::start_push(push_base& s) -> void { + std::unique_lock cerberus(this->mutex); + if (this->closed) { + cerberus.unlock(); + s.complete(::beman::execution26::conqueue_errc::closed); + } else if (this->has_space(cerberus)) { + try { + this->push_value(cerberus, std::move(s.value)); + s.complete(); + } catch (...) { + assert(cerberus.owns_lock()); + cerberus.unlock(); + s.complete(std::current_exception()); + } + } else { + this->push_queue.push(&s); + } +} + +template +auto beman::execution26::bounded_queue::start_pop(pop_base& s) -> void { + std::unique_lock cerberus(this->mutex); + if (this->closed) { + cerberus.unlock(); + s.complete(::beman::execution26::conqueue_errc::closed); + } else if (this->head != this->tail) { + s.complete(this->pop_value(cerberus)); + } else { + this->pop_queue.push(&s); + } +} + +// ---------------------------------------------------------------------------- + +#endif \ No newline at end of file diff --git a/include/beman/execution26/detail/common.hpp b/include/beman/execution26/detail/common.hpp index dfe65886..b2463b20 100644 --- a/include/beman/execution26/detail/common.hpp +++ b/include/beman/execution26/detail/common.hpp @@ -21,7 +21,7 @@ * * There are a few ingredients to using `std::execution`: * - * - Sender algoritms to composes work into an asynchronous workflow. + * - Sender algorithms to composes work into an asynchronous workflow. * - Something holding and starting senders like `sync_wait()` * or `counting_scope`. * - A coroutine binding like `task` to make sender composition diff --git a/include/beman/execution26/detail/completion_signatures_for.hpp b/include/beman/execution26/detail/completion_signatures_for.hpp index aea24d53..c191de2c 100644 --- a/include/beman/execution26/detail/completion_signatures_for.hpp +++ b/include/beman/execution26/detail/completion_signatures_for.hpp @@ -21,7 +21,7 @@ namespace beman::execution26::detail { struct no_completion_signatures_defined_in_sender {}; /*! - * \brief Primary template declaration for the customisation of sender completion signatures. + * \brief Primary template declaration for the customization of sender completion signatures. * \headerfile beman/execution26/execution.hpp * \internal */ diff --git a/include/beman/execution26/detail/completion_signatures_of_t.hpp b/include/beman/execution26/detail/completion_signatures_of_t.hpp index a7d74fe5..d6d08770 100644 --- a/include/beman/execution26/detail/completion_signatures_of_t.hpp +++ b/include/beman/execution26/detail/completion_signatures_of_t.hpp @@ -20,7 +20,7 @@ template requires ::beman::execution26::sender_in using completion_signatures_of_t = ::beman::execution26::detail::call_result_t< ::beman::execution26::get_completion_signatures_t, Sender, Env>; -} +} // namespace beman::execution26 // ---------------------------------------------------------------------------- diff --git a/include/beman/execution26/detail/completion_tag.hpp b/include/beman/execution26/detail/completion_tag.hpp index 8dca471a..f1e64174 100644 --- a/include/beman/execution26/detail/completion_tag.hpp +++ b/include/beman/execution26/detail/completion_tag.hpp @@ -21,7 +21,7 @@ template concept completion_tag = ::std::same_as || ::std::same_as || ::std::same_as; -} +} // namespace beman::execution26::detail // ---------------------------------------------------------------------------- diff --git a/include/beman/execution26/detail/concurrent_queue.hpp b/include/beman/execution26/detail/concurrent_queue.hpp new file mode 100644 index 00000000..8944bb8c --- /dev/null +++ b/include/beman/execution26/detail/concurrent_queue.hpp @@ -0,0 +1,26 @@ +// include/beman/execution26/detail/concurrent_queue.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_BEMAN_EXECUTION26_DETAIL_CONCURRENT_QUEUE +#define INCLUDED_BEMAN_EXECUTION26_DETAIL_CONCURRENT_QUEUE + +#include +#include +#include +#include +#include + +// ---------------------------------------------------------------------------- + +namespace beman::execution26::detail { +template +concept concurrent_queue = + ::beman::execution26::detail::basic_concurrent_queue && requires(Q q, T&& t, ::std::error_code ec) { + { q.try_push(::std::forward(t), ec) } -> ::std::same_as; + { q.try_pop(ec) } -> ::std::same_as<::std::optional>; + }; +} // namespace beman::execution26::detail + +// ---------------------------------------------------------------------------- + +#endif diff --git a/include/beman/execution26/detail/connect.hpp b/include/beman/execution26/detail/connect.hpp index a7b33300..f47b3631 100644 --- a/include/beman/execution26/detail/connect.hpp +++ b/include/beman/execution26/detail/connect.hpp @@ -21,14 +21,41 @@ namespace beman::execution26::detail { * \internal */ struct connect_t { + private: template - auto operator()(Sender&& sender, Receiver&& receiver) const noexcept(true /*-dk:TODO*/) { + static auto make_new_sender(Sender&& sender, Receiver&& receiver) + //-dk:TODO this noexcept needs to get confirmed/fixed + noexcept(true) -> decltype(auto) { + return ::beman::execution26::transform_sender( + decltype(::beman::execution26::detail::get_domain_late(::std::forward(sender), + ::beman::execution26::get_env(receiver))){}, + ::std::forward(sender), + ::beman::execution26::get_env(receiver)); + } + template + static constexpr auto connect_noexcept() -> bool { + if constexpr (requires { + make_new_sender(::std::declval(), ::std::declval()) + .connect(::std::declval()); + }) { + return noexcept(make_new_sender(::std::declval(), ::std::declval()) + .connect(::std::declval())); + } else if constexpr (requires { + ::beman::execution26::detail::connect_awaitable( + make_new_sender(::std::declval(), ::std::declval()), + ::std::declval()); + }) { + return noexcept(::beman::execution26::detail::connect_awaitable( + make_new_sender(::std::declval(), ::std::declval()), ::std::declval())); + } + return true; + } + + public: + template + auto operator()(Sender&& sender, Receiver&& receiver) const noexcept(connect_noexcept()) { auto new_sender = [&sender, &receiver]() -> decltype(auto) { - return ::beman::execution26::transform_sender( - decltype(::beman::execution26::detail::get_domain_late(::std::forward(sender), - ::beman::execution26::get_env(receiver))){}, - ::std::forward(sender), - ::beman::execution26::get_env(receiver)); + return make_new_sender(::std::forward(sender), ::std::forward(receiver)); }; if constexpr (requires { new_sender().connect(::std::forward(receiver)); }) { diff --git a/include/beman/execution26/detail/connect_all.hpp b/include/beman/execution26/detail/connect_all.hpp index 5598ab72..b7a1d3d3 100644 --- a/include/beman/execution26/detail/connect_all.hpp +++ b/include/beman/execution26/detail/connect_all.hpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -25,24 +26,94 @@ namespace beman::execution26::detail { * \internal */ struct connect_all_t { + private: + template + static auto apply_with_index_helper(::std::index_sequence seq, Fun&& fun, Tuple&& tuple) noexcept(noexcept( + ::std::forward(fun)(seq, ::beman::execution26::detail::forward_like(::std::get(tuple))...))) + -> decltype(auto) { + return ::std::forward(fun)(seq, + ::beman::execution26::detail::forward_like(::std::get(tuple))...); + } + template + static auto apply_with_index(Fun&& fun, Tuple&& tuple) noexcept( + noexcept(apply_with_index_helper(::std::make_index_sequence<::std::tuple_size_v<::std::decay_t>>{}, + ::std::forward(fun), + ::std::forward(tuple)))) -> decltype(auto) { + return apply_with_index_helper(::std::make_index_sequence<::std::tuple_size_v<::std::decay_t>>{}, + ::std::forward(fun), + ::std::forward(tuple)); + } + + template <::std::size_t Start, typename Fun, typename Tuple, ::std::size_t... I> + static auto sub_apply_with_index_helper(::std::index_sequence seq, Fun&& fun, Tuple&& tuple) noexcept( + noexcept(::std::forward(fun)( + seq, ::beman::execution26::detail::forward_like(tuple.template get())...))) + -> decltype(auto) { + return ::std::forward(fun)( + seq, ::beman::execution26::detail::forward_like(tuple.template get())...); + } + template <::std::size_t Start, typename Fun, typename Tuple> + requires requires { ::std::declval().size(); } + static auto sub_apply_with_index(Fun&& fun, Tuple&& tuple) noexcept(noexcept(sub_apply_with_index_helper( + ::std::make_index_sequence<::std::tuple_size_v<::std::decay_t> - Start>{}, + ::std::forward(fun), + ::std::forward(tuple)))) -> decltype(auto) { + return sub_apply_with_index_helper( + ::std::make_index_sequence<::std::tuple_size_v<::std::decay_t> - Start>{}, + ::std::forward(fun), + ::std::forward(tuple)); + } + template <::std::size_t Start, typename Fun, typename Tuple> + requires(not requires { ::std::declval().size(); }) + static auto + sub_apply_with_index(Fun&& fun, + Tuple&&) noexcept(noexcept(::std::forward(fun)(::std::make_index_sequence<0u>{}))) { + return ::std::forward(fun)(::std::make_index_sequence<0u>{}); + } + + template + struct connect_helper { + ::beman::execution26::detail::basic_state* op; + + template <::std::size_t... J, typename... C> + auto operator()(::std::index_sequence, C&&... c) noexcept( + (noexcept(::beman::execution26::connect( + ::beman::execution26::detail::forward_like(c), + ::beman::execution26::detail::basic_receiver>{ + this->op})) && + ... && true)) -> decltype(auto) { + return ::beman::execution26::detail::product_type{::beman::execution26::connect( + ::beman::execution26::detail::forward_like(c), + ::beman::execution26::detail::basic_receiver>{ + this->op})...}; + } + }; + static auto use(auto&&...) {} + + public: //-dk:TODO is the S parameter deviating from the spec? + template + requires requires(Sender&& s) { + s.size(); + s.template get<0>(); + } + auto operator()(::beman::execution26::detail::basic_state* op, + S&& sender, + ::std::index_sequence) const + noexcept(noexcept(sub_apply_with_index<2>(connect_helper{op}, ::std::forward(sender)))) + -> decltype(auto) { + return sub_apply_with_index<2>(connect_helper{op}, ::std::forward(sender)); + } template auto operator()(::beman::execution26::detail::basic_state* op, S&& sender, - ::std::index_sequence) const noexcept(true /*-dk:TODO*/) { - auto data{::beman::execution26::detail::get_sender_data(::std::forward(sender))}; - return ::std::apply( - [&op](auto&&... c) { - return [&op]<::std::size_t... J>(::std::index_sequence, auto&&... c) { - use(op); - return ::beman::execution26::detail::product_type{::beman::execution26::connect( - ::beman::execution26::detail::forward_like(c), - ::beman::execution26::detail:: - basic_receiver>{op})...}; - }(::std::make_index_sequence<::std::tuple_size_v<::std::decay_t>>{}, c...); - }, - data.children); + ::std::index_sequence) const + noexcept(noexcept(apply_with_index( + connect_helper{op}, + ::beman::execution26::detail::get_sender_data(::std::forward(sender)).children))) -> decltype(auto) { + return apply_with_index(connect_helper{op}, + ::beman::execution26::detail::get_sender_data(::std::forward(sender)).children); } }; diff --git a/include/beman/execution26/detail/connect_all_result.hpp b/include/beman/execution26/detail/connect_all_result.hpp index b9c3b110..109bf06e 100644 --- a/include/beman/execution26/detail/connect_all_result.hpp +++ b/include/beman/execution26/detail/connect_all_result.hpp @@ -23,7 +23,7 @@ using connect_all_result = ::beman::execution26::detail::basic_state*, Sender, ::beman::execution26::detail::indices_for >; -} +} // namespace beman::execution26::detail // ---------------------------------------------------------------------------- diff --git a/include/beman/execution26/detail/conqueue_errc.hpp b/include/beman/execution26/detail/conqueue_errc.hpp new file mode 100644 index 00000000..c9b4ab8d --- /dev/null +++ b/include/beman/execution26/detail/conqueue_errc.hpp @@ -0,0 +1,61 @@ +// include/beman/execution26/detail/conqueue_errc.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_BEMAN_EXECUTION26_DETAIL_CONQUEUE_ERRC +#define INCLUDED_BEMAN_EXECUTION26_DETAIL_CONQUEUE_ERRC + +#include +#include +#include +#include + +// ---------------------------------------------------------------------------- + +namespace beman::execution26 { +enum class conqueue_errc : ::std::uint8_t { empty, full, closed, busy }; + +inline auto conqueue_category() noexcept -> const ::std::error_category&; +inline auto make_error_code(::beman::execution26::conqueue_errc) noexcept -> ::std::error_code; +inline auto make_error_condition(::beman::execution26::conqueue_errc) noexcept -> ::std::error_condition; +} // namespace beman::execution26 + +namespace std { +template <> +struct is_error_code_enum<::beman::execution26::conqueue_errc> : ::std::true_type {}; +} // namespace std + +// ---------------------------------------------------------------------------- + +inline auto beman::execution26::conqueue_category() noexcept -> const ::std::error_category& { + struct category : ::std::error_category { + auto name() const noexcept -> const char* override { return "conqueue"; } + auto message(int value) const -> ::std::string override { + switch (value) { + default: + return "unknown"; + case static_cast(::beman::execution26::conqueue_errc::empty): + return "empty"; + case static_cast(::beman::execution26::conqueue_errc::full): + return "full"; + case static_cast(::beman::execution26::conqueue_errc::closed): + return "closed"; + case static_cast(::beman::execution26::conqueue_errc::busy): + return "busy"; + } + } + }; + static category rc{}; + return rc; +} + +inline auto beman::execution26::make_error_code(conqueue_errc e) noexcept -> ::std::error_code { + return ::std::error_code(static_cast(e), ::beman::execution26::conqueue_category()); +} + +inline auto beman::execution26::make_error_condition(conqueue_errc e) noexcept -> ::std::error_condition { + return ::std::error_condition(static_cast(e), ::beman::execution26::conqueue_category()); +} + +// ---------------------------------------------------------------------------- + +#endif diff --git a/include/beman/execution26/detail/conqueue_error.hpp b/include/beman/execution26/detail/conqueue_error.hpp new file mode 100644 index 00000000..86b3e532 --- /dev/null +++ b/include/beman/execution26/detail/conqueue_error.hpp @@ -0,0 +1,23 @@ +// include/beman/execution26/detail/conqueue_error.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_BEMAN_EXECUTION26_DETAIL_CONQUEUE_ERROR +#define INCLUDED_BEMAN_EXECUTION26_DETAIL_CONQUEUE_ERROR + +#include +#include + +// ---------------------------------------------------------------------------- + +namespace beman::execution26 { +class conqueue_error : public ::std::system_error { + public: + explicit conqueue_error(::beman::execution26::conqueue_errc ec) + : std::system_error(::beman::execution26::make_error_code(ec), + ::beman::execution26::conqueue_category().message(static_cast(ec))) {} +}; +} // namespace beman::execution26 + +// ---------------------------------------------------------------------------- + +#endif diff --git a/include/beman/execution26/detail/continues_on.hpp b/include/beman/execution26/detail/continues_on.hpp index c0ca832a..49bd8c56 100644 --- a/include/beman/execution26/detail/continues_on.hpp +++ b/include/beman/execution26/detail/continues_on.hpp @@ -50,7 +50,9 @@ struct continues_on_t { auto operator()(Sender&& sender, Scheduler&& scheduler) const { auto domain(::beman::execution26::detail::get_domain_early(sender)); return ::beman::execution26::transform_sender( - domain, ::beman::execution26::detail::make_sender(*this, scheduler, ::std::forward(sender))); + domain, + ::beman::execution26::detail::make_sender( + *this, std::forward(scheduler), ::std::forward(sender))); } }; diff --git a/include/beman/execution26/detail/decayed_tuple.hpp b/include/beman/execution26/detail/decayed_tuple.hpp index 5c11d271..bd071fbd 100644 --- a/include/beman/execution26/detail/decayed_tuple.hpp +++ b/include/beman/execution26/detail/decayed_tuple.hpp @@ -17,7 +17,7 @@ namespace beman::execution26::detail { */ template using decayed_tuple = ::std::tuple<::std::decay_t...>; -} +} // namespace beman::execution26::detail // ---------------------------------------------------------------------------- diff --git a/include/beman/execution26/detail/decayed_typeof.hpp b/include/beman/execution26/detail/decayed_typeof.hpp index fad49e25..602a6132 100644 --- a/include/beman/execution26/detail/decayed_typeof.hpp +++ b/include/beman/execution26/detail/decayed_typeof.hpp @@ -16,7 +16,7 @@ namespace beman::execution26::detail { */ template using decayed_typeof = ::std::decay_t; -} +} // namespace beman::execution26::detail // ---------------------------------------------------------------------------- diff --git a/include/beman/execution26/detail/decays_to.hpp b/include/beman/execution26/detail/decays_to.hpp index 9cf1d4a6..98deb351 100644 --- a/include/beman/execution26/detail/decays_to.hpp +++ b/include/beman/execution26/detail/decays_to.hpp @@ -17,7 +17,7 @@ namespace beman::execution26::detail { */ template concept decays_to = ::std::same_as<::std::decay_t, To>; -} +} // namespace beman::execution26::detail // ---------------------------------------------------------------------------- diff --git a/include/beman/execution26/detail/default_impls.hpp b/include/beman/execution26/detail/default_impls.hpp index d4f44bb2..fd84eaf9 100644 --- a/include/beman/execution26/detail/default_impls.hpp +++ b/include/beman/execution26/detail/default_impls.hpp @@ -37,10 +37,18 @@ struct default_impls { }; static constexpr auto get_state = [](Sender&& sender, Receiver& receiver) noexcept -> decltype(auto) { - auto&& decompose = ::beman::execution26::detail::get_sender_data(::std::forward(sender)); + auto&& data{[&sender]() -> decltype(auto) { + if constexpr (requires { + sender.size(); + sender.template get<1>(); + }) + return sender.template get<1>(); + else + return ::beman::execution26::detail::get_sender_data(::std::forward(sender)).data; + }()}; return ::beman::execution26::detail::allocator_aware_move( - ::beman::execution26::detail::forward_like(decompose.data), receiver); + ::beman::execution26::detail::forward_like(data), receiver); }; static constexpr auto start = [](auto&, auto&, auto&... ops) noexcept -> void { (::beman::execution26::start(ops), ...); diff --git a/include/beman/execution26/detail/env_of_t.hpp b/include/beman/execution26/detail/env_of_t.hpp index 4e002d6f..11390919 100644 --- a/include/beman/execution26/detail/env_of_t.hpp +++ b/include/beman/execution26/detail/env_of_t.hpp @@ -16,7 +16,7 @@ namespace beman::execution26 { */ template using env_of_t = decltype(::beman::execution26::get_env(::std::declval())); -} +} // namespace beman::execution26 // ---------------------------------------------------------------------------- diff --git a/include/beman/execution26/detail/env_type.hpp b/include/beman/execution26/detail/env_type.hpp index 7d5b4e19..f0da379f 100644 --- a/include/beman/execution26/detail/env_type.hpp +++ b/include/beman/execution26/detail/env_type.hpp @@ -24,7 +24,7 @@ using env_type = ::beman::execution26::detail::call_result_t< Index, ::beman::execution26::detail::state_type&, const Receiver&>; -} +} // namespace beman::execution26::detail // ---------------------------------------------------------------------------- diff --git a/include/beman/execution26/detail/error_types_of_t.hpp b/include/beman/execution26/detail/error_types_of_t.hpp index 5503d2e8..b9d47f71 100644 --- a/include/beman/execution26/detail/error_types_of_t.hpp +++ b/include/beman/execution26/detail/error_types_of_t.hpp @@ -28,7 +28,7 @@ using error_types_of_t = ::beman::execution26::completion_signatures_of_t, ::std::type_identity_t, Variant>; -} +} // namespace beman::execution26 // ---------------------------------------------------------------------------- diff --git a/include/beman/execution26/detail/forwarding_query.hpp b/include/beman/execution26/detail/forwarding_query.hpp index 16e67464..6f460560 100644 --- a/include/beman/execution26/detail/forwarding_query.hpp +++ b/include/beman/execution26/detail/forwarding_query.hpp @@ -39,7 +39,7 @@ namespace beman::execution26 { */ using forwarding_query_t = beman::execution26::detail::forwarding_query_t; /*! - * \brief The customizatoin point object to determine whether querys should be forwarded + * \brief The customizatoin point object to determine whether queries should be forwarded * \headerfile beman/execution26/execution.hpp * * \details diff --git a/include/beman/execution26/detail/fwd_env.hpp b/include/beman/execution26/detail/fwd_env.hpp index 0cf15759..b3288deb 100644 --- a/include/beman/execution26/detail/fwd_env.hpp +++ b/include/beman/execution26/detail/fwd_env.hpp @@ -25,7 +25,7 @@ class fwd_env { Env env; public: - explicit fwd_env(Env&& env) : env(::std::forward(env)) {} + explicit fwd_env(Env&& e) : env(::std::forward(e)) {} template requires(not::beman::execution26::forwarding_query(::std::remove_cvref_t())) diff --git a/include/beman/execution26/detail/gather_signatures.hpp b/include/beman/execution26/detail/gather_signatures.hpp index 9630e6ce..67545797 100644 --- a/include/beman/execution26/detail/gather_signatures.hpp +++ b/include/beman/execution26/detail/gather_signatures.hpp @@ -23,6 +23,7 @@ struct same_tag { }; template struct bound_tag { + using type = Tag; template using predicate = ::beman::execution26::detail::same_tag; }; @@ -35,7 +36,7 @@ template class Transform> ::beman::execution26::detail::always_true>::template meta_apply; } struct gather_signatures_apply { - using type = ::beman::execution26::detail::indirect_meta_apply< + using type = typename ::beman::execution26::detail::indirect_meta_apply< ::beman::execution26::detail::always_true>::template meta_apply; }; @@ -56,7 +57,7 @@ template class Tuple, template < typename ::beman::execution26::detail::gather_signatures_apply::type...>; } struct gather_signatures_helper<::beman::execution26::completion_signatures, Tuple, Variant> { - using type = ::beman::execution26::detail::indirect_meta_apply< + using type = typename ::beman::execution26::detail::indirect_meta_apply< always_true::type...>>:: template meta_apply< Variant, @@ -69,14 +70,12 @@ template class Variant> requires requires { typename ::beman::execution26::detail::gather_signatures_helper< - ::beman::execution26::detail::meta:: - filter<::beman::execution26::detail::bound_tag::template predicate, signatures>, + ::beman::execution26::detail::meta::filter_tag<::beman::execution26::detail::same_tag, Tag, signatures>, Tuple, Variant>::type; } -using gather_signatures = ::beman::execution26::detail::gather_signatures_helper< - ::beman::execution26::detail::meta::filter<::beman::execution26::detail::bound_tag::template predicate, - signatures>, +using gather_signatures = typename ::beman::execution26::detail::gather_signatures_helper< + ::beman::execution26::detail::meta::filter_tag<::beman::execution26::detail::same_tag, Tag, signatures>, Tuple, Variant>::type; } // namespace beman::execution26::detail diff --git a/include/beman/execution26/detail/get_completion_signatures.hpp b/include/beman/execution26/detail/get_completion_signatures.hpp index c77d1707..4ab5aa44 100644 --- a/include/beman/execution26/detail/get_completion_signatures.hpp +++ b/include/beman/execution26/detail/get_completion_signatures.hpp @@ -23,10 +23,10 @@ struct get_completion_signatures_t { private: template static auto get(Sender&& sender, Env&& env) noexcept { - auto new_sender{[](auto&& sender, auto&& env) -> decltype(auto) { - return ::beman::execution26::transform_sender(::beman::execution26::detail::get_domain_late(sender, env), - ::std::forward(sender), - ::std::forward(env)); + auto new_sender{[](auto&& sndr, auto&& e) -> decltype(auto) { + auto domain{::beman::execution26::detail::get_domain_late(sndr, e)}; + return ::beman::execution26::transform_sender( + domain, ::std::forward(sndr), ::std::forward(e)); }}; using sender_type = ::std::remove_cvref_t; @@ -37,20 +37,27 @@ struct get_completion_signatures_t { return typename sender_type::completion_signatures{}; else if constexpr (::beman::execution26::detail:: is_awaitable>) { - return ::beman::execution26::completion_signatures< - ::beman::execution26::set_value_t( - ::beman::execution26::detail:: - await_result_type>), - ::beman::execution26::set_error_t(::std::exception_ptr), - ::beman::execution26::set_stopped_t()>{}; + using result_type = ::beman::execution26::detail:: + await_result_type>; + if constexpr (::std::same_as) { + return ::beman::execution26::completion_signatures<::beman::execution26::set_value_t(), + ::beman::execution26::set_error_t( + ::std::exception_ptr), + ::beman::execution26::set_stopped_t()>{}; + } else { + return ::beman::execution26::completion_signatures<::beman::execution26::set_value_t(result_type), + ::beman::execution26::set_error_t( + ::std::exception_ptr), + ::beman::execution26::set_stopped_t()>{}; + } } } public: template - requires(not ::std::same_as(), - ::std::declval()))>) + requires(not::std::same_as(), + ::std::declval()))>) auto operator()(Sender&& sender, Env&& env) const noexcept { return this->get(::std::forward(sender), ::std::forward(env)); } @@ -60,4 +67,4 @@ inline constexpr get_completion_signatures_t get_completion_signatures{}; // ---------------------------------------------------------------------------- -#endif \ No newline at end of file +#endif diff --git a/include/beman/execution26/detail/indices_for.hpp b/include/beman/execution26/detail/indices_for.hpp index cd3b22ac..54b7b175 100644 --- a/include/beman/execution26/detail/indices_for.hpp +++ b/include/beman/execution26/detail/indices_for.hpp @@ -10,7 +10,7 @@ namespace beman::execution26::detail { template -using indices_for = ::std::remove_reference_t::indices_for; +using indices_for = typename ::std::remove_reference_t::indices_for; } // ---------------------------------------------------------------------------- diff --git a/include/beman/execution26/detail/inplace_stop_source.hpp b/include/beman/execution26/detail/inplace_stop_source.hpp index 609da36b..615cf155 100644 --- a/include/beman/execution26/detail/inplace_stop_source.hpp +++ b/include/beman/execution26/detail/inplace_stop_source.hpp @@ -41,7 +41,7 @@ class beman::execution26::inplace_stop_token { friend class ::beman::execution26::inplace_stop_source; template friend class ::beman::execution26::inplace_stop_callback; - explicit inplace_stop_token(::beman::execution26::inplace_stop_source* source) : source(source) {} + explicit inplace_stop_token(::beman::execution26::inplace_stop_source* src) : source(src) {} ::beman::execution26::inplace_stop_source* source{}; }; @@ -85,8 +85,8 @@ class beman::execution26::inplace_stop_callback final template inplace_stop_callback(::beman::execution26::inplace_stop_token, Init&&); inplace_stop_callback(const inplace_stop_callback&) = delete; - inplace_stop_callback(inplace_stop_callback&&) = delete; - ~inplace_stop_callback() { + inplace_stop_callback(inplace_stop_callback&&) = delete; + ~inplace_stop_callback() override { if (this->source) { this->source->deregister(this); } diff --git a/include/beman/execution26/detail/intrusive_queue.hpp b/include/beman/execution26/detail/intrusive_queue.hpp new file mode 100644 index 00000000..8c02fac2 --- /dev/null +++ b/include/beman/execution26/detail/intrusive_queue.hpp @@ -0,0 +1,92 @@ +// include/beman/execution26/detail/intrusive_queue.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_BEMAN_EXECUTION26_DETAIL_INTRUSIVE_QUEUE +#define INCLUDED_BEMAN_EXECUTION26_DETAIL_INTRUSIVE_QUEUE + +#include +#include + +// ---------------------------------------------------------------------------- + +namespace beman::execution26::detail { +template +struct intrusive_queue; + +//! @brief This class template implements a non-thread-safe, intrusive queue. +//! @headerfile beman/execution26/detail/intrusive_queue.hpp +//! @details +//! The template argument is a pointer-to-member for some class which +//! is itself a pointer to the type. The member is used to form a singly +//! linked list. +//! +//!

Usage

+//!
+//! intrusive_queue<&Type::next>
+//! 
+//! +//!

Example

+//!
+//! #include 
+//! #include 
+//! namespace detail = beman::execution26::detail;
+//! struct node {
+//!     int   value{};
+//!     node* link{};
+//! };
+//! int main() {
+//!     detail::intrusive<&node::link> q;
+//!     assert(q.empty());
+//!     node ns[]{ { 1 }, { 2 }, { 3 } };
+//!     for (auto& n: ns) { q.push(&n); }
+//!     assert(!q.empty());
+//!     assert(q.pop()->value == 1);
+//!     assert(q.pop()->value == 2);
+//!     assert(q.pop()->value == 3);
+//! }
+//! 
+template +struct intrusive_queue { + T* head{}; + T* tail{}; + + //! @brief The default constructor create an empty queue + constexpr intrusive_queue() noexcept = default; + //! @brief The move constructor transfers the content of the parameter to the new queue + //! @details + //! The original queue `other` will be empty after this operation. + intrusive_queue(intrusive_queue&& other) noexcept + : head(::std::exchange(other.head, nullptr)), tail(::std::exchange(other.tail, nullptr)) {} + intrusive_queue(const intrusive_queue&) = delete; + auto operator=(intrusive_queue&& other) noexcept -> intrusive_queue& { + this->head(::std::exchange(other.head, nullptr)); + this->tail(::std::exchange(other.tail, nullptr)); + return *this; + } + auto operator=(const intrusive_queue&) -> intrusive_queue& = delete; + ~intrusive_queue() = default; + //! @brief Push the node `n` to the end of the queue + //! @details + //! The "next" pointer of the new node is set to `nullptr`. + auto push(T* n) noexcept -> void { + n->*next = nullptr; + if (this->head) { + std::exchange(this->tail, n)->*next = n; + } else { + this->head = this->tail = n; + } + } + //! @brief Remove the start node of the queue and return it + //! @pre The queue is not empty + auto pop() noexcept -> T* { + assert(not this->empty()); + return std::exchange(this->head, this->head->*next); + } + //! @brief Test whether the queue is empty + auto empty() const noexcept -> bool { return this->head == nullptr; } +}; +} // namespace beman::execution26::detail + +// ---------------------------------------------------------------------------- + +#endif diff --git a/include/beman/execution26/detail/intrusive_stack.hpp b/include/beman/execution26/detail/intrusive_stack.hpp index 4ee89ffb..71e3b3af 100644 --- a/include/beman/execution26/detail/intrusive_stack.hpp +++ b/include/beman/execution26/detail/intrusive_stack.hpp @@ -1,4 +1,4 @@ -// include/beman/execution26/detail/intrusive_queue.hpp -*-C++-*- +// include/beman/execution26/detail/intrusive_stack.hpp -*-C++-*- // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception #ifndef INCLUDED_BEMAN_EXECUTION26_DETAIL_INTRUSIVE_QUEUE @@ -15,16 +15,16 @@ class atomic_intrusive_stack; template class intrusive_stack; -//! @brief This data structure is an intrusive queue that is not thread-safe. -template +//! @brief This data structure is an intrusive stack that is not thread-safe. +template class intrusive_stack { public: - //! @brief Pushes an item to the queue. + //! @brief Pushes an item to the stack. auto push(Item* item) noexcept -> void { item->*Next = std::exchange(head_, item); } - //! @brief Pops one item from the queue. + //! @brief Pops one item from the stack. //! - //! @return The item that was popped from the queue, or nullptr if the queue is empty. + //! @return The item that was popped from the stack, or nullptr if the stack is empty. auto pop() noexcept -> Item* { if (head_) { auto item = head_; @@ -34,7 +34,7 @@ class intrusive_stack { return nullptr; } - //! @brief Tests if the queue is empty. + //! @brief Tests if the stack is empty. auto empty() const noexcept -> bool { return !head_; } private: @@ -44,4 +44,4 @@ class intrusive_stack { } // namespace beman::execution26::detail -#endif \ No newline at end of file +#endif diff --git a/include/beman/execution26/detail/join_env.hpp b/include/beman/execution26/detail/join_env.hpp index f61852e6..1cc32dcc 100644 --- a/include/beman/execution26/detail/join_env.hpp +++ b/include/beman/execution26/detail/join_env.hpp @@ -18,16 +18,14 @@ class join_env { public: template - join_env(E1&& env1, E2&& env2) : env1(::std::forward(env1)), env2(::std::forward(env2)) {} + join_env(E1&& e1, E2&& e2) : env1(::std::forward(e1)), env2(::std::forward(e2)) {} template requires( requires(Env1&, const Query& query, Args&&... args) { env1.query(query, ::std::forward(args)...); } || - requires(Env2& e2, const Query& query, Args&&... args) { - e2.query(query, ::std::forward(args)...); - }) + requires(Env2& e2, const Query& query, Args&&... args) { e2.query(query, ::std::forward(args)...); }) auto query(const Query& query, Args&&... args) noexcept -> decltype(auto) { if constexpr (requires { env1.query(query, ::std::forward(args)...); }) { return env1.query(query, ::std::forward(args)...); diff --git a/include/beman/execution26/detail/just.hpp b/include/beman/execution26/detail/just.hpp index 13188155..8296e87d 100644 --- a/include/beman/execution26/detail/just.hpp +++ b/include/beman/execution26/detail/just.hpp @@ -108,15 +108,15 @@ using just_stopped_t = ::beman::execution26::detail::just_t<::beman::execution26 * } * */ -inline constexpr ::beman::execution26::just_t just{}; +inline constexpr ::beman::execution26::just_t just{}; /*! * \brief just_error(_error_) yields a sender completing with set_error_t(_Error_) * \headerfile beman/execution26/execution.hpp * * \details - * `just_error` is a callable object of type `just_error_t`. Invoking just_error(_error_) yields a sender which - * stores its argument and produces an error completion with this error when started. This sender completes + * `just_error` is a callable object of type `just_error_t`. Invoking just_error(_error_) yields a sender + * which stores its argument and produces an error completion with this error when started. This sender completes * synchronously when started. * *

Usage

@@ -145,10 +145,9 @@ inline constexpr ::beman::execution26::just_t just{}; * uses that as the input for `upon_error` consuming the error and producing * a value completion: using sync_wait(just_error(_error_)) * directly doesn't work because `sync_wait` requires exactly one value completion - * from its argument and `set_error` only has an error completion. The function used with `upon_error` verifies that the - * expected code was produced and also sets the flag `had_error` indicating it - * was called at all. This flag is checked after waiting for the result - * in `sync_wait`. + * from its argument and `set_error` only has an error completion. The function used with `upon_error` verifies that + * the expected code was produced and also sets the flag `had_error` indicating it was called at all. This flag is + * checked after waiting for the result in `sync_wait`. * *
  * #include 
@@ -167,16 +166,15 @@ inline constexpr ::beman::execution26::just_t         just{};
  * }
  * 
*/ -inline constexpr ::beman::execution26::just_error_t just_error{}; +inline constexpr ::beman::execution26::just_error_t just_error{}; /*! * \brief just_stopped(_) yields a sender completing with set_stopped_t() * \headerfile beman/execution26/execution.hpp * * \details - * `just_stopped` is a callable object of type `just_stopped_t`. Invoking just_stopped() yields a sender which - * produces a cancellation completion when started. This sender completes - * synchronously when started. + * `just_stopped` is a callable object of type `just_stopped_t`. Invoking just_stopped() yields a sender + * which produces a cancellation completion when started. This sender completes synchronously when started. * *

Usage

*
@@ -200,7 +198,7 @@ inline constexpr ::beman::execution26::just_error_t   just_error{};
  * uses that as the input for `upon_stopped` consuming the cancellation and producing
  * a value completion: using sync_wait(just_stopped())
  * directly doesn't work because `sync_wait` requires exactly one value completion
- * from its argument and `set_stopped` only has a cancellation completion. The function used with `upon_stopped` 
+ * from its argument and `set_stopped` only has a cancellation completion. The function used with `upon_stopped`
  * sets the flag `had_stopped` indicating it
  * was called at all. This flag is checked after waiting for the result
  * in `sync_wait`.
diff --git a/include/beman/execution26/detail/make_env.hpp b/include/beman/execution26/detail/make_env.hpp
index 5b7c9860..1b8cb020 100644
--- a/include/beman/execution26/detail/make_env.hpp
+++ b/include/beman/execution26/detail/make_env.hpp
@@ -17,7 +17,7 @@ class make_env {
 
   public:
     template 
-    make_env(const Query&, V&& value) : value(::std::forward(value)) {}
+    make_env(const Query&, V&& v) : value(::std::forward(v)) {}
     constexpr auto query(const Query&) const noexcept -> const Value& { return this->value; }
     constexpr auto query(const Query&) noexcept -> Value& { return this->value; }
 };
diff --git a/include/beman/execution26/detail/meta_combine.hpp b/include/beman/execution26/detail/meta_combine.hpp
index 75443eb5..508d8046 100644
--- a/include/beman/execution26/detail/meta_combine.hpp
+++ b/include/beman/execution26/detail/meta_combine.hpp
@@ -29,7 +29,7 @@ struct combine, L1, L...> {
 
 namespace beman::execution26::detail::meta {
 template 
-using combine = ::beman::execution26::detail::meta::detail::combine::type;
+using combine = typename ::beman::execution26::detail::meta::detail::combine::type;
 }
 
 // ----------------------------------------------------------------------------
diff --git a/include/beman/execution26/detail/meta_filter.hpp b/include/beman/execution26/detail/meta_filter.hpp
index e77c3915..edb60af3 100644
--- a/include/beman/execution26/detail/meta_filter.hpp
+++ b/include/beman/execution26/detail/meta_filter.hpp
@@ -13,22 +13,44 @@ namespace beman::execution26::detail::meta::detail {
 template