Skip to content

Commit

Permalink
[native] SystemConnector to query system.runtime.tasks table
Browse files Browse the repository at this point in the history
  • Loading branch information
aditi-pandit committed Apr 26, 2024
1 parent 9a20c79 commit aeaa0b7
Show file tree
Hide file tree
Showing 13 changed files with 12,840 additions and 11,779 deletions.
1 change: 1 addition & 0 deletions presto-native-execution/presto_cpp/main/CMakeLists.txt
Expand Up @@ -29,6 +29,7 @@ add_library(
QueryContextManager.cpp
ServerOperation.cpp
SignalHandler.cpp
SystemConnector.cpp
TaskManager.cpp
TaskResource.cpp
PeriodicHeartbeatManager.cpp
Expand Down
23 changes: 23 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Expand Up @@ -21,6 +21,7 @@
#include "presto_cpp/main/Announcer.h"
#include "presto_cpp/main/PeriodicTaskManager.h"
#include "presto_cpp/main/SignalHandler.h"
#include "presto_cpp/main/SystemConnector.h"
#include "presto_cpp/main/TaskResource.h"
#include "presto_cpp/main/common/ConfigReader.h"
#include "presto_cpp/main/common/Counters.h"
Expand Down Expand Up @@ -232,6 +233,17 @@ void PrestoServer::run() {
std::make_unique<IcebergPrestoToVeloxConnector>("iceberg"));
registerPrestoToVeloxConnector(
std::make_unique<TpchPrestoToVeloxConnector>("tpch"));
// Presto server uses system catalog or system schema in other catalogs
// in different places in the code. All these resolve to the SystemConnector.
// Depending on where the operator or column is used, different prefixes can
// be used in the naming. So the protocol class is mapped
// to all the different prefixes for System tables/columns.
registerPrestoToVeloxConnector(
std::make_unique<SystemPrestoToVeloxConnector>("$system"));
registerPrestoToVeloxConnector(
std::make_unique<SystemPrestoToVeloxConnector>("system"));
registerPrestoToVeloxConnector(
std::make_unique<SystemPrestoToVeloxConnector>("$system@system"));

initializeVeloxMemory();
initializeThreadPools();
Expand Down Expand Up @@ -438,6 +450,7 @@ void PrestoServer::run() {
}
prestoServerOperations_ =
std::make_unique<PrestoServerOperations>(taskManager_.get(), this);
registerSystemConnector();

// The endpoint used by operation in production.
httpServer_->registerGet(
Expand Down Expand Up @@ -939,6 +952,15 @@ std::vector<std::string> PrestoServer::registerConnectors(
return catalogNames;
}

void PrestoServer::registerSystemConnector() {
PRESTO_STARTUP_LOG(INFO) << "Registering system catalog "
<< " using connector SystemConnector";
VELOX_CHECK(taskManager_);
auto systemConnector =
std::make_shared<SystemConnector>("$system@system", taskManager_.get());
velox::connector::registerConnector(systemConnector);
}

void PrestoServer::unregisterConnectors() {
PRESTO_SHUTDOWN_LOG(INFO) << "Unregistering connectors";
auto connectors = facebook::velox::connector::getAllConnectors();
Expand All @@ -959,6 +981,7 @@ void PrestoServer::unregisterConnectors() {
}
}

facebook::velox::connector::unregisterConnector("$system@system");
PRESTO_SHUTDOWN_LOG(INFO)
<< "Unregistered " << connectors.size() << " connectors";
}
Expand Down
2 changes: 2 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Expand Up @@ -200,6 +200,8 @@ class PrestoServer {
// Periodically yield tasks if there are tasks queued.
void yieldTasks();

void registerSystemConnector();

const std::string configDirectoryPath_;

std::shared_ptr<CoordinatorDiscoverer> coordinatorDiscoverer_;
Expand Down
6 changes: 6 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoTask.cpp
Expand Up @@ -497,12 +497,15 @@ void PrestoTask::updateTimeInfoLocked(
util::toISOTimestamp(veloxTaskStats.executionStartTimeMs);
prestoTaskStats.firstStartTime =
util::toISOTimestamp(veloxTaskStats.firstSplitStartTimeMs);
createTimeMs = veloxTaskStats.executionStartTimeMs;
firstSplitStartTimeMs = veloxTaskStats.firstSplitStartTimeMs;
prestoTaskStats.lastStartTime =
util::toISOTimestamp(veloxTaskStats.lastSplitStartTimeMs);
prestoTaskStats.lastEndTime =
util::toISOTimestamp(veloxTaskStats.executionEndTimeMs);
prestoTaskStats.endTime =
util::toISOTimestamp(veloxTaskStats.executionEndTimeMs);
lastEndTimeMs = veloxTaskStats.executionEndTimeMs;

if (veloxTaskStats.executionEndTimeMs > veloxTaskStats.executionStartTimeMs) {
prestoTaskStats.elapsedTimeInNanos = (veloxTaskStats.executionEndTimeMs -
Expand Down Expand Up @@ -804,6 +807,9 @@ folly::dynamic PrestoTask::toJson() const {
obj["lastHeartbeatMs"] = lastHeartbeatMs;
obj["lastTaskStatsUpdateMs"] = lastTaskStatsUpdateMs;
obj["lastMemoryReservation"] = lastMemoryReservation;
obj["createTimeMs"] = createTimeMs;
obj["firstSplitStartTimeMs"] = firstSplitStartTimeMs;
obj["lastEndTimeMs"] = lastEndTimeMs;

json j;
to_json(j, info);
Expand Down
3 changes: 3 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoTask.h
Expand Up @@ -98,6 +98,9 @@ struct PrestoTask {
uint64_t lastHeartbeatMs{0};
uint64_t lastTaskStatsUpdateMs = {0};
uint64_t lastMemoryReservation = {0};
uint64_t createTimeMs{0};
uint64_t firstSplitStartTimeMs{0};
uint64_t lastEndTimeMs{0};
mutable std::mutex mutex;

/// Error before task is created or when task is being created.
Expand Down

0 comments on commit aeaa0b7

Please sign in to comment.