// Patch overview (commentary only; the patch text starting at "diff --git" below is unchanged).
//
// Files touched, per the two "diff --git" headers:
//   * tree/dataframe/src/RLoopManager.cxx            -- body of RLoopManager::Jit()
//   * tree/dataframe/test/dataframe_concurrency.cxx  -- new helper and two new tests
//
// RLoopManager::Jit()
//   Removed: a braced scope that took R__READ_LOCKGUARD(ROOT::gCoreMutex) around the
//            "GetCodeToJit().empty()" early return, followed by a lambda that took
//            R__WRITE_LOCKGUARD(ROOT::gCoreMutex) and returned std::move(GetCodeToJit()).
//   Added:   the read guard and the same empty-check directly in the function body, then
//            the write guard, then a second empty-check (same log message, same early
//            return), then "code" initialised from std::move(GetCodeToJit()).
//   The comment carried in the patch states the reason for the second check: another
//   thread may already have cleared the string, and calling InterpreterCalc with an empty
//   string would raise an exception.
//   NOTE(review): the read guard is no longer confined to its own braced scope, so
//   (assuming both macros declare scope-bound guards, as the removed braced scope
//   suggests) it is still alive when the write guard is taken. Whether gCoreMutex allows
//   a thread that holds the read lock to also take the write lock is not visible in this
//   patch -- confirm against the lock implementation.
//
// dataframe_concurrency.cxx
//   * Include block: four include lines added and one removed. The header names are not
//     present in this text; template argument lists also look stripped (for example a bare
//     "template" and "std::vector expected(25)"), presumably an extraction artifact --
//     check against the real patch before applying.
//   * expect_vec_eq(v1, v2): ASSERT_EQ on the two sizes, then EXPECT_EQ element by
//     element, reporting the differing index.
//   * A blank line before "#endif" is removed and the two tests below are added after
//     that "#endif".
//   * TEST(RDFConcurrency, JITWithManyThreads): calls ROOT::EnableThreadSafety(), starts
//     5 std::thread workers; worker "slot" handles indices [slot*5, slot*5+5) and, for
//     each index i, stores ROOT::RDataFrame{1}.Define("x", std::to_string(i)).Sum("x")
//     .GetValue() into results[i]. After joining, results is compared with 0..24
//     (filled by std::iota) through expect_vec_eq.
//   * TEST(RDFConcurrency, JITManyThreadsAndExceptions): four workers coordinated through
//     three (mutex, condition_variable, bool) triples.
//       - work_1: holds m_1, runs a Define whose expression string contains
//         "throw std::runtime_error(...)", swallows any exception with catch (...),
//         then sets ready_1 and calls cv_1.notify_all().
//       - work_4: holds m_3, calls Define("x", "rndm") inside try/catch (...), then sets
//         ready_3 and calls cv_3.notify_all().
//       - work_2: locks m_1, m_2, m_3; interleaves three Defines with waits on cv_1 and
//         cv_3; stores the Sum of work_2_z (42 + 58) in res_work_2, sets ready_2 and
//         calls cv_2.notify_one().
//       - work_3: locks m_1, m_2, m_3; interleaves four Defines with waits on cv_1,
//         cv_2 and cv_3; stores the Sum of work_3_fin (11 + 22 + 33) in res_work_3.
//     Expected values checked at the end: res_work_2 == 100 and res_work_3 == 66.
//     NOTE(review): work_2 and work_3 each acquire m_1, m_2 and m_3 before waiting, and
//     work_2 keeps m_2 in a lock_guard for its whole body. If work_3 acquires m_2 before
//     work_2 does, work_3 keeps m_2 while waiting on cv_1, work_2 can then hold m_1 while
//     blocked on m_2, and work_1 cannot take m_1 to set ready_1. This looks like a
//     scheduling-dependent stall -- confirm, and consider whether the waits need all
//     three mutexes held up front.
diff --git a/tree/dataframe/src/RLoopManager.cxx b/tree/dataframe/src/RLoopManager.cxx index e7e4baada203c..080853598e2a9 100644 --- a/tree/dataframe/src/RLoopManager.cxx +++ b/tree/dataframe/src/RLoopManager.cxx @@ -805,18 +805,21 @@ void RLoopManager::CleanUpTask(TTreeReader *r, unsigned int slot) /// This method also clears the contents of GetCodeToJit(). void RLoopManager::Jit() { - { - R__READ_LOCKGUARD(ROOT::gCoreMutex); - if (GetCodeToJit().empty()) { - R__LOG_INFO(RDFLogChannel()) << "Nothing to jit and execute."; - return; - } + R__READ_LOCKGUARD(ROOT::gCoreMutex); + if (GetCodeToJit().empty()) { + R__LOG_INFO(RDFLogChannel()) << "Nothing to jit and execute."; + return; } - const std::string code = []() { - R__WRITE_LOCKGUARD(ROOT::gCoreMutex); - return std::move(GetCodeToJit()); - }(); + R__WRITE_LOCKGUARD(ROOT::gCoreMutex); + // Check again if another thread has already cleared the global string + // with the code to JIT. Without this check, we could end up calling + // InterpreterCalc with an empty string, which would raise an exception. 
+ if (GetCodeToJit().empty()) { + R__LOG_INFO(RDFLogChannel()) << "Nothing to jit and execute."; + return; + } + const std::string code = std::move(GetCodeToJit()); TStopwatch s; s.Start(); diff --git a/tree/dataframe/test/dataframe_concurrency.cxx b/tree/dataframe/test/dataframe_concurrency.cxx index 3f60ae76186ab..64fdaeeab4c9d 100644 --- a/tree/dataframe/test/dataframe_concurrency.cxx +++ b/tree/dataframe/test/dataframe_concurrency.cxx @@ -3,9 +3,12 @@ #include #include #include +#include +#include +#include +#include #include #include -#include #include "gtest/gtest.h" @@ -15,6 +18,15 @@ static const auto NUM_THREADS = 8u; static const auto NUM_THREADS = 0u; #endif +template +void expect_vec_eq(const std::vector &v1, const std::vector &v2) +{ + ASSERT_EQ(v1.size(), v2.size()) << "Vectors 'v1' and 'v2' are of unequal length"; + for (decltype(v1.size()) i{}; i < v1.size(); ++i) { + EXPECT_EQ(v1[i], v2[i]) << "Vectors 'v1' and 'v2' differ at index " << i; + } +} + #ifdef R__USE_IMT TEST(RDFConcurrency, NestedParallelismBetweenDefineCalls) { @@ -234,5 +246,112 @@ TEST(RDFConcurrency, ParallelRDFCachesEnableImplicitMT) ParallelRDFCaches(); ROOT::DisableImplicitMT(); } - #endif + +// Check that multiple RDF can JIT at the same time without interfering +// with each other +TEST(RDFConcurrency, JITWithManyThreads) +{ + ROOT::EnableThreadSafety(); + + std::vector expected(25); + std::iota(expected.begin(), expected.end(), 0); + + std::vector results(25); + auto do_work = [&results](int slot) { + const int begin{slot * 5}; + const int end{begin + 5}; + for (int i = begin; i < end; i++) { + results[i] = ROOT::RDataFrame{1}.Define("x", std::to_string(i)).Sum("x").GetValue(); + } + }; + + std::vector threads; + threads.reserve(5); + for (int slot = 0; slot < 5; slot++) + threads.emplace_back(do_work, slot); + + for (auto &&t : threads) + t.join(); + + expect_vec_eq(results, expected); +} + +TEST(RDFConcurrency, JITManyThreadsAndExceptions) +{ + ROOT::EnableThreadSafety(); + 
+ std::condition_variable cv_1; + std::condition_variable cv_2; + std::condition_variable cv_3; + std::mutex m_1; + std::mutex m_2; + std::mutex m_3; + bool ready_1{false}; + bool ready_2{false}; + bool ready_3{false}; + + int res_work_2{}; + int res_work_3{}; + + auto work_1 = [&]() { + std::lock_guard lk_1{m_1}; + try { + ROOT::RDataFrame df{1}; + auto df1 = df.Define("work_1_x", "throw std::runtime_error(\"Error in RDF!\"); return 42;"); + df1.Sum("work_1_x").GetValue(); + } catch (...) { + } + ready_1 = true; + cv_1.notify_all(); + }; + + auto work_2 = [&]() { + std::unique_lock lk_1(m_1); + std::lock_guard lk_2{m_2}; + std::unique_lock lk_3{m_3}; + ROOT::RDataFrame df{1}; + auto df1 = df.Define("work_2_x", "42"); + cv_1.wait(lk_1, [&ready_1] { return ready_1; }); + auto df2 = df1.Define("work_2_y", "58"); + cv_3.wait(lk_3, [&ready_3] { return ready_3; }); + auto df3 = df2.Define("work_2_z", "work_2_x + work_2_y"); + res_work_2 = df3.Sum("work_2_z").GetValue(); + ready_2 = true; + cv_2.notify_one(); + }; + + auto work_3 = [&]() { + std::unique_lock lk_1(m_1); + std::unique_lock lk_2(m_2); + std::unique_lock lk_3(m_3); + ROOT::RDataFrame df{1}; + auto df1 = df.Define("work_3_x", "11"); + auto df2 = df1.Define("work_3_y", "work_3_x * 2"); + cv_1.wait(lk_1, [&ready_1] { return ready_1; }); + cv_2.wait(lk_2, [&ready_2] { return ready_2; }); + auto df3 = df2.Define("work_3_z", "work_3_y + work_3_x"); + cv_3.wait(lk_3, [&ready_3] { return ready_3; }); + auto df4 = df3.Define("work_3_fin", "work_3_x + work_3_y + work_3_z"); + res_work_3 = df4.Sum("work_3_fin").GetValue(); + }; + + auto work_4 = [&]() { + std::lock_guard lk_3{m_3}; + try { + ROOT::RDataFrame df{1}; + auto df1 = df.Define("x", "rndm"); + } catch (...) 
{ + } + ready_3 = true; + cv_3.notify_all(); + }; + + std::array threads{std::thread{work_1}, std::thread{work_2}, std::thread{work_3}, + std::thread{work_4}}; + for (auto &&t : threads) + t.join(); + + EXPECT_EQ(res_work_2, 100); + EXPECT_EQ(res_work_3, 66); +}