From 453bd96129e2dad8b2136384b7c215bcc457ef8b Mon Sep 17 00:00:00 2001 From: Vincenzo Eduardo Padulano Date: Tue, 21 May 2024 07:48:57 +0200 Subject: [PATCH 1/3] [df] Add test for JIT with explicit multithreading Test a program with multiple threads managed explicitly by the user and each thread runs an RDataFrame with JITting. In particular, this test probes the following situation: * Threads start building the computation graph, code to JIT goes in a global accumulating string. * Thread 1 reaches the RLoopManager::Jit method. Checks the global string, it is not empty. * Thread 2 arrives as well, checks the string, it is not empty. * Thread 1 now takes a write lock on the string, moving it into the local function stack, thus leaving it empty. * Thread 2 wants to move the string as well, but just moves an empty string. * Thread 2 is faster than Thread 1 at passing its (empty) string to the `InterpreterCalc` function. * The `InterpreterCalc` function receives an empty string, thus will raise an exception since `gInterpreter->Calc("")` returns a `kDangerous` interpreter error code. 
---
NOTE(review): text in this notes area (between the three-dash line and the
diffstat) is not part of the commit.

* In this copy of the series every angle-bracketed token is missing: the header
  names after `#include`, the template parameter/argument lists (`template`,
  `std::vector`) and the author's e-mail address. Presumably they were lost
  when the patch was extracted -- TODO restore them from the original series
  before applying.
* expect_vec_eq first asserts that both vectors have the same size and then
  compares them element by element, reporting the index of each mismatch.
* JITWithManyThreads starts 5 threads. Thread `slot` fills results[slot * 5]
  through results[slot * 5 + 4]; each entry comes from its own
  ROOT::RDataFrame{1} whose column "x" is defined from the string form of the
  index, so `results` is expected to equal 0..24. The threads write disjoint
  index ranges of `results`, which is why the test uses no lock around it.

 tree/dataframe/test/dataframe_concurrency.cxx | 42 ++++++++++++++++++-
 1 file changed, 40 insertions(+), 2 deletions(-)

diff --git a/tree/dataframe/test/dataframe_concurrency.cxx b/tree/dataframe/test/dataframe_concurrency.cxx
index 3f60ae76186ab..b31ab84da6f4a 100644
--- a/tree/dataframe/test/dataframe_concurrency.cxx
+++ b/tree/dataframe/test/dataframe_concurrency.cxx
@@ -3,9 +3,10 @@
 #include
 #include
 #include
+#include
+#include
 #include
 #include
-#include
 
 #include "gtest/gtest.h"
 
@@ -15,6 +16,15 @@ static const auto NUM_THREADS = 8u;
 static const auto NUM_THREADS = 0u;
 #endif
 
+template
+void expect_vec_eq(const std::vector &v1, const std::vector &v2)
+{
+   ASSERT_EQ(v1.size(), v2.size()) << "Vectors 'v1' and 'v2' are of unequal length";
+   for (decltype(v1.size()) i{}; i < v1.size(); ++i) {
+      EXPECT_EQ(v1[i], v2[i]) << "Vectors 'v1' and 'v2' differ at index " << i;
+   }
+}
+
 #ifdef R__USE_IMT
 TEST(RDFConcurrency, NestedParallelismBetweenDefineCalls)
 {
@@ -234,5 +244,33 @@ TEST(RDFConcurrency, ParallelRDFCachesEnableImplicitMT)
    ParallelRDFCaches();
    ROOT::DisableImplicitMT();
 }
-
 #endif
+
+// Check that multiple RDF can JIT at the same time without interfering
+// with each other
+TEST(RDFConcurrency, JITWithManyThreads)
+{
+   ROOT::EnableThreadSafety();
+
+   std::vector expected(25);
+   std::iota(expected.begin(), expected.end(), 0);
+
+   std::vector results(25);
+   auto do_work = [&results](int slot) {
+      const int begin{slot * 5};
+      const int end{begin + 5};
+      for (int i = begin; i < end; i++) {
+         results[i] = ROOT::RDataFrame{1}.Define("x", std::to_string(i)).Sum("x").GetValue();
+      }
+   };
+
+   std::vector threads;
+   threads.reserve(5);
+   for (int slot = 0; slot < 5; slot++)
+      threads.emplace_back(do_work, slot);
+
+   for (auto &&t : threads)
+      t.join();
+
+   expect_vec_eq(results, expected);
+}

From b838ea5662f8669fd6c129444f27ef2e9c87e683 Mon Sep 17 00:00:00 2001
From: Vincenzo Eduardo Padulano
Date: Tue, 21 May 2024 10:43:59 +0200
Subject: [PATCH 2/3] [df] Ensure
there is code to JIT before calling the interpreter

In an explicit multithreading scenario inside the `RLoopManager::Jit` method, a
thread might move the global string with the code to JIT into its own function
stack while another thread might still believe that string is not empty. This
would make the other thread call `InterpreterCalc` with an empty string which
would eventually lead to an exception.

Check for the string emptiness twice, once at the beginning of the function,
then again after taking the write lock so that if another thread already moved
the string then the current thread will not continue with JITting.
---
NOTE(review): text in this notes area (between the three-dash line and the
diffstat) is not part of the commit.

With the enclosing braces and the immediately-invoked lambda removed, both lock
guards are now declared at the function scope of Jit(). Assuming the
R__READ_LOCKGUARD / R__WRITE_LOCKGUARD macros declare scope-bound guards, the
write lock is requested while this thread's read lock is still alive, and both
last until Jit() returns. That presumably relies on ROOT::gCoreMutex letting a
thread that holds the read lock also take the write lock -- TODO confirm
against the gCoreMutex implementation.

 tree/dataframe/src/RLoopManager.cxx | 23 +++++++++++++----------
 1 file changed, 13 insertions(+), 10 deletions(-)

diff --git a/tree/dataframe/src/RLoopManager.cxx b/tree/dataframe/src/RLoopManager.cxx
index e7e4baada203c..080853598e2a9 100644
--- a/tree/dataframe/src/RLoopManager.cxx
+++ b/tree/dataframe/src/RLoopManager.cxx
@@ -805,18 +805,21 @@ void RLoopManager::CleanUpTask(TTreeReader *r, unsigned int slot)
 /// This method also clears the contents of GetCodeToJit().
 void RLoopManager::Jit()
 {
-   {
-      R__READ_LOCKGUARD(ROOT::gCoreMutex);
-      if (GetCodeToJit().empty()) {
-         R__LOG_INFO(RDFLogChannel()) << "Nothing to jit and execute.";
-         return;
-      }
+   R__READ_LOCKGUARD(ROOT::gCoreMutex);
+   if (GetCodeToJit().empty()) {
+      R__LOG_INFO(RDFLogChannel()) << "Nothing to jit and execute.";
+      return;
    }
 
-   const std::string code = []() {
-      R__WRITE_LOCKGUARD(ROOT::gCoreMutex);
-      return std::move(GetCodeToJit());
-   }();
+   R__WRITE_LOCKGUARD(ROOT::gCoreMutex);
+   // Check again if another thread has already cleared the global string
+   // with the code to JIT. Without this check, we could end up calling
+   // InterpreterCalc with an empty string, which would raise an exception.
+   if (GetCodeToJit().empty()) {
+      R__LOG_INFO(RDFLogChannel()) << "Nothing to jit and execute.";
+      return;
+   }
+   const std::string code = std::move(GetCodeToJit());
 
    TStopwatch s;
    s.Start();

From 0662195458da270612da70e2018df5addcad7cd7 Mon Sep 17 00:00:00 2001
From: Vincenzo Eduardo Padulano
Date: Wed, 29 May 2024 14:45:31 +0200
Subject: [PATCH 3/3] [df] Add another test for RDF and explicit multithreading

For the following scenario:

* Multiple threads, multiple different RDF computation graphs (one per thread).
* Some threads run a valid, working computation graph, others have some
  problems.
* Two classes of problems are used. The first is when the declaration of some
  code to JIT incurs in an error in the interpreter. The second is when there
  is an exception at runtime.
* All problems are hidden explicitly by a try-catch scope.
* Threads running a working computation graph should not be impacted by the
  threads that are running a flawed computation graph.
---
 tree/dataframe/test/dataframe_concurrency.cxx | 92 +++++++++++++++++++
 1 file changed, 92 insertions(+)

diff --git a/tree/dataframe/test/dataframe_concurrency.cxx b/tree/dataframe/test/dataframe_concurrency.cxx
index b31ab84da6f4a..64fdaeeab4c9d 100644
--- a/tree/dataframe/test/dataframe_concurrency.cxx
+++ b/tree/dataframe/test/dataframe_concurrency.cxx
@@ -3,6 +3,8 @@
 #include
 #include
 #include
+#include
+#include
 #include
 #include
 #include
@@ -274,3 +276,93 @@ TEST(RDFConcurrency, JITWithManyThreads)
 
    expect_vec_eq(results, expected);
 }
+
+TEST(RDFConcurrency, JITManyThreadsAndExceptions)
+{
+   ROOT::EnableThreadSafety();
+
+   std::condition_variable cv_1;
+   std::condition_variable cv_2;
+   std::condition_variable cv_3;
+   std::mutex m_1;
+   std::mutex m_2;
+   std::mutex m_3;
+   bool ready_1{false};
+   bool ready_2{false};
+   bool ready_3{false};
+
+   int res_work_2{};
+   int res_work_3{};
+
+   // Each ready flag is guarded by its own mutex, and that mutex is held only
+   // while the flag is written or waited on. A condition variable wait releases
+   // just the lock it is given, so a thread that blocks in a wait while still
+   // holding another mutex can stall the thread that has to set the flag.
+   auto set_ready = [](std::mutex &m, std::condition_variable &cv, bool &ready) {
+      {
+         std::lock_guard<std::mutex> lk{m};
+         ready = true;
+      }
+      cv.notify_all();
+   };
+   auto wait_ready = [](std::mutex &m, std::condition_variable &cv, const bool &ready) {
+      std::unique_lock<std::mutex> lk{m};
+      cv.wait(lk, [&ready] { return ready; });
+   };
+
+   // Flawed graph: the expression passed to Define throws when it is run.
+   auto work_1 = [&]() {
+      try {
+         ROOT::RDataFrame df{1};
+         auto df1 = df.Define("work_1_x", "throw std::runtime_error(\"Error in RDF!\"); return 42;");
+         df1.Sum("work_1_x").GetValue();
+      } catch (...) {
+         // Swallowed on purpose: only the results of work_2 and work_3 are checked.
+      }
+      set_ready(m_1, cv_1, ready_1);
+   };
+
+   // Valid graph, built in steps: pauses until work_1, then work_4, have finished.
+   auto work_2 = [&]() {
+      ROOT::RDataFrame df{1};
+      auto df1 = df.Define("work_2_x", "42");
+      wait_ready(m_1, cv_1, ready_1);
+      auto df2 = df1.Define("work_2_y", "58");
+      wait_ready(m_3, cv_3, ready_3);
+      auto df3 = df2.Define("work_2_z", "work_2_x + work_2_y");
+      res_work_2 = df3.Sum("work_2_z").GetValue();
+      set_ready(m_2, cv_2, ready_2);
+   };
+
+   // Valid graph, built in steps: pauses until work_1, work_2 and work_4 have finished.
+   auto work_3 = [&]() {
+      ROOT::RDataFrame df{1};
+      auto df1 = df.Define("work_3_x", "11");
+      auto df2 = df1.Define("work_3_y", "work_3_x * 2");
+      wait_ready(m_1, cv_1, ready_1);
+      wait_ready(m_2, cv_2, ready_2);
+      auto df3 = df2.Define("work_3_z", "work_3_y + work_3_x");
+      wait_ready(m_3, cv_3, ready_3);
+      auto df4 = df3.Define("work_3_fin", "work_3_x + work_3_y + work_3_z");
+      res_work_3 = df4.Sum("work_3_fin").GetValue();
+   };
+
+   // Flawed graph: per the commit message, declaring this expression is
+   // expected to fail in the interpreter.
+   auto work_4 = [&]() {
+      try {
+         ROOT::RDataFrame df{1};
+         auto df1 = df.Define("x", "rndm");
+      } catch (...) {
+         // Swallowed on purpose: only the results of work_2 and work_3 are checked.
+      }
+      set_ready(m_3, cv_3, ready_3);
+   };
+
+   std::array threads{std::thread{work_1}, std::thread{work_2}, std::thread{work_3},
+                      std::thread{work_4}};
+   for (auto &&t : threads)
+      t.join();
+
+   EXPECT_EQ(res_work_2, 100);
+   EXPECT_EQ(res_work_3, 66);
+}