Tickets/dm 51299 #939

Open · wants to merge 13 commits into base: tickets/DM-43715

Changes from all commits
8 changes: 8 additions & 0 deletions admin/local/docker/compose/docker-compose.yml
@@ -36,6 +36,7 @@ volumes:
volume_czar_xrootd:
volume_czar_home:
volume_czar_cfg:
volume_czar_transfer:

volume_czar_mariadb_data:
volume_czar_mariadb_cfg:
@@ -272,6 +273,10 @@ services:
- type: volume
source: volume_czar_mariadb_run
target: /qserv/mariadb/run
- type: volume
source: volume_czar_transfer
target: /tmp

- << : *log-volume
expose:
- "3306" # for czar-mariadb
@@ -306,6 +311,9 @@ services:
- type: volume
source: volume_czar_cfg
target: /config-etc
- type: volume
source: volume_czar_transfer
target: /tmp
- type: volume
source: volume_czar_home
target: /home/qserv
17 changes: 17 additions & 0 deletions src/admin/templates/proxy/etc/qserv-czar.cnf.jinja
@@ -25,12 +25,29 @@ port = {{ czar_db_port }}
# Any table in resultdb that hasn't been updated in this many days is deleted.
oldestResultKeptDays = 7

# Either this should be changed to a high-performance docker volume directory,
# or /tmp should be mounted as one, to avoid using limited docker memory
# to store the transfer contents.
transferDir = /tmp

# maximum number of connection retries to SQL database (per connection attempt)
maxsqlconnectionattempts = 10

# maximum user query result size in MB
maxtablesize_mb = 5100

# maximum number of MB of concurrent csv transfer files allowed to be kept in
# memory; beyond this point they will be temporarily written to disk.
# 0 is used for testing. 10000 is usually reasonable.
maxTransferMemMB = 0

# minimum number of MB for each csv transfer file to be kept in memory
# before possibly going to disk.
# 0 for testing, up to 10 should be reasonable.
transferMinMBInMem = 0

# database connection for QMeta database
[qmeta]
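
Note: taken together, transferDir, maxTransferMemMB, and transferMinMBInMem describe a spill policy: each transfer keeps at least transferMinMBInMem of data in memory, and anything beyond that stays in memory only while the czar-wide maxTransferMemMB budget has room; otherwise it is written under transferDir. A minimal sketch of that decision, using a hypothetical SpillPolicy class rather than the actual CsvMemDisk/TransferTracker implementation:

    // Sketch of the spill decision implied by the settings above. Names are
    // illustrative; the actual logic lives in mysql::CsvMemDisk and TransferTracker.
    #include <atomic>
    #include <cstddef>

    class SpillPolicy {
    public:
        SpillPolicy(std::size_t maxTotalBytes, std::size_t minPerTransferBytes)
                : _maxTotalBytes(maxTotalBytes), _minPerTransferBytes(minPerTransferBytes) {}

        // Decide whether the next `size` bytes of a transfer that already holds
        // `heldBytes` in memory should stay in memory (true) or spill to a file
        // under transferDir (false).
        bool keepInMemory(std::size_t heldBytes, std::size_t size) {
            // The per-transfer floor (transferMinMBInMem) is always honored,
            // and still counted against the total.
            if (heldBytes + size <= _minPerTransferBytes) {
                _totalInMem += size;
                return true;
            }
            // Beyond the floor, stay in memory only while the czar-wide budget
            // (maxTransferMemMB) has room; a budget of 0 forces everything to disk.
            std::size_t expected = _totalInMem.load();
            while (expected + size <= _maxTotalBytes) {
                if (_totalInMem.compare_exchange_weak(expected, expected + size)) return true;
            }
            return false;
        }

        // Called when a transfer's in-memory bytes have been merged and freed.
        void release(std::size_t size) { _totalInMem -= size; }

    private:
        std::size_t const _maxTotalBytes;
        std::size_t const _minPerTransferBytes;
        std::atomic<std::size_t> _totalInMem{0};
    };
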
15 changes: 15 additions & 0 deletions src/cconfig/CzarConfig.h
@@ -124,6 +124,14 @@ class CzarConfig {
/// Getters for result aggregation options.
int getMaxTableSizeMB() const { return _maxTableSizeMB->getVal(); }
int getMaxSqlConnectionAttempts() const { return _maxSqlConnectionAttempts->getVal(); }
unsigned int getMaxTransferMemMB() const { return _resultMaxTransferMemMB->getVal(); }
/// Return the transfer directory. This is customizable to allow for a
/// high-performance volume.
std::string getTransferDir() const { return _resultTransferDir->getVal(); }

/// Return the minimum amount of transfer data per UberJob to keep in memory.
/// This much transfer data will be stored in memory regardless of other conditions.
unsigned int getTransferMinMBInMem() const { return _resultTransferMinMBInMem->getVal(); }

/// The size of the TCP connection pool within the client API that is used
/// by the merger to pull result files from workers via the HTTP protocol.
@@ -288,6 +296,13 @@ class CzarConfig {
util::ConfigValTInt::create(_configValMap, "resultdb", "maxhttpconnections", notReq, 2000);
CVTIntPtr _oldestResultKeptDays =
util::ConfigValTInt::create(_configValMap, "resultdb", "oldestResultKeptDays", notReq, 30);
// This must be larger than _maxTableSizeMB when using the "memory" TransferMethod
CVTUIntPtr _resultMaxTransferMemMB =
util::ConfigValTUInt::create(_configValMap, "resultdb", "maxTransferMemMB", notReq, 10000);
CVTStrPtr _resultTransferDir =
util::ConfigValTStr::create(_configValMap, "resultdb", "transferDir", notReq, "/tmp");
CVTUIntPtr _resultTransferMinMBInMem =
util::ConfigValTUInt::create(_configValMap, "resultdb", "transferMinMBInMem", notReq, 10);

/// Get all the elements in the css section.
CVTStrPtr _cssTechnology =
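
Note: the comment on _resultMaxTransferMemMB implies a consistency requirement between two of these values. A hedged example of the kind of startup check a caller could perform using only the getters above; this validation is hypothetical and does not appear in the diff:

    #include <stdexcept>

    // Hypothetical startup validation; not part of CzarConfig in this diff.
    void verifyTransferConfig(lsst::qserv::cconfig::CzarConfig const& cfg) {
        auto const maxTransferMemMB = cfg.getMaxTransferMemMB();
        // 0 is the "testing" value that sends every transfer to disk.
        if (maxTransferMemMB != 0 &&
            maxTransferMemMB <= static_cast<unsigned int>(cfg.getMaxTableSizeMB())) {
            throw std::invalid_argument(
                    "maxTransferMemMB must be larger than maxtablesize_mb when "
                    "transfers are kept in memory");
        }
    }
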
39 changes: 15 additions & 24 deletions src/ccontrol/MergingHandler.cc
@@ -45,7 +45,7 @@
#include "http/Client.h"
#include "http/ClientConnPool.h"
#include "http/Method.h"
#include "mysql/CsvBuffer.h"
#include "mysql/CsvMemDisk.h"
#include "qdisp/CzarStats.h"
#include "qdisp/Executive.h"
#include "qdisp/JobQuery.h"
@@ -195,13 +195,8 @@ qdisp::MergeEndStatus MergingHandler::_mergeHttp(qdisp::UberJob::Ptr const& uber
}

if (fileSize == 0) return qdisp::MergeEndStatus(true);

// Read from the http stream and push records into the CSV stream in a separate thread.
// Note the fixed capacity of the stream which allows up to 2 records to be buffered
// in the stream. This is enough to hide the latency of the HTTP connection and
// the time needed to read the file.
auto csvStream = mysql::CsvStream::create(2);
_csvStream = csvStream;
auto csvMemDisk = mysql::CsvMemDisk::create(fileSize, uberJob->getQueryId(), uberJob->getUjId());
_csvMemDisk = csvMemDisk;

// This must be after setting _csvStream to avoid cancelFileMerge()
// race issues, and it needs to be before the thread starts.
@@ -211,46 +206,46 @@
}

string fileReadErrorMsg;
thread csvThread([uberJob, csvStream, fileUrl, fileSize, &fileReadErrorMsg]() {
auto transferFunc = [&]() {
size_t bytesRead = 0;
fileReadErrorMsg = ::readHttpFileAndMerge(
uberJob, fileUrl, fileSize,
[uberJob, csvStream, fileSize, &bytesRead](char const* buf, uint32_t size) {
[&](char const* buf, uint32_t size) {
bool last = false;
if (buf == nullptr || size == 0) {
last = true;
} else {
csvStream->push(buf, size);
csvMemDisk->push(buf, size);
bytesRead += size;
last = bytesRead >= fileSize;
}
if (last) {
csvStream->push(nullptr, 0);
csvMemDisk->push(nullptr, 0);
}
},
MergingHandler::_getHttpConnPool());
// Push the stream terminator to indicate the end of the stream.
// It may be needed to unblock the table merger which may be still attempting to read
// from the CSV stream.
if (!fileReadErrorMsg.empty()) {
csvStream->push(nullptr, 0);
csvMemDisk->push(nullptr, 0);
}
});
};
csvMemDisk->transferDataFromWorker(transferFunc);

// Attempt the actual merge.
bool fileMergeSuccess = _infileMerger->mergeHttp(uberJob, fileSize, csvStream);
bool fileMergeSuccess = _infileMerger->mergeHttp(uberJob, fileSize, csvMemDisk);
if (!fileMergeSuccess) {
LOGS(_log, LOG_LVL_WARN, __func__ << " merge failed");
util::Error const& err = _infileMerger->getError();
_setError(ccontrol::MSG_RESULT_ERROR, err.getMsg(), util::ErrorCode::RESULT_IMPORT);
}
if (csvStream->getContaminated()) {
if (csvMemDisk->getContaminated()) {
LOGS(_log, LOG_LVL_ERROR, __func__ << " merge stream contaminated");
fileMergeSuccess = false;
_setError(ccontrol::MSG_RESULT_ERROR, "merge stream contaminated", util::ErrorCode::RESULT_IMPORT);
}

csvThread.join();
if (!fileReadErrorMsg.empty()) {
LOGS(_log, LOG_LVL_WARN, __func__ << " result file read failed");
_setError(ccontrol::MSG_HTTP_RESULT, fileReadErrorMsg, util::ErrorCode::RESULT_IMPORT);
@@ -261,15 +256,14 @@ qdisp::MergeEndStatus MergingHandler::_mergeHttp(qdisp::UberJob::Ptr const& uber
if (!mergeEStatus.success) {
// This error check needs to come after the csvThread.join() to ensure writing
// is finished. If any bytes were written, the result table is ruined.
mergeEStatus.contaminated = csvStream->getBytesWritten() > 0;
mergeEStatus.contaminated = csvMemDisk->getBytesFetched() > 0;
}
// TODO:UJ Make it impossible to contaminate the result table for all errors
// short of czar or mariadb crash.

return mergeEStatus;
}

void MergingHandler::cancelFileMerge() {
auto csvStrm = _csvStream.lock();
auto csvStrm = _csvMemDisk.lock();
if (csvStrm != nullptr) {
csvStrm->cancel();
}
@@ -295,9 +289,6 @@ qdisp::MergeEndStatus MergingHandler::flushHttp(string const& fileUrl, uint64_t
"MergingHandler::" << __func__ << " uberJob=" << uberJob->getIdStr() << " fileUrl=" << fileUrl);

qdisp::MergeEndStatus mergeStatus = _mergeHttp(uberJob, fileUrl, fileSize);
if (mergeStatus.success) {
_infileMerger->mergeCompleteFor(uberJob->getUjId());
}
return mergeStatus;
}

Review comment (Contributor): Curious: was this found redundant / vestigial / potentially problematic / now handled somewhere else?

Reply (@jgates108, Contributor Author, Jul 11, 2025): It's now handled directly in InfileMerger::mergeHttp. _infileMerger->mergeCompleteFor(...) just added bytes to the total byte counts; things moved around a bit, so the total can now be added once the merge finishes in InfileMerger::mergeHttp. Look for _totalResultSize.

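Note: the refactoring above drops the dedicated csvThread: the read-and-push loop is wrapped in transferFunc and handed to CsvMemDisk::transferDataFromWorker, while push(nullptr, 0) remains the stream terminator that unblocks the merger. A minimal sketch of that producer/consumer contract, assuming a simplified in-memory buffer rather than the real CsvMemDisk:

    // Simplified sketch of the push()/terminator contract used by _mergeHttp;
    // the real mysql::CsvMemDisk also spills chunks to disk and tracks cancellation.
    #include <condition_variable>
    #include <cstdint>
    #include <deque>
    #include <mutex>
    #include <string>

    class CsvBufferSketch {
    public:
        // push(nullptr, 0) is the stream terminator; it unblocks a reader that
        // is still waiting for data (e.g. the table merger).
        void push(char const* buf, std::uint32_t size) {
            std::lock_guard<std::mutex> lock(_mtx);
            if (buf == nullptr || size == 0) {
                _done = true;
            } else {
                _chunks.emplace_back(buf, size);
            }
            _cv.notify_one();
        }

        // Returns the next chunk, or an empty string once the terminator has
        // been seen and all buffered data has been drained.
        std::string pop() {
            std::unique_lock<std::mutex> lock(_mtx);
            _cv.wait(lock, [this] { return _done || !_chunks.empty(); });
            if (_chunks.empty()) return {};
            std::string front = std::move(_chunks.front());
            _chunks.pop_front();
            return front;
        }

    private:
        std::mutex _mtx;
        std::condition_variable _cv;
        std::deque<std::string> _chunks;
        bool _done = false;
    };
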
6 changes: 3 additions & 3 deletions src/ccontrol/MergingHandler.h
@@ -38,7 +38,7 @@ class ClientConnPool;
} // namespace lsst::qserv::http

namespace lsst::qserv::mysql {
class CsvStream;
class CsvMemDisk;
} // namespace lsst::qserv::mysql

namespace lsst::qserv::qdisp {
@@ -105,8 +105,8 @@ class MergingHandler : public qdisp::ResponseHandler {
bool _flushed{false}; ///< flushed to InfileMerger?
std::string _wName{"~"}; ///< worker name

std::weak_ptr<qdisp::Executive> _executive; ///< Weak pointer to the executive for errors.
std::weak_ptr<mysql::CsvStream> _csvStream; ///< Weak pointer to cancel infile merge.
std::weak_ptr<qdisp::Executive> _executive; ///< Weak pointer to the executive for errors.
std::weak_ptr<mysql::CsvMemDisk> _csvMemDisk; ///< Weak pointer to cancel infile merge.
};

} // namespace lsst::qserv::ccontrol
3 changes: 2 additions & 1 deletion src/czar/ActiveWorker.cc
@@ -288,7 +288,8 @@ void ActiveWorkerMap::updateMap(protojson::WorkerContactInfo::WCMap const& wcMap
auto iter = _awMap.find(wcKey);
if (iter == _awMap.end()) {
auto newAW = ActiveWorker::create(wcVal, czInfo, replicationInstanceId, replicationAuthKey);
LOGS(_log, LOG_LVL_INFO, cName(__func__) << " ActiveWorker created for " << wcKey);
LOGS(_log, LOG_LVL_INFO,
cName(__func__) << " ActiveWorker created for " << wcKey << " " << newAW->dump());
_awMap[wcKey] = newAW;
if (_czarCancelAfterRestart) {
newAW->setCzarCancelAfterRestart(_czarCancelAfterRestartCzId, _czarCancelAfterRestartQId);
10 changes: 10 additions & 0 deletions src/czar/Czar.cc
@@ -54,6 +54,7 @@
#include "http/ClientConnPool.h"
#include "http/MetaModule.h"
#include "http/Method.h"
#include "mysql/CsvMemDisk.h"
#include "qdisp/CzarStats.h"
#include "qdisp/Executive.h"
#include "qproc/DatabaseModels.h"
@@ -181,6 +182,15 @@ Czar::Czar(string const& configFilePath, string const& czarName)
// the name of the Czar gets translated into a numeric identifier.
_czarConfig->setId(_uqFactory->userQuerySharedResources()->qMetaCzarId);

CzarIdType czarId = _czarConfig->id();
size_t const MB_SIZE_BYTES = 1024 * 1024;
size_t maxResultTableSizeBytes = _czarConfig->getMaxTableSizeMB() * MB_SIZE_BYTES;
size_t maxMemToUse = _czarConfig->getMaxTransferMemMB() * MB_SIZE_BYTES;
string const transferDirectory = _czarConfig->getTransferDir();
std::size_t const transferMinBytesInMem = _czarConfig->getTransferMinMBInMem() * MB_SIZE_BYTES;
mysql::TransferTracker::setup(maxMemToUse, transferDirectory, transferMinBytesInMem,
maxResultTableSizeBytes, czarId);

// Tell workers to cancel any queries that were submitted before this restart of Czar.
// Figure out which query (if any) was recorded in Czar databases before the restart.
// The id will be used as the high-watermark for queries that need to be cancelled.
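
Note: the constructor converts the MB-denominated config values to bytes (MB_SIZE_BYTES is a size_t, so the multiplications are done in size_t rather than int) and initializes TransferTracker once, process-wide, before any user query can start. A sketch of a setup()/get()-style singleton whose setup() signature mirrors the call above; the internals here are assumptions, not the actual TransferTracker:

    #include <cstddef>
    #include <memory>
    #include <stdexcept>
    #include <string>

    // Sketch only: setup() mirrors the call in Czar::Czar above, with czarId
    // simplified to int (the real code uses CzarIdType).
    class TransferTrackerSketch {
    public:
        static void setup(std::size_t maxMemToUse, std::string const& transferDir,
                          std::size_t minBytesInMem, std::size_t maxResultTableSizeBytes,
                          int czarId) {
            if (_instance != nullptr) throw std::logic_error("setup() called twice");
            _instance = std::unique_ptr<TransferTrackerSketch>(new TransferTrackerSketch(
                    maxMemToUse, transferDir, minBytesInMem, maxResultTableSizeBytes, czarId));
        }

        // Accessor for code (e.g. CsvMemDisk) that needs the shared limits.
        static TransferTrackerSketch& get() {
            if (_instance == nullptr) throw std::logic_error("setup() was never called");
            return *_instance;
        }

        std::size_t maxMemToUse() const { return _maxMemToUse; }
        std::string const& transferDir() const { return _transferDir; }

    private:
        TransferTrackerSketch(std::size_t maxMem, std::string dir, std::size_t minMem,
                              std::size_t maxTbl, int id)
                : _maxMemToUse(maxMem), _transferDir(std::move(dir)),
                  _minBytesInMem(minMem), _maxResultTableSizeBytes(maxTbl), _czarId(id) {}

        std::size_t const _maxMemToUse;
        std::string const _transferDir;
        std::size_t const _minBytesInMem;
        std::size_t const _maxResultTableSizeBytes;
        int const _czarId;
        static std::unique_ptr<TransferTrackerSketch> _instance;
    };

    std::unique_ptr<TransferTrackerSketch> TransferTrackerSketch::_instance;
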
8 changes: 6 additions & 2 deletions src/czar/CzarChunkMap.cc
@@ -139,10 +139,10 @@ void CzarChunkMap::verify(string const& familyName) const {
LOGS(_log, LOG_LVL_WARN, cName(__func__) << " family=" << familyName << " verified");
}

string CzarChunkMap::dumpChunkMap(ChunkMap const& chunkMap) {
string CzarChunkMap::dumpChunkMap() const {
stringstream os;
os << "ChunkMap{";
for (auto const& [cId, cDataPtr] : chunkMap) {
for (auto const& [cId, cDataPtr] : *_chunkMap) {
os << "(cId=" << cId << ":";
os << ((cDataPtr == nullptr) ? "null" : cDataPtr->dump()) << ")";
}
@@ -355,6 +355,10 @@ bool CzarFamilyMap::_read() {

verify(familyMapPtr);

for (auto const& [fam, ccMap] : *familyMapPtr) {
LOGS(_log, LOG_LVL_INFO, "{family=" << fam << "{" << ccMap->dumpChunkMap() << "}}");
}

_familyMap = familyMapPtr;

_lastUpdateTime = qChunkMap.updateTime;
2 changes: 1 addition & 1 deletion src/czar/CzarChunkMap.h
@@ -205,7 +205,7 @@ class CzarChunkMap {
/// @throws ChunkMapException
void verify(std::string const& familyName) const;

static std::string dumpChunkMap(ChunkMap const& chunkMap);
std::string dumpChunkMap() const;

static std::string dumpWorkerChunkMap(WorkerChunkMap const& wcMap);

3 changes: 1 addition & 2 deletions src/czar/CzarRegistry.cc
@@ -80,14 +80,13 @@ void CzarRegistry::_registryUpdateLoop() {
string const url = "http://" + _czarConfig->replicationRegistryHost() + ":" +
to_string(_czarConfig->replicationRegistryPort()) + "/czar";
vector<string> const headers = {"Content-Type: application/json"};
string const fqdn = util::getCurrentHostFqdnBlocking();
json const request = json::object({{"instance_id", _czarConfig->replicationInstanceId()},
{"auth_key", _czarConfig->replicationAuthKey()},
{"czar",
{{"name", _czarConfig->name()},
{"id", _czarConfig->id()},
{"management-port", _czarConfig->replicationHttpPort()},
{"management-host-name", fqdn}}}});
{"management-host-name", util::getCurrentHostFqdnBlocking()}}}});
string const requestContext = "Czar: '" + http::method2string(method) + "' request to '" + url + "'";
LOGS(_log, LOG_LVL_TRACE,
__func__ << " czarPost url=" << url << " request=" << request.dump() << " headers=" << headers[0]);
1 change: 1 addition & 0 deletions src/mysql/CMakeLists.txt
@@ -2,6 +2,7 @@ add_library(mysql SHARED)

target_sources(mysql PRIVATE
CsvBuffer.cc
CsvMemDisk.cc
LocalInfile.cc
MySqlConfig.cc
MySqlConnection.cc