From 06b6fe9fb61d85d5662ef57085dc808b8b193572 Mon Sep 17 00:00:00 2001 From: Daniel Flores Date: Thu, 31 Jul 2025 23:04:11 -0700 Subject: [PATCH 1/2] add npp_stream_ctx cache --- src/torchcodec/_core/CudaDeviceInterface.cpp | 62 +++++++++++++++----- 1 file changed, 47 insertions(+), 15 deletions(-) diff --git a/src/torchcodec/_core/CudaDeviceInterface.cpp b/src/torchcodec/_core/CudaDeviceInterface.cpp index 9bfea4e5..b484ba78 100644 --- a/src/torchcodec/_core/CudaDeviceInterface.cpp +++ b/src/torchcodec/_core/CudaDeviceInterface.cpp @@ -39,6 +39,9 @@ const int MAX_CUDA_GPUS = 128; const int MAX_CONTEXTS_PER_GPU_IN_CACHE = -1; std::vector g_cached_hw_device_ctxs[MAX_CUDA_GPUS]; std::mutex g_cached_hw_device_mutexes[MAX_CUDA_GPUS]; +// NPP stream context cache, with up to MAX_CUDA_GPUS contexts per GPU. +std::map> g_cached_npp_stream_ctxs; +std::mutex g_cached_npp_stream_mutexes[MAX_CUDA_GPUS]; torch::DeviceIndex getFFMPEGCompatibleDeviceIndex(const torch::Device& device) { torch::DeviceIndex deviceIndex = device.index(); @@ -162,7 +165,7 @@ AVBufferRef* getCudaContext(const torch::Device& device) { #endif } -NppStreamContext createNppStreamContext(int deviceIndex) { +NppStreamContext* createNppStreamContext(int deviceIndex) { // From 12.9, NPP recommends using a user-created NppStreamContext and using // the `_Ctx()` calls: // https://docs.nvidia.com/cuda/cuda-toolkit-release-notes/index.html#npp-release-12-9-update-1 @@ -172,6 +175,7 @@ NppStreamContext createNppStreamContext(int deviceIndex) { // https://github.com/NVIDIA/CUDALibrarySamples/blob/d97803a40fab83c058bb3d68b6c38bd6eebfff43/NPP/README.md?plain=1#L54-L72 NppStreamContext nppCtx{}; + NppStreamContext* nppCtxPtr = &nppCtx; cudaDeviceProp prop{}; cudaError_t err = cudaGetDeviceProperties(&prop, deviceIndex); TORCH_CHECK( @@ -187,16 +191,39 @@ NppStreamContext createNppStreamContext(int deviceIndex) { nppCtx.nCudaDevAttrComputeCapabilityMajor = prop.major; nppCtx.nCudaDevAttrComputeCapabilityMinor = prop.minor; - // TODO when implementing the cache logic, move these out. See other TODO - // below. - nppCtx.hStream = at::cuda::getCurrentCUDAStream(deviceIndex).stream(); - err = cudaStreamGetFlags(nppCtx.hStream, &nppCtx.nStreamFlags); - TORCH_CHECK( - err == cudaSuccess, - "cudaStreamGetFlags failed: ", - cudaGetErrorString(err)); + return nppCtxPtr; +} - return nppCtx; +NppStreamContext* getNppStreamContextFromCache(const torch::Device& device) { + torch::DeviceIndex deviceIndex = getFFMPEGCompatibleDeviceIndex(device); + std::scoped_lock lock(g_cached_npp_stream_mutexes[deviceIndex]); + if (g_cached_npp_stream_ctxs[deviceIndex].size() > 0) { + NppStreamContext* hw_device_ctx = + g_cached_npp_stream_ctxs[deviceIndex].back(); + g_cached_npp_stream_ctxs[deviceIndex].pop_back(); + return hw_device_ctx; + } else { + return nullptr; + } +} + +NppStreamContext* getNppStreamContext(const torch::Device& device) { + // Return the cached NppStreamContext if it exists. + NppStreamContext* cached_npp_ctx_ptr = getNppStreamContextFromCache(device); + if (cached_npp_ctx_ptr != nullptr) { + return cached_npp_ctx_ptr; + } + NppStreamContext* npp_ctx_ptr = createNppStreamContext( + static_cast(getFFMPEGCompatibleDeviceIndex(device))); + + torch::DeviceIndex deviceIndex = getFFMPEGCompatibleDeviceIndex(device); + std::scoped_lock lock(g_cached_npp_stream_mutexes[deviceIndex]); + // Add to cache if cache has capacity + if (g_cached_npp_stream_ctxs[deviceIndex].size() < MAX_CUDA_GPUS) { + g_cached_npp_stream_ctxs[deviceIndex].push_back(npp_ctx_ptr); + } + // Error to user when full? + return npp_ctx_ptr; } } // namespace @@ -305,11 +332,16 @@ void CudaDeviceInterface::convertAVFrameToFrameOutput( // TODO cache the NppStreamContext! It currently gets re-recated for every // single frame. The cache should be per-device, similar to the existing - // hw_device_ctx cache. When implementing the cache logic, the - // NppStreamContext hStream and nStreamFlags should not be part of the cache - // because they may change across calls. - NppStreamContext nppCtx = createNppStreamContext( - static_cast(getFFMPEGCompatibleDeviceIndex(device_))); + // hw_device_ctx cache. + NppStreamContext nppCtx = *getNppStreamContext(device_); + + torch::DeviceIndex deviceIndex = getFFMPEGCompatibleDeviceIndex(device_); + nppCtx.hStream = at::cuda::getCurrentCUDAStream(deviceIndex).stream(); + cudaError_t err = cudaStreamGetFlags(nppCtx.hStream, &nppCtx.nStreamFlags); + TORCH_CHECK( + err == cudaSuccess, + "cudaStreamGetFlags failed: ", + cudaGetErrorString(err)); NppiSize oSizeROI = {width, height}; Npp8u* input[2] = {avFrame->data[0], avFrame->data[1]}; From b84c901acd4e5f1b11ac3d827b3ff600e828d2f0 Mon Sep 17 00:00:00 2001 From: Daniel Flores Date: Fri, 1 Aug 2025 15:52:01 -0700 Subject: [PATCH 2/2] Extract addNppStreamContextToCache to function --- src/torchcodec/_core/CudaDeviceInterface.cpp | 28 +++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/src/torchcodec/_core/CudaDeviceInterface.cpp b/src/torchcodec/_core/CudaDeviceInterface.cpp index b484ba78..f50d2380 100644 --- a/src/torchcodec/_core/CudaDeviceInterface.cpp +++ b/src/torchcodec/_core/CudaDeviceInterface.cpp @@ -198,31 +198,36 @@ NppStreamContext* getNppStreamContextFromCache(const torch::Device& device) { torch::DeviceIndex deviceIndex = getFFMPEGCompatibleDeviceIndex(device); std::scoped_lock lock(g_cached_npp_stream_mutexes[deviceIndex]); if (g_cached_npp_stream_ctxs[deviceIndex].size() > 0) { - NppStreamContext* hw_device_ctx = + NppStreamContext* npp_stream_ctx = g_cached_npp_stream_ctxs[deviceIndex].back(); g_cached_npp_stream_ctxs[deviceIndex].pop_back(); - return hw_device_ctx; + return npp_stream_ctx; } else { return nullptr; } } +void addNppStreamContextToCache( + const torch::Device& device, + NppStreamContext* ctx) { + torch::DeviceIndex deviceIndex = getFFMPEGCompatibleDeviceIndex(device); + std::scoped_lock lock(g_cached_npp_stream_mutexes[deviceIndex]); + // Add to cache if cache has capacity + if (g_cached_npp_stream_ctxs[deviceIndex].size() < MAX_CUDA_GPUS) { + g_cached_npp_stream_ctxs[deviceIndex].push_back(ctx); + } +} + NppStreamContext* getNppStreamContext(const torch::Device& device) { // Return the cached NppStreamContext if it exists. NppStreamContext* cached_npp_ctx_ptr = getNppStreamContextFromCache(device); if (cached_npp_ctx_ptr != nullptr) { return cached_npp_ctx_ptr; } + // Create new NppStreamContext, and cache it NppStreamContext* npp_ctx_ptr = createNppStreamContext( static_cast(getFFMPEGCompatibleDeviceIndex(device))); - - torch::DeviceIndex deviceIndex = getFFMPEGCompatibleDeviceIndex(device); - std::scoped_lock lock(g_cached_npp_stream_mutexes[deviceIndex]); - // Add to cache if cache has capacity - if (g_cached_npp_stream_ctxs[deviceIndex].size() < MAX_CUDA_GPUS) { - g_cached_npp_stream_ctxs[deviceIndex].push_back(npp_ctx_ptr); - } - // Error to user when full? + addNppStreamContextToCache(device, npp_ctx_ptr); return npp_ctx_ptr; } @@ -330,9 +335,6 @@ void CudaDeviceInterface::convertAVFrameToFrameOutput( dst = allocateEmptyHWCTensor(height, width, device_); } - // TODO cache the NppStreamContext! It currently gets re-recated for every - // single frame. The cache should be per-device, similar to the existing - // hw_device_ctx cache. NppStreamContext nppCtx = *getNppStreamContext(device_); torch::DeviceIndex deviceIndex = getFFMPEGCompatibleDeviceIndex(device_);