From cc5e56117c360a91949e0452073275f80da07cb6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C5=91rinc=20Serf=C5=91z=C5=91?= Date: Tue, 5 Dec 2023 18:03:30 +0100 Subject: [PATCH] Implemented Callback sample (#87) * Implemented callback sample * Minor fixes from code review * Minor fixes from code review II. --- lib/include/CL/Utils/Context.hpp | 2 + lib/src/Utils/Context.cpp | 16 + samples/core/CMakeLists.txt | 9 +- samples/core/callback/CMakeLists.txt | 53 ++ samples/core/callback/README.md | 104 ++++ .../callback/callback_sample_sequence.drawio | 213 +++++++ .../callback/callback_sample_sequence.svg | 4 + samples/core/callback/main.c | 521 ++++++++++++++++++ samples/core/callback/main.cpp | 333 +++++++++++ samples/core/callback/reaction_diffusion.cl | 50 ++ 10 files changed, 1301 insertions(+), 4 deletions(-) create mode 100644 samples/core/callback/CMakeLists.txt create mode 100644 samples/core/callback/README.md create mode 100644 samples/core/callback/callback_sample_sequence.drawio create mode 100644 samples/core/callback/callback_sample_sequence.svg create mode 100644 samples/core/callback/main.c create mode 100644 samples/core/callback/main.cpp create mode 100644 samples/core/callback/reaction_diffusion.cl diff --git a/lib/include/CL/Utils/Context.hpp b/lib/include/CL/Utils/Context.hpp index bd1110c3..f1ace827 100644 --- a/lib/include/CL/Utils/Context.hpp +++ b/lib/include/CL/Utils/Context.hpp @@ -13,5 +13,7 @@ namespace util { Context UTILSCPP_EXPORT get_context(cl_uint plat_id, cl_uint dev_id, cl_device_type type, cl_int* error = nullptr); + + void UTILSCPP_EXPORT print_device_info(const cl::Device& device); } } diff --git a/lib/src/Utils/Context.cpp b/lib/src/Utils/Context.cpp index 7882998f..806ed634 100644 --- a/lib/src/Utils/Context.cpp +++ b/lib/src/Utils/Context.cpp @@ -1,6 +1,9 @@ // OpenCL SDK includes #include +#include +#include + cl::Context cl::util::get_context(cl_uint plat_id, cl_uint dev_id, cl_device_type type, cl_int* error) { @@ -40,3 +43,16 
@@ cl::Context cl::util::get_context(cl_uint plat_id, cl_uint dev_id, return cl::Context{}; } + +void cl::util::print_device_info(const cl::Device& device) +{ + const cl::Platform platform(device.getInfo()); + const std::string platform_vendor = platform.getInfo(); + const std::string device_name = device.getInfo(); + const std::string device_opencl_c_version = + device.getInfo(); + std::cout << "Selected platform by " << platform_vendor + << "\nSelected device: " << device_name << '\n' + << device_opencl_c_version << '\n' + << std::endl; +} diff --git a/samples/core/CMakeLists.txt b/samples/core/CMakeLists.txt index 75d8357d..c09818a3 100644 --- a/samples/core/CMakeLists.txt +++ b/samples/core/CMakeLists.txt @@ -12,10 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -add_subdirectory(enumopencl) +add_subdirectory(binaries) +add_subdirectory(blur) +add_subdirectory(callback) add_subdirectory(copybuffer) add_subdirectory(copybufferkernel) -add_subdirectory(saxpy) +add_subdirectory(enumopencl) add_subdirectory(reduce) -add_subdirectory(blur) -add_subdirectory(binaries) +add_subdirectory(saxpy) diff --git a/samples/core/callback/CMakeLists.txt b/samples/core/callback/CMakeLists.txt new file mode 100644 index 00000000..54d05d84 --- /dev/null +++ b/samples/core/callback/CMakeLists.txt @@ -0,0 +1,53 @@ +# Copyright (c) 2023 The Khronos Group Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +find_package(Threads) + +file(WRITE "${CMAKE_CURRENT_BINARY_DIR}/threads.c" +"#include +#include +int main(void) { thrd_sleep(&(struct timespec){.tv_nsec=1}, NULL); } +") + +# Signature can be modernized at CMake version 3.25 +try_compile( + HAS_C11_THREADS + "${CMAKE_CURRENT_BINARY_DIR}" + SOURCES "${CMAKE_CURRENT_BINARY_DIR}/threads.c" + C_STANDARD 11 + C_STANDARD_REQUIRED ON +) + +if (HAS_C11_THREADS) + add_sample( + TEST + TARGET callback + VERSION 300 + SOURCES main.c + KERNELS reaction_diffusion.cl) + target_link_libraries(callback PRIVATE + $) +else() + message(WARNING + "Skipping callback sample, C11 standard threads are not supported with the current toolset") +endif() + +add_sample( + TEST + TARGET callbackcpp + VERSION 300 + SOURCES main.cpp + KERNELS reaction_diffusion.cl) +target_link_libraries(callbackcpp PRIVATE + $) diff --git a/samples/core/callback/README.md b/samples/core/callback/README.md new file mode 100644 index 00000000..8c1eb8f4 --- /dev/null +++ b/samples/core/callback/README.md @@ -0,0 +1,104 @@ +# Callback sample - synchronizing Command Queues with Events + +## Sample purpose + +This sample demonstrates how to synchronize the execution of multiple command queues with each other and with host-side calculations. + +## Key APIs and Concepts + +If all operations are enqueued on a single command queue, the execution of the operations is sequential, and the ordering of the operations matches the order in which they were enqueued (unless `CL_QUEUE_OUT_OF_ORDER_EXEC_MODE_ENABLE` was specified). + +However, sometimes it is more performant to enqueue unrelated operations to separate command queues, enabling the parallel execution capabilities of the device. The primer utility of synchronization across command queues are **OpenCL event objects**. Each enqueue operation can take a list of events which it will synchronize with. 
Also, each enqueue operation can emit a single event object that can be used to synchronize other operations with that one. Additionally, using the **callback functionality**, host code can be executed at various points in the event's lifetime. + +### Kernel logic + +The sample implements the simulation of a theoretical chemical reaction, namely the [Gray-Scott model](https://groups.csail.mit.edu/mac/projects/amorphous/GrayScott/) of the [reaction-diffusion system](https://en.wikipedia.org/wiki/Reaction%E2%80%93diffusion_system). Explaining the algorithm is not the goal of this example. We only need to know that the simulation progresses iteratively: given a 2D lattice of the concentration of the $U$ and $V$ chemicals (stored in the R and G channels of an image texture respectively), the subsequent state is calculated by the kernel. The algorithm results in visually interesting shapes, with the order being obvious to the viewer: the further chemical $V$ has spread, the later the iteration step being observed. + +## Application flow + +### Overview + +From an initial state, the iteration progresses through a fixed number of steps. The number of steps can be specified on the command line. Two 2D image objects are used to store the state of the simulation, one for the input and another for the output state, and the images are swapped after each step. At every Nth iteration (N being controlled from the command line), the state of the simulation is dispatched for writing to the disk as a PNG image. + +### Command queues and synchronization + +

+ Sequence diagram of the sample's execution flow +

+ +In this example, kernel launches, image-to-buffer copies (device-to-device copy) and buffer reads (device-to-host copy) are performed. Some systems allow the overlapping of these operations, so it makes sense to enqueue all three of these operation types to separate command queues. The journey of each iteration state is the following: + +1. The state is calculated from the previous state by the `reaction_diffusion_step` kernel. The kernel launch synchronizes with the previous image-to-buffer copy, which could have happened in the previous iteration, but also quite a few iterations ago. +2. If the iteration index is a multiple of N, a device buffer object and a host vector of the same size are allocated. Otherwise, the next iteration starts calculating at step 1. +3. A copy of the output image to the allocated buffer object is enqueued. Eventually the simulation state is read to host memory, but since device-to-device copy is usually faster than device-to-host copy, first the previous output image is copied to a buffer and this buffer is read to the host in step 4. Note that the copy can be performed concurrently with the kernel launch of the next iteration, since they both read from the same image object. Only the subsequent iteration's compute launch has to synchronize with this copy. +4. The read of the buffer (i.e. device-to-host copy) is enqueued on the read queue. If the device has concurrent copy capabilities, this read potentially overlaps with a previous copy (step 3) operation. +5. After the read of the buffer is completed, its contents need to be written to an image file. This has to synchronize with the read operation, but this time, we need to execute host code instead of an OpenCL enqueue. For that, we set the completion callback of the event produced by the read enqueue. A `void*` argument is passed to the callback, which is used to identify the host vector containing the data. +6. 
The callback is executed on a thread used by the OpenCL runtime. Therefore it is advised that the callback returns as quickly as possible. To achieve this, the image write is dispatched to a different thread, using `std::async` in the C++ version of the sample, and `thrd_create` in the C version. +7. When the image write has finished, the completion is signaled back to the waiting main thread; otherwise the executable could exit before the write completes. For this purpose `std::future::wait` is used in the C++ version, and a condition variable in the C version. + +## Used API surface (C++) + +```c++ +cl::Buffer::Buffer(cl::Context, cl_mem_flags, std::size_t size) +cl::CommandQueue::enqueueCopyImageToBuffer(cl::Image2D, cl::Buffer, std::array, + std::array, std::size_t, + std::vector*, cl::Event*) +cl::CommandQueue::enqueueFillImage(cl::Image2D, cl_float4, std::array, std::array) +cl::CommandQueue::enqueueReadBuffer(cl::Buffer, bool, std::size_t, std::size_t, + void*, std::vector*, cl::Event*) +cl::Context::getInfo() +cl::Context::getSupportedImageFormats(cl_mem_flags, cl_mem_object_type, std::vector*) +cl::Device::getInfo() +cl::Device::getInfo() +cl::EnqueueArgs::EnqueueArgs(cl::CommandQueue, cl::Event, cl::NDRange) +cl::Event::Event() +cl::Event::Event(cl::Event) +cl::Event::setCallback(cl_int, void(*)(cl_event, cl_int, void*), void*) +cl::Image2D::Image2D(cl::Context, cl_mem_flags, cl::ImageFormat, std::size_t, std::size_t) +cl::ImageFormat::ImageFormat(cl_channel_order, cl_channel_type) +cl::KernelFunctor(cl::Program, std::string) +cl::NDRange(std::size_t, std::size_t) +cl::Platform::getInfo() +cl::Platform::Platform(cl_platform) +cl::Program::build(cl::Device) +cl::Program::Program(cl::Context, std::string) +cl::sdk::comprehend() +cl::sdk::parse() +cl::sdk::parse_cli() +cl::UserEvent::setStatus(cl_int) +cl::UserEvent::UserEvent(cl::Context) +``` + +## Used API surface (C) + +```c +clCreateBuffer +clCreateCommandQueueWithProperties +clCreateContext 
+clCreateImage +clCreateKernel +clCreateProgramWithSource +clEnqueueCopyImageToBuffer +clEnqueueFillImage +clEnqueueNDRangeKernel +clEnqueueReadBuffer +clGetDeviceInfo +clGetSupportedImageFormats +clReleaseCommandQueue +clReleaseContext +clReleaseDevice +clReleaseEvent +clReleaseEvent +clReleaseKernel +clReleaseMemObject +clReleaseProgram +clSetEventCallback +clSetKernelArg +cnd_destroy +cnd_signal +mtx_destroy +mtx_lock +mtx_unlock +thrd_create +thrd_detach +``` diff --git a/samples/core/callback/callback_sample_sequence.drawio b/samples/core/callback/callback_sample_sequence.drawio new file mode 100644 index 00000000..f9e9a6f5 --- /dev/null +++ b/samples/core/callback/callback_sample_sequence.drawio @@ -0,0 +1,213 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/samples/core/callback/callback_sample_sequence.svg b/samples/core/callback/callback_sample_sequence.svg new file mode 100644 index 00000000..05781a6f --- /dev/null +++ b/samples/core/callback/callback_sample_sequence.svg @@ -0,0 +1,4 @@ + + + +
Compute Queue
Compute Queue
Copy Queue
Copy Queue
copy event
copy event
Read Queue
Read Queue
copy event
copy event
Worker thread
Worker thread
compute event
compute event
compute event
compute event
copy event
copy event
copy event
copy event
read event
callback
read event...
Main thread
Main thread
image write
finished
image write...
computing the next simulation state
computing the next simulat...
copying the simulation
state from image to
buffer
copying the simulation...
reading the buffer's
contents to host
memory
reading the buffer's...
writing image file
writing image file
compute event
compute event
copy event
copy event
copy event
copy event
read event
callback
read event...
image write
finished
image write...
start
iteration
start...
Text is not SVG - cannot display
\ No newline at end of file diff --git a/samples/core/callback/main.c b/samples/core/callback/main.c new file mode 100644 index 00000000..73d93828 --- /dev/null +++ b/samples/core/callback/main.c @@ -0,0 +1,521 @@ +/* + * Copyright (c) 2023 The Khronos Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// OpenCL SDK includes +#include +#include +#include +#include + +// standard includes +#include +#include +#include +#include +#include +#include +#include +#include + +#define THRDERROR(func, err, label) \ + do \ + { \ + if (thrd_success != (func)) \ + { \ + err = -1; \ + goto label; \ + } \ + } while (0) + +typedef struct +{ + cl_device_id device; + cl_context context; + cl_event compute_event; + cl_event copy_event; + cl_event read_event; + cl_command_queue compute_queue; + cl_command_queue copy_queue; + cl_command_queue read_queue; + cl_platform_id platform; + cl_program program; + cl_kernel kernel; + cl_mem img_a; + cl_mem img_b; + cl_mem copy_buffer; +} state_t; + +typedef struct +{ + cnd_t image_save_finished_signal; + mtx_t image_save_finished_mtx; + size_t image_save_finished_count; +} global_state_t; + +static global_state_t g_state; + +typedef struct image_save_task +{ + cl_sdk_image image; + size_t index; +} image_save_task; + +static int image_save_consumer_thread(void* data) +{ + int error = 0; + image_save_task* task = (image_save_task*)data; + char image_filename[FILENAME_MAX]; + memset(image_filename, 0, FILENAME_MAX); + 
snprintf(image_filename, sizeof(image_filename), "callback_out_%zu.png",
+             task->index);
+    OCLERROR_RET(cl_sdk_write_image(image_filename, &task->image), error, end);
+    printf("Written image to %s\n", image_filename);
+
+    // signal completion
+    THRDERROR(mtx_lock(&g_state.image_save_finished_mtx), error, end);
+    g_state.image_save_finished_count++;
+    THRDERROR(mtx_unlock(&g_state.image_save_finished_mtx), error, end);
+    THRDERROR(cnd_signal(&g_state.image_save_finished_signal), error, end);
+
+end:
+    free(task->image.pixels);
+    free(task);
+    if (error != 0)
+    {
+        printf("image_save_consumer_thread encountered an error. Exiting");
+        exit(error);
+    }
+    return error;
+}
+
+// Event callback registered by enqueue_buffer_read on the read event.
+//
+// event     - the event that fired (unused).
+// status    - execution status reported by the runtime: CL_COMPLETE on
+//             success, a negative error code if the read terminated
+//             abnormally.
+// user_data - the image_save_task allocated by enqueue_buffer_read; it is
+//             handed to image_save_consumer_thread, which frees it.
+//
+// The function only spawns a detached worker thread, so the thread that
+// invoked the callback is not kept busy with the image write. Any failure
+// terminates the process, matching image_save_consumer_thread.
+//
+// CL_CALLBACK is required so the function has the calling convention the
+// OpenCL runtime uses when it invokes the callback.
+static void CL_CALLBACK start_image_save_task(cl_event event, cl_int status,
+                                              void* user_data)
+{
+    (void)event;
+    int error = 0;
+    // A status other than CL_COMPLETE means the read did not finish
+    // successfully, so the pixel buffer does not hold a valid image.
+    if (status != CL_COMPLETE)
+    {
+        error = (int)status;
+        goto end;
+    }
+    // While it's not optimal to launch a thread for each saved image,
+    // for the purpose of demonstration it is sufficient.
+    thrd_t thread_handle;
+    THRDERROR(
+        thrd_create(&thread_handle, image_save_consumer_thread, user_data),
+        error, end);
+    THRDERROR(thrd_detach(thread_handle), error, end);
+
+end:
+    if (error != 0)
+    {
+        printf("An error occurred in callback, exiting");
+        exit(error);
+    }
+}
+
+// Enqueues a non-blocking read of `buf` into a freshly allocated host image
+// and arranges for that image to be saved once the read has completed.
+//
+// queue      - command queue the read is enqueued on.
+// buf        - device buffer holding side * side cl_uchar4 pixels.
+// side       - side length of the square image in pixels.
+// idx        - index stored in the task and used in the output file name.
+// wait_event - single event the read has to wait for.
+// event      - receives the event of the enqueued read.
+//
+// Returns CL_SUCCESS, or the error of the failing allocation or OpenCL call;
+// on failure the task and its pixel buffer are freed before returning.
+static cl_int enqueue_buffer_read(cl_command_queue queue, cl_mem buf,
+                                  size_t side, size_t idx,
+                                  const cl_event* wait_event, cl_event* event)
+{
+    cl_int error = CL_SUCCESS;
+    image_save_task* task;
+    MEM_CHECK(task = (image_save_task*)malloc(sizeof(image_save_task)), error,
+              error_end);
+    task->image.width = side;
+    task->image.height = side;
+    task->image.pixel_size = sizeof(cl_uchar4);
+    MEM_CHECK(task->image.pixels =
+                  (unsigned char*)malloc(side * side * sizeof(cl_uchar4)),
+              error, free_task);
+    task->index = idx;
+
+    OCLERROR_RET(clEnqueueReadBuffer(queue, buf, CL_FALSE, 0,
+                                     side * side * sizeof(cl_uchar4),
+                                     task->image.pixels, 1, wait_event, event),
+                 error, free_pixels);
+    error =
+        clSetEventCallback(*event, CL_COMPLETE, start_image_save_task, task);
+    if (error != CL_SUCCESS)
+    {
+        // The non-blocking read is already enqueued and writes into
+        // task->image.pixels; wait for it to finish so the buffer is not
+        // freed while the runtime may still be using it.
+        clWaitForEvents(1, event);
+        goto free_pixels;
+    }
+    return error;
+
+free_pixels:
+    free(task->image.pixels);
+free_task:
+    free(task);
+error_end:
+    return error;
+}
+
+// Sample-specific option
+struct options_Callback
+{
+    size_t side;
+    size_t iterations;
+    size_t write_iter;
+};
+
+cag_option CallbackOptions[] = {
+    { .identifier = 's',
+      .access_letters = "s",
+      .access_name = "side",
+      .value_name = "(positive int)",
+      .description = "Side length of the generated image in pixels" },
+
+    { .identifier = 'i',
+      .access_letters = "i",
+      .access_name = "iter",
+      .value_name = "(positive int)",
+      .description = "Number of iterations in the simulation" },
+
+    { .identifier = 'w',
+      .access_letters = "w",
+      .access_name = "write_iter",
+      .value_name = "(positive int)",
+      .description = "Controls after how many iterations the intermediate "
+                     "result is written to file" },
+};
+
+// Parses `text` as a strictly positive decimal integer.
+// Returns true and stores the value in *out on success; returns false and
+// leaves *out untouched if the text is empty, has trailing characters, or
+// denotes zero or a negative number.
+static bool parse_positive_size(const char* text, size_t* out)
+{
+    char* end = NULL;
+    const long long parsed = strtoll(text, &end, 10);
+    if (end == text || *end != '\0' || parsed <= 0) return false;
+    *out = (size_t)parsed;
+    return true;
+}
+
+// Handles the sample-specific command-line options 's', 'i' and 'w'.
+//
+// identifier  - identifier of the option currently being processed.
+// cag_context - parser context the option value is fetched from.
+// opts        - receives the parsed value.
+//
+// Returns NotParsed if the identifier does not belong to this sample,
+// ParsedOK if the value was stored, and ParseError if the value is missing
+// or is not a positive integer. All three options are advertised as
+// "(positive int)"; in particular a write_iter of zero must be rejected
+// here because it is later used as a divisor.
+ParseState parse_CallbackOptions(const char identifier,
+                                 cag_option_context* cag_context,
+                                 struct options_Callback* opts)
+{
+    const char* value;
+    size_t* target = NULL;
+
+    switch (identifier)
+    {
+        case 's': target = &opts->side; break;
+        case 'i': target = &opts->iterations; break;
+        case 'w': target = &opts->write_iter; break;
+        default: return NotParsed;
+    }
+
+    if (!(value = cag_option_get_value(cag_context))) return ParseError;
+    return parse_positive_size(value, target) ? ParsedOK : ParseError;
+}
+
+static cl_int parse_options(int argc, char* argv[],
+                            struct cl_sdk_options_SingleDevice* dev_opts,
+                            struct options_Callback* cb_opts)
+{
+    cl_int error = CL_SUCCESS;
+    struct cag_option *opts = NULL, *tmp = NULL;
+    size_t n = 0;
+
+    /* Prepare all options array.
*/ + MEM_CHECK(opts = add_CLI_options(opts, &n, SingleDeviceOptions, + CAG_ARRAY_SIZE(SingleDeviceOptions)), + error, end); + MEM_CHECK(tmp = add_CLI_options(opts, &n, DiagnosticOptions, + CAG_ARRAY_SIZE(DiagnosticOptions)), + error, end); + opts = tmp; + MEM_CHECK(tmp = add_CLI_options(opts, &n, CallbackOptions, + CAG_ARRAY_SIZE(CallbackOptions)), + error, end); + opts = tmp; + + char identifier; + cag_option_context cag_context; + + /* Prepare the context and iterate over all options. */ + cag_option_prepare(&cag_context, opts, n, argc, argv); + while (cag_option_fetch(&cag_context)) + { + ParseState state = NotParsed; + identifier = cag_option_get(&cag_context); + + PARS_OPTIONS( + parse_SingleDeviceOptions(identifier, &cag_context, dev_opts), + state); + PARS_OPTIONS(parse_CallbackOptions(identifier, &cag_context, cb_opts), + state); + + if (identifier == 'h') + { + printf("Usage: callback [OPTION]...\n"); + printf("Option name and value should be separated by '=' or a " + "space\n"); + cag_option_print(opts, n, stdout); + exit((state == ParseError) ? 
CL_INVALID_ARG_VALUE : CL_SUCCESS); + } + } + +end: + free(opts); + return error; +} + +int main(int argc, char* argv[]) +{ + state_t state; + srand(time(NULL)); + g_state.image_save_finished_count = 0; + int error = 0; + THRDERROR(mtx_init(&g_state.image_save_finished_mtx, mtx_plain), error, + end); + THRDERROR(cnd_init(&g_state.image_save_finished_signal), error, save_mtx); + + struct cl_sdk_options_SingleDevice dev_opts = { + .triplet = { 0, 0, CL_DEVICE_TYPE_ALL } + }; + struct options_Callback cb_opts = { + .side = 512, + .iterations = 10000, + .write_iter = 1000, + }; + + OCLERROR_RET(parse_options(argc, argv, &dev_opts, &cb_opts), error, + save_cnd); + /// Create runtime objects based on user preference or default + OCLERROR_PAR(state.device = cl_util_get_device( + dev_opts.triplet.plat_index, dev_opts.triplet.dev_index, + dev_opts.triplet.dev_type, &error), + error, save_cnd); + cl_util_print_device_info(state.device); + OCLERROR_PAR(state.context = clCreateContext(NULL, 1, &state.device, NULL, + NULL, &error), + error, dev); + OCLERROR_RET(clGetDeviceInfo(state.device, CL_DEVICE_PLATFORM, + sizeof(cl_platform_id), &state.platform, NULL), + error, cont); + + size_t kernel_source_length; + char* kernel_source; + OCLERROR_PAR(kernel_source = cl_util_read_text_file( + "./reaction_diffusion.cl", &kernel_source_length, &error), + error, cont); + OCLERROR_PAR(state.program = clCreateProgramWithSource( + state.context, 1, (const char**)&kernel_source, + &kernel_source_length, &error), + error, ksource); + OCLERROR_RET(cl_util_build_program(state.program, state.device, NULL), + error, prog); + OCLERROR_PAR(state.kernel = clCreateKernel( + state.program, "reaction_diffusion_step", &error), + error, prog); + cl_command_queue_properties props[] = { CL_QUEUE_PROPERTIES, + CL_QUEUE_PROFILING_ENABLE, 0 }; + OCLERROR_PAR(state.compute_queue = clCreateCommandQueueWithProperties( + state.context, state.device, props, &error), + error, kern); + OCLERROR_PAR(state.copy_queue 
= clCreateCommandQueueWithProperties( + state.context, state.device, props, &error), + error, comp_queue); + OCLERROR_PAR(state.read_queue = clCreateCommandQueueWithProperties( + state.context, state.device, props, &error), + error, copy_queue); + cl_uint num_image_formats; + OCLERROR_RET(clGetSupportedImageFormats(state.context, CL_MEM_READ_WRITE, + CL_MEM_OBJECT_IMAGE2D, 0, NULL, + &num_image_formats), + error, read_queue); + cl_image_format* supported_image_formats; + MEM_CHECK(supported_image_formats = (cl_image_format*)malloc( + sizeof(cl_image_format) * num_image_formats), + error, read_queue); + + OCLERROR_RET(clGetSupportedImageFormats( + state.context, CL_MEM_READ_WRITE, CL_MEM_OBJECT_IMAGE2D, + num_image_formats, supported_image_formats, NULL), + error, free_image_formats); + cl_image_format image_format = { + .image_channel_order = CL_RGBA, + .image_channel_data_type = CL_UNORM_INT8, + }; + bool found_required_image_format = false; + for (cl_uint image_format_idx = 0; image_format_idx < num_image_formats; + ++image_format_idx) + { + if (supported_image_formats[image_format_idx].image_channel_data_type + == image_format.image_channel_data_type + && supported_image_formats[image_format_idx].image_channel_order + == image_format.image_channel_order) + { + found_required_image_format = true; + break; + } + } + if (!found_required_image_format) + { + error = -1; + printf("Error: the required OpenCL image format is not supported by " + "the selected device"); + goto free_image_formats; + } + + cl_image_desc image_desc = { + .image_height = cb_opts.side, + .image_width = cb_opts.side, + .image_type = CL_MEM_OBJECT_IMAGE2D, + }; + OCLERROR_PAR(state.img_a = + clCreateImage(state.context, CL_MEM_READ_WRITE, + &image_format, &image_desc, NULL, &error), + error, free_image_formats); + OCLERROR_PAR(state.img_b = + clCreateImage(state.context, CL_MEM_READ_WRITE, + &image_format, &image_desc, NULL, &error), + error, free_image_a); + cl_float4 fill_color = { { 1.F, 
0.F, 0.F, 1.F } }; + size_t fill_origin[3] = { 0, 0, 0 }; + size_t fill_region[3] = { cb_opts.side, cb_opts.side, 1 }; + OCLERROR_RET(clEnqueueFillImage(state.compute_queue, state.img_a, + &fill_color, fill_origin, fill_region, 0, + NULL, NULL), + error, free_image_b); + fill_color.y = 1.F; + fill_origin[0] = cb_opts.side / 2; + fill_origin[1] = cb_opts.side / 2; + fill_region[0] = cb_opts.side / 100; + fill_region[1] = cb_opts.side / 100; + OCLERROR_RET(clEnqueueFillImage(state.compute_queue, state.img_a, + &fill_color, fill_origin, fill_region, 0, + NULL, NULL), + error, free_image_b); + OCLERROR_PAR(state.copy_buffer = clCreateBuffer( + state.context, CL_MEM_READ_WRITE, + cb_opts.side * cb_opts.side * sizeof(cl_uchar4), NULL, + &error), + error, free_image_b); + + state.compute_event = NULL; + state.copy_event = NULL; + state.read_event = NULL; + for (size_t iter = 0; iter < cb_opts.iterations; ++iter) + { + cl_uint work_dim = 2; + size_t global_work_size[2] = { cb_opts.side, cb_opts.side }; + OCLERROR_RET( + clSetKernelArg(state.kernel, 0, sizeof(cl_mem), &state.img_a), + error, copy_buf); + OCLERROR_RET( + clSetKernelArg(state.kernel, 1, sizeof(cl_mem), &state.img_b), + error, copy_buf); + if (state.compute_event) + { + OCLERROR_RET(clReleaseEvent(state.compute_event), error, copy_buf); + } + OCLERROR_RET(clEnqueueNDRangeKernel( + state.compute_queue, state.kernel, work_dim, NULL, + global_work_size, NULL, + state.read_event == NULL ? 0 : 1, + state.read_event == NULL ? 
NULL : &state.read_event, + &state.compute_event), + error, copy_buf); + + if (iter % cb_opts.write_iter == 0) + { + size_t copy_origin[3] = { 0, 0, 0 }; + size_t copy_region[3] = { cb_opts.side, cb_opts.side, 1 }; + if (state.copy_event) + { + OCLERROR_RET(clReleaseEvent(state.copy_event), error, + compute_ev); + } + OCLERROR_RET(clEnqueueCopyImageToBuffer( + state.copy_queue, state.img_a, state.copy_buffer, + copy_origin, copy_region, 0, 1u, + &state.compute_event, &state.copy_event), + error, compute_ev); + if (state.read_event) + { + OCLERROR_RET(clReleaseEvent(state.read_event), error, copy_ev); + } + OCLERROR_RET( + enqueue_buffer_read(state.read_queue, state.copy_buffer, + cb_opts.side, iter / cb_opts.write_iter, + &state.copy_event, &state.read_event), + error, copy_ev); + } + + cl_mem tmp = state.img_a; + state.img_a = state.img_b; + state.img_b = tmp; + } + + // Make sure that all queued items are dispatched for execution + OCLERROR_RET(clFlush(state.compute_queue), error, read_ev); + OCLERROR_RET(clFlush(state.copy_queue), error, read_ev); + OCLERROR_RET(clFlush(state.read_queue), error, read_ev); + + size_t num_saved_images = + (cb_opts.iterations + cb_opts.write_iter - 1) / cb_opts.write_iter; + + for (;;) + { + THRDERROR(mtx_lock(&g_state.image_save_finished_mtx), error, read_ev); + bool wait_for_saving = + num_saved_images != g_state.image_save_finished_count; + if (wait_for_saving) + { + cnd_wait(&g_state.image_save_finished_signal, + &g_state.image_save_finished_mtx); + THRDERROR(mtx_unlock(&g_state.image_save_finished_mtx), error, + read_ev); + } + else + { + THRDERROR(mtx_unlock(&g_state.image_save_finished_mtx), error, + read_ev); + break; + } + } + + cl_int end_error = CL_SUCCESS; +read_ev: + OCLERROR_RET(clReleaseEvent(state.read_event), end_error, copy_ev); +copy_ev: + OCLERROR_RET(clReleaseEvent(state.copy_event), end_error, compute_ev); +compute_ev: + OCLERROR_RET(clReleaseEvent(state.compute_event), end_error, copy_buf); +copy_buf: + 
OCLERROR_RET(clReleaseMemObject(state.copy_buffer), end_error,
+                 free_image_b);
+free_image_b:
+    OCLERROR_RET(clReleaseMemObject(state.img_b), end_error, free_image_a);
+free_image_a:
+    OCLERROR_RET(clReleaseMemObject(state.img_a), end_error,
+                 free_image_formats);
+free_image_formats:
+    free(supported_image_formats);
+read_queue:
+    // Continue with the next label in the chain even if this release fails,
+    // so the copy queue is still released.
+    OCLERROR_RET(clReleaseCommandQueue(state.read_queue), end_error,
+                 copy_queue);
+copy_queue:
+    OCLERROR_RET(clReleaseCommandQueue(state.copy_queue), end_error,
+                 comp_queue);
+comp_queue:
+    OCLERROR_RET(clReleaseCommandQueue(state.compute_queue), end_error, kern);
+kern:
+    OCLERROR_RET(clReleaseKernel(state.kernel), end_error, prog);
+prog:
+    OCLERROR_RET(clReleaseProgram(state.program), end_error, ksource);
+ksource:
+    free(kernel_source);
+cont:
+    OCLERROR_RET(clReleaseContext(state.context), end_error, dev);
+dev:
+    OCLERROR_RET(clReleaseDevice(state.device), end_error, save_cnd);
+save_cnd:
+    cnd_destroy(&g_state.image_save_finished_signal);
+save_mtx:
+    mtx_destroy(&g_state.image_save_finished_mtx);
+end:
+    if (error) cl_util_print_error(error);
+    return error;
+}
diff --git a/samples/core/callback/main.cpp b/samples/core/callback/main.cpp
new file mode 100644
index 00000000..d85405b0
--- /dev/null
+++ b/samples/core/callback/main.cpp
@@ -0,0 +1,333 @@
+/*
+ * Copyright (c) 2023 The Khronos Group Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// OpenCL SDK includes
+#include <CL/SDK/CLI.hpp>
+#include <CL/SDK/Context.hpp>
+#include <CL/SDK/Image.hpp>
+#include <CL/SDK/Options.hpp>
+#include <CL/Utils/Context.hpp>
+
+// standard header includes
+#include <cassert>
+#include <fstream>
+#include <future>
+#include <iostream>
+#include <map>
+#include <mutex>
+#include <tuple> // std::make_tuple
+#include <vector>
+
+// TCLAP includes
+#include <tclap/CmdLine.h>
+
+namespace {
+
+// Host-side destination of one device->host read: the raw pixel bytes and the
+// side length of the square image they belong to.
+struct ReadJob
+{
+    std::vector<unsigned char> data;
+    std::size_t side;
+};
+
+// Guards read_jobs, which is modified both by main() and by the callback.
+std::mutex read_mutex;
+// Reads that were enqueued but not yet consumed by the callback, by job ID.
+std::map<std::intptr_t, ReadJob> read_jobs;
+// One file writing task per saved image, indexed by job ID. The vector is
+// sized by main() before any callback can run.
+std::vector<std::future<void>> image_write_tasks;
+
+// Two objects of the same type whose roles (read / write) can be exchanged.
+template <typename T> struct DoubleBuffer
+{
+    T read, write;
+
+    void swap() { std::swap(read, write); }
+};
+
+// Event callback registered on the device->host read of a job. user_data
+// carries the job ID (not a real pointer). The callback only takes the job out
+// of the map and hands the file writing over to a std::async task.
+void CL_CALLBACK read_complete_callback(cl_event, cl_int, void* user_data)
+{
+    // Acquire the job associated with the received ID and remove it from the
+    // map. These operations have to be protected, because the main thread or
+    // other callbacks might modify the map concurrently.
+    const auto job_id = reinterpret_cast<std::intptr_t>(user_data);
+    ReadJob job;
+    {
+        std::lock_guard<std::mutex> copy_lock(read_mutex);
+        const auto iter = read_jobs.find(job_id);
+        job = std::move(iter->second);
+        read_jobs.erase(iter);
+    }
+
+    // Launch an asynchronous task (on the CPU) which writes the read contents
+    // of the buffer to a PNG file. Notice how the relevant members of the job
+    // are moved to the closure. The lambda is mutable, so that the captured
+    // pixel buffer can be moved (instead of copied) into the image.
+    // Since no std::launch is passed, the task will either run deferred (on the
+    // main thread calling .wait()), or truly asynchronously on a different
+    // thread. The main point is that it is not running on the OpenCL runtime's
+    // thread.
+    image_write_tasks[static_cast<std::size_t>(job_id)] = std::async(
+        [pixels = std::move(job.data), side = job.side, job_id]() mutable {
+            const cl::sdk::Image image{
+                static_cast<int>(side),
+                static_cast<int>(side),
+                static_cast<int>(sizeof(cl_uchar4)),
+                std::move(pixels),
+            };
+            const std::string filename =
+                "callbackcpp_out" + std::to_string(job_id) + ".png";
+            cl::sdk::write_image(filename.c_str(), image);
+            std::cout << "Written image to " << filename << '\n';
+        });
+}
+
+// Sample specific command-line options, see cl::sdk::parse below.
+struct CallbackOptions
+{
+    std::size_t side;
+    std::size_t iterations;
+    std::size_t write_iter;
+};
+
+} // namespace
+
+// Declares the command-line arguments of the sample (-s, -i, -w).
+template <> auto cl::sdk::parse<CallbackOptions>()
+{
+    return std::make_tuple(
+        std::make_shared<TCLAP::ValueArg<std::size_t>>(
+            "s", "side", "Side length of the generated image in pixels", false,
+            512, "positive integral"),
+        std::make_shared<TCLAP::ValueArg<std::size_t>>(
+            "i", "iter", "Number of iterations in the simulation", false, 10000,
+            "positive integral"),
+        std::make_shared<TCLAP::ValueArg<std::size_t>>(
+            "w", "write_iter",
+            "Controls after how many iterations the intermediate result is "
+            "written to file",
+            false, 1000, "positive integral"));
+}
+
+// Collects the parsed argument values into a CallbackOptions.
+template <>
+CallbackOptions cl::sdk::comprehend<CallbackOptions>(
+    std::shared_ptr<TCLAP::ValueArg<std::size_t>> side_arg,
+    std::shared_ptr<TCLAP::ValueArg<std::size_t>> iter_arg,
+    std::shared_ptr<TCLAP::ValueArg<std::size_t>> write_iter_arg)
+{
+    return CallbackOptions{ side_arg->getValue(), iter_arg->getValue(),
+                            write_iter_arg->getValue() };
+}
+
+int main(int argc, char* argv[])
+{
+    try
+    {
+        // Parse command-line options
+        auto opts =
+            cl::sdk::parse_cli<cl::sdk::options::Diagnostic,
+                               cl::sdk::options::SingleDevice, CallbackOptions>(
+                argc, argv);
+        const auto& diag_opts = std::get<0>(opts);
+        const auto& dev_opts = std::get<1>(opts);
+        const auto& alg_opts = std::get<2>(opts);
+
+        // Create runtime objects based on user preference or default
+        cl::Context context = cl::sdk::get_context(dev_opts.triplet);
+        cl::Device device = context.getInfo<CL_CONTEXT_DEVICES>().at(0);
+
+        // There is a separate command queue for kernel launches,
+        // device->device copies and device->host copies.
+        // Synchronization is ensured via events (see later).
+        cl::CommandQueue compute_queue{ context, device };
+        cl::CommandQueue copy_queue{ context, device };
+        cl::CommandQueue read_queue{ context, device };
+
+        cl::Platform platform{
+            device.getInfo<CL_DEVICE_PLATFORM>()
+        }; // https://github.com/KhronosGroup/OpenCL-CLHPP/issues/150
+
+        if (!diag_opts.quiet)
+        {
+            cl::util::print_device_info(device);
+        }
+
+        // Compile kernel
+        const char* kernel_location = "./reaction_diffusion.cl";
+        std::ifstream kernel_stream{ kernel_location };
+        if (!kernel_stream.is_open())
+            throw std::runtime_error{
+                std::string{ "Cannot open kernel source: " } + kernel_location
+            };
+
+        cl::Program program{ context,
+                             std::string{ std::istreambuf_iterator<char>{
+                                              kernel_stream },
+                                          std::istreambuf_iterator<char>{} } };
+        program.build(device);
+        cl::KernelFunctor<cl::Image2D, cl::Image2D> reaction_diffusion_step(
+            program, "reaction_diffusion_step");
+
+        // Check if the used image format is supported on the device
+        const cl::ImageFormat required_image_format(CL_RGBA, CL_UNORM_INT8);
+        std::vector<cl::ImageFormat> supported_formats;
+        context.getSupportedImageFormats(
+            CL_MEM_READ_WRITE, CL_MEM_OBJECT_IMAGE2D, &supported_formats);
+        auto found_format = std::find_if(
+            supported_formats.begin(), supported_formats.end(),
+            [&](const cl::ImageFormat& format) {
+                return format.image_channel_order
+                    == required_image_format.image_channel_order
+                    && format.image_channel_data_type
+                    == required_image_format.image_channel_data_type;
+            });
+        if (found_format == supported_formats.end())
+        {
+            throw std::runtime_error("Required image format is not supported "
+                                     "on the selected runtime");
+        }
+
+        // Options provided on the command line
+        const std::size_t side = alg_opts.side;
+        const std::size_t iterations = alg_opts.iterations;
+        const std::size_t save_at_every = alg_opts.write_iter;
+        // save_at_every is used as a divisor below, so zero has to be rejected
+        // here instead of triggering a division by zero.
+        if (save_at_every == 0)
+        {
+            throw std::runtime_error("write_iter must be a positive integer");
+        }
+
+        // Create two equivalent images. In a single iteration one serves as the
+        // source, the other as the destination, and then the roles are swapped.
+        DoubleBuffer<cl::Image2D> images{
+            cl::Image2D(context, CL_MEM_READ_WRITE, required_image_format, side,
+                        side),
+            cl::Image2D(context, CL_MEM_READ_WRITE, required_image_format, side,
+                        side),
+        };
+
+        // In each pixel of the images, the concentration of the
+        //  - U (reaction source) component is stored in the R channel,
+        //  - V (reaction result) component is stored in the G channel.
+        // The B channel is unused, and the A (alpha) channel must be set to 1,
+        // so the resulting image is visible in the image viewer.
+        //
+        // First, fill the source image with chemical U (the source component of
+        // the reaction)
+        compute_queue.enqueueFillImage(images.read,
+                                       cl_float4{ { 1.F, 0.F, 0.F, 1.F } },
+                                       { 0, 0 }, { side, side, 1 });
+
+        // In addition, fill a small rectangle in the middle of the source image
+        // with chemical V (the result of the reaction). The rectangle is kept
+        // at least one pixel wide, because OpenCL rejects a fill region with a
+        // zero extent (which side / 100 would produce for side < 100).
+        const std::size_t seed_side = side < 100 ? 1 : side / 100;
+        cl::Event fill_event;
+        compute_queue.enqueueFillImage(
+            images.read, cl_float4{ { 1.F, 1.F, 0.F, 1.F } },
+            { side / 2, side / 2 }, { seed_side, seed_side, 1 }, nullptr,
+            &fill_event);
+
+        // The copy_event is initialized with a completed user event,
+        // so it doesn't block the first kernel launch.
+        cl::UserEvent completed_event(context);
+        completed_event.setStatus(CL_COMPLETE);
+        cl::Event copy_event = completed_event;
+        // The first copy is enqueued on copy_queue, which is not ordered with
+        // respect to the fills enqueued on compute_queue. It therefore has to
+        // wait for the fills explicitly. compute_queue is created with default
+        // (in-order) properties, so waiting for the last fill covers both.
+        cl::Event prev_compute_event = fill_event;
+
+        const std::size_t num_images_writes =
+            (iterations + save_at_every - 1) / save_at_every;
+        image_write_tasks.resize(num_images_writes);
+        std::intptr_t copy_job_id{};
+        for (std::size_t iter = 0; iter < iterations; ++iter)
+        {
+            // Enqueue the next step of the iteration and swap the source and
+            // destination images. This synchronizes with the previous copy of
+            // the input image, ensuring that the copy is finished before the
+            // current kernel overwrites the data.
+            auto compute_event = reaction_diffusion_step(
+                cl::EnqueueArgs(compute_queue, copy_event,
+                                cl::NDRange(side, side)),
+                images.read, images.write);
+
+            // For every Nth iteration, the current state of the simulation is
+            // written to a PNG file
+            if (iter % save_at_every == 0)
+            {
+                // Add a new entry to the job map, protected by a mutex.
+                // The job is associated with a job ID.
+                auto& job = [&]() -> ReadJob& {
+                    std::lock_guard<std::mutex> lock(read_mutex);
+                    return read_jobs[copy_job_id] = {};
+                }();
+                // We should allocate outside of the critical section
+                cl::Buffer copy_buffer(context, CL_MEM_READ_WRITE,
+                                       side * side * sizeof(cl_uchar4));
+                job.data = std::vector<unsigned char>(side * side * 4);
+                job.side = side;
+
+                // Enqueue the copy of the last destination image to a buffer.
+                // This is a device->device copy, which is probably faster than
+                // a device->host copy. The copy synchronizes with the previous
+                // kernel launch, however, the next kernel can be launched,
+                // while the copy is in progress, because both read only from
+                // images.read.
+                const std::vector<cl::Event> compute_events{
+                    prev_compute_event
+                };
+                copy_queue.enqueueCopyImageToBuffer(
+                    images.read, copy_buffer, { 0, 0 }, { side, side, 1 }, 0,
+                    &compute_events, &copy_event);
+
+                // When the device->device copy is finished, a device->host read
+                // can start. The resulting data on the host is accessible
+                // through the job object.
+                const std::vector<cl::Event> copy_events{ copy_event };
+                cl::Event read_event;
+                read_queue.enqueueReadBuffer(
+                    copy_buffer, false, 0, side * side * sizeof(cl_uchar4),
+                    job.data.data(), &copy_events, &read_event);
+
+                // When the device->host read is finished, a callback is called.
+                // It spawns an asynchronous CPU job that writes the data to
+                // file. We use the pointer-sized user-data argument to identify
+                // the job in the callback.
+                read_event.setCallback(CL_COMPLETE, read_complete_callback,
+                                       reinterpret_cast<void*>(copy_job_id));
+                ++copy_job_id;
+            }
+            prev_compute_event = compute_event;
+            images.swap();
+        }
+        // Wait for every read finished callback to fire.
+        // NOTE(review): finish() waits for the enqueued reads; the code below
+        // assumes that the callbacks have also run by then - confirm that the
+        // targeted runtimes guarantee this.
+        read_queue.finish();
+
+        // Eventually, wait for each image file writing job to finish.
+        for (const auto& task : image_write_tasks)
+        {
+            task.wait();
+        }
+        // We can expect that all jobs have been consumed at this point.
+        assert(read_jobs.empty());
+    } catch (cl::util::Error& e)
+    {
+        std::cerr << "OpenCL Utils error: " << e.what() << std::endl;
+        std::exit(e.err());
+    } catch (cl::BuildError& e)
+    {
+        std::cerr << "OpenCL runtime error: " << e.what() << std::endl;
+        for (auto& build_log : e.getBuildLog())
+        {
+            std::cerr << "\tBuild log for device: "
+                      << build_log.first.getInfo<CL_DEVICE_NAME>() << "\n"
+                      << std::endl;
+            std::cerr << build_log.second << "\n" << std::endl;
+        }
+        std::exit(e.err());
+    } catch (cl::Error& e)
+    {
+        std::cerr << "OpenCL runtime error: " << e.what() << " (" << e.err()
+                  << ")" << std::endl;
+        std::exit(e.err());
+    } catch (std::exception& e)
+    {
+        std::cerr << "Error: " << e.what() << std::endl;
+        std::exit(EXIT_FAILURE);
+    }
+}
diff --git a/samples/core/callback/reaction_diffusion.cl b/samples/core/callback/reaction_diffusion.cl
new file mode 100644
index 00000000..2b08a918
--- /dev/null
+++ b/samples/core/callback/reaction_diffusion.cl
@@ -0,0 +1,50 @@
+kernel void reaction_diffusion_step(read_only image2d_t in_data,
+                                    write_only image2d_t out_data)
+{
+    const float DU = 1.F; // weight of the diffusion term of U
+    const float DV = 0.3F; // weight of the diffusion term of V
+    const float f = 0.055F;
+    const float k = 0.062F;
+
+    const sampler_t smplr = CLK_NORMALIZED_COORDS_FALSE | CLK_FILTER_NEAREST
+        | CLK_ADDRESS_CLAMP_TO_EDGE;
+    const int x = (int)get_global_id(0); // signed, so that x + dx may be -1
+    const int y = (int)get_global_id(1); // signed, so that y + dy may be -1
+    const float2 uv = read_imagef(in_data, smplr, (int2){ x, y }).xy;
+    const float u = uv.x;
+    const float v = uv.y;
+
+    float u_diffuse = 0.F;
+    float v_diffuse = 0.F;
+    for (int dy = -1; dy <= 1; ++dy)
+    {
+        for (int dx = -1; dx <= 1; ++dx)
+        {
+            const float2 _uv =
+                read_imagef(in_data, smplr, (int2){ x + dx, y + dy }).xy;
+            const float _u = _uv.x;
+            const float _v = _uv.y;
+
+            if (abs(dx) + abs(dy) == 0) // the pixel itself
+            {
+                u_diffuse += _u * -1.F;
+                v_diffuse += _v * -1.F;
+            }
+            else if (abs(dx) + abs(dy) == 1) // edge-adjacent neighbour
+            {
+                u_diffuse += _u * 0.2F;
+                v_diffuse += _v * 0.2F;
+            }
+            else // diagonal neighbour
+            {
+                u_diffuse += _u * 0.05F;
+                v_diffuse += _v * 0.05F;
+            }
+        }
+    }
+
+    const float u_new = u + DU * u_diffuse - u * v * v + f * (1 - u);
+    const float v_new = v + DV * v_diffuse + u * v * v - (k + f) * v;
+
+    write_imagef(out_data, (int2){ x, y }, (float4){ u_new, v_new, 0, 1 });
+}