Skip to content

[WIP] graph pool #3114

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ci/build_test_OnCommit.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ pipeline {
timeout(time: 120, unit: 'MINUTES')
}
parallel {
stage("Run unit tests") {
/*stage("Run unit tests") {
agent {
label "${agent_name_linux}"
}
Expand All @@ -144,7 +144,7 @@ pipeline {
}
}
}
}
}*/
stage("Internal tests") {
agent {
label "${agent_name_linux}"
Expand All @@ -164,7 +164,7 @@ pipeline {
}
}
}
stage('Test windows') {
/*stage('Test windows') {
agent {
label "${agent_name_windows}"
}
Expand All @@ -188,7 +188,7 @@ pipeline {
}
}
}
}
}*/
}
}
}
Expand Down
36 changes: 31 additions & 5 deletions src/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,30 @@ cc_library(
copts = [],
visibility = ["//visibility:public",],
linkopts = [],
local_defines = COMMON_LOCAL_DEFINES,
)
cc_library(
name = "mediapipe_internal_graphqueue",
hdrs = [
"mediapipe_internal/graphqueue.hpp",
"mediapipe_internal/outputstreamobserver.hpp",
], # TODO FIXME
srcs = ["mediapipe_internal/graphqueue.cpp"],
deps = [
"libovms_queue",
"libovmslogging",
"execution_context",
"libovmstimer",
"libovmsmetrics",
"model_metric_reporter",
"//third_party:openvino",
"@mediapipe//mediapipe/framework:calculator_graph",
"//src/python:libovmspythonmodule", # TODO not split out yet
"//src/llm:genai_servables", # TODO split!
],
copts = [],
visibility = ["//visibility:public",],
linkopts = [],
)
cc_library(
name = "libovms_ovinferrequestsqueue",
Expand Down Expand Up @@ -379,7 +403,7 @@ cc_library(
srcs = ["model_metric_reporter.cpp"],
deps = [
"libovmsmodelversion",
"libovms_execution_context",
"execution_context",
"libovmslogging",
"libovmsmetrics",
],
Expand Down Expand Up @@ -589,6 +613,7 @@ cc_library(
"mediapipe_internal/mediapipegraphconfig.cpp",
"mediapipe_internal/mediapipegraphdefinition.cpp",
"mediapipe_internal/mediapipegraphdefinition.hpp",
"mediapipe_internal/outputstreamobserver.hpp",
"mediapipe_internal/mediapipegraphexecutor.cpp",
"mediapipe_internal/mediapipegraphexecutor.hpp",
"mediapipe_internal/packettypes.hpp",
Expand Down Expand Up @@ -649,7 +674,7 @@ cc_library(
"libovms_dags_nodesessionresult",
"libovms_dags_nodeinputhandler",
"custom_node_output_allocator",
"libovms_execution_context",
"execution_context",
"executingstreamidguard",
"libovmsfilesystem",
"libovmsfilesystemfactory_h",
Expand Down Expand Up @@ -714,6 +739,7 @@ cc_library(
})
+ select({
"//conditions:default": [
"mediapipe_internal_graphqueue",
"@mediapipe_calculators//:mediapipe_calculators", # Need this dependencies here because we use ovms/src - cannot add in ovms_dependencies because we copy src directory later in Dockerfile
"@mediapipe//mediapipe/graphs/holistic_tracking:holistic_tracking_to_render_data",
"@mediapipe//mediapipe/graphs/iris_tracking:iris_tracking_cpu_deps",
Expand Down Expand Up @@ -1863,7 +1889,7 @@ cc_library(
],
deps = [
"libovmslogging",
"libovms_execution_context",
"execution_context",
"libovms_dags_session_id",
],
visibility = ["//visibility:public"],
Expand Down Expand Up @@ -2273,7 +2299,7 @@ cc_library(
linkopts = LINKOPTS_ADJUSTED,
)
cc_library(
name = "libovms_execution_context",
name = "execution_context",
hdrs = ["execution_context.hpp",],
deps = [],
visibility = ["//visibility:public"],
Expand Down Expand Up @@ -2370,7 +2396,7 @@ cc_library(
deps = [
"libovmslogging",
"libovmsmodelversionstatus",
"libovms_execution_context",
"execution_context",
"libovms_dags_aliases",
"libovms_dags_pipelineeventqueue",
"libovms_dags_session_id",
Expand Down
5 changes: 1 addition & 4 deletions src/embeddings/embeddings_calculator.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//*****************************************************************************
// Copyright 2024 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -54,8 +53,6 @@ class EmbeddingsCalculator : public CalculatorBase {
static const std::string EMBEDDINGS_MODEL_ATTENTION_MASK_NAME;
static const std::string EMBEDDINGS_MODEL_TOKEN_TYPE_IDS_NAME;

mediapipe::Timestamp timestamp{0};

protected:
std::shared_ptr<::InferenceAdapter> tokenizer_session{nullptr};
std::shared_ptr<::InferenceAdapter> embeddings_session{nullptr};
Expand Down Expand Up @@ -290,7 +287,7 @@ class EmbeddingsCalculator : public CalculatorBase {
}
time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::high_resolution_clock::now() - parseResponseStartTime).count();
SPDLOG_LOGGER_DEBUG(embeddings_calculator_logger, "Embeddings response deserialization time: {} ms", time / 1000);
cc->Outputs().Tag(OUTPUT_TAG_NAME).Add(new std::string(buffer.GetString()), timestamp);
cc->Outputs().Tag(OUTPUT_TAG_NAME).Add(new std::string(buffer.GetString()), cc->InputTimestamp());
return absl::OkStatus();
}
};
Expand Down
3 changes: 3 additions & 0 deletions src/http_frontend/http_graph_executor_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ Status createAndPushPacketsImpl(
"failed to deserialize",
StatusCode::MEDIAPIPE_GRAPH_ADD_PACKET_INPUT_STREAM);
numberOfPacketsCreated = 1;
// TODO FIXME @atobisze properly implement on all exit paths
auto now = std::chrono::system_clock::now();
currentTimestamp = ::mediapipe::Timestamp(std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count());
return StatusCode::OK;
}

Expand Down
3 changes: 3 additions & 0 deletions src/kfs_frontend/kfs_graph_executor_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,7 @@ static Status createPacketAndPushIntoGraph(const std::string& name, std::shared_
}
std::unique_ptr<T> inputTensor;
OVMS_RETURN_ON_FAIL(deserializeTensor(name, *request, inputTensor, pythonBackend));
SPDLOG_ERROR("Current Timestamp before actual pushing:{}", timestamp.Value());
MP_RETURN_ON_FAIL(graph.AddPacketToInputStream(
name,
::mediapipe::packet_internal::Create(
Expand Down Expand Up @@ -1040,8 +1041,10 @@ static Status deserializeTimestampIfAvailable(
return status;
}
} else {
SPDLOG_ERROR("Current Timestamp before setting:{}", timestamp.Value());
auto now = std::chrono::system_clock::now();
timestamp = ::mediapipe::Timestamp(std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count());
SPDLOG_ERROR("Current Timestamp setting:{}", timestamp.Value());
}
return StatusCode::OK;
}
Expand Down
3 changes: 2 additions & 1 deletion src/logging.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ std::shared_ptr<spdlog::logger> rerank_calculator_logger = std::make_shared<spdl
#if (OV_TRACE == 1)
std::shared_ptr<spdlog::logger> ov_logger = std::make_shared<spdlog::logger>("openvino");
#endif
const std::string default_pattern = "[%Y-%m-%d %T.%e][%t][%n][%l][%s:%#] %v";
//const std::string default_pattern = "[%i] [%Y-%m-%d %T.%f][%t][%n][%l][%s:%#] %v";
const std::string default_pattern = "[%Y-%m-%d %T.%f][%t][%n][%l][%s:%#] %v";

static void set_log_level(const std::string log_level, std::shared_ptr<spdlog::logger> logger) {
logger->set_level(spdlog::level::info);
Expand Down
169 changes: 169 additions & 0 deletions src/mediapipe_internal/graphqueue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
//*****************************************************************************
// Copyright 2025 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//*****************************************************************************
#include "graphqueue.hpp"

#include <atomic>
#include <condition_variable>
#include <future>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

#include "../queue.hpp"
#include "src/python/pythonnoderesources.hpp"
#include "src/llm/servable.hpp"

#include "mediapipe/framework/calculator_graph.h"
#include "mediapipe/framework/port/status.h"

#include "outputstreamobserver.hpp"
namespace {
//const ::mediapipe::Timestamp STARTING_TIMESTAMP = ::mediapipe::Timestamp(0); // TODO @atobisze common
const std::string PYTHON_SESSION_SIDE_PACKET_NAME = "py";
const std::string LLM_SESSION_SIDE_PACKET_NAME = "llm";
} // namespace
namespace ovms {

// Builds a fully initialized, running graph wrapper for one queue slot:
// initializes the CalculatorGraph from config, attaches a (initially null)
// observer to every output stream, and starts the run with the python/LLM
// resource maps as input side packets.
// NOTE: failures currently throw a bare int (42) — TODO replace with a
// proper exception/Status once the callers' error handling is decided.
std::shared_ptr<GraphHelper> constructGraphHelper(const ::mediapipe::CalculatorGraphConfig& config, PythonNodeResourcesMap& pythonNodeResourcesMap, GenAiServableMap& genAiServableMap) {
    auto gh = std::make_shared<GraphHelper>();
    SPDLOG_ERROR("ER GraphHelper():{}", (void*)gh.get());
    gh->graph = std::make_shared<::mediapipe::CalculatorGraph>();
    gh->currentTimestamp = ::mediapipe::Timestamp(0);

    auto absStatus = gh->graph->Initialize(config);
    if (!absStatus.ok()) {
        SPDLOG_ERROR("ER issue:{}", absStatus.ToString());
        throw 42;
    }
    for (auto& name : config.output_stream()) {
        std::string streamName = getStreamName(name);
        // Observer slot is created up-front so the lambda can capture a stable
        // reference to the map entry; the actual observer is swapped in later.
        gh->outStreamObservers[streamName] = std::make_shared<NullOutputStreamObserver>();  // TODO use at() FIXME
        auto& perGraphObserverFunctor = gh->outStreamObservers[streamName];
        // Capturing the map entry by reference is safe: node-based maps do not
        // invalidate references to existing elements on insertion.
        absStatus = gh->graph->ObserveOutputStream(streamName, [&perGraphObserverFunctor](const ::mediapipe::Packet& packet) -> absl::Status { return perGraphObserverFunctor->handlePacket(packet); });  // TODO FIXME throw?
        if (!absStatus.ok()) {
            SPDLOG_ERROR("ER issue:{}", absStatus.ToString());
            throw 42;
        }
    }
    std::map<std::string, mediapipe::Packet> inputSidePackets;
    inputSidePackets[PYTHON_SESSION_SIDE_PACKET_NAME] = mediapipe::MakePacket<PythonNodeResourcesMap>(pythonNodeResourcesMap)
                                                            .At(STARTING_TIMESTAMP);
    inputSidePackets[LLM_SESSION_SIDE_PACKET_NAME] = mediapipe::MakePacket<GenAiServableMap>(genAiServableMap).At(STARTING_TIMESTAMP);
    // Iterate by const reference: copying a mediapipe::Packet bumps refcounts
    // for no benefit; only the key is logged.
    for (const auto& [key, packet] : inputSidePackets) {
        (void)packet;
        SPDLOG_ERROR("side packet key:{}", key);
    }
    SPDLOG_ERROR("ER");
    absStatus = gh->graph->StartRun(inputSidePackets);
    SPDLOG_ERROR("ER");
    if (!absStatus.ok()) {
        SPDLOG_ERROR("Input sidePackets size:{}, python map size:{} key:{} side packet name:{}", inputSidePackets.size(), pythonNodeResourcesMap.size(), pythonNodeResourcesMap.begin()->first, PYTHON_SESSION_SIDE_PACKET_NAME);
        SPDLOG_ERROR("ER issue:{}", absStatus.ToString());
        throw 42;
    }
    SPDLOG_ERROR("ER");
    return gh;
}
void GraphQueue::restoreStream(int streamId) {
if (streamId < inferRequests.size()) {
SPDLOG_ERROR("Cannot restore stream id > queue length");
assert(streamId < inferRequests.size());
}
inferRequests[streamId] = constructGraphHelper(*this->config, *this->pythonNodeResourcesMap, *this->genAiServableMap);
}

// Pre-populates the queue with streamsLength independent, already-running
// graphs built from the same config and shared resource maps.
// Note: constructGraphHelper may throw (currently a bare int) — TODO split
// construction into ctor + init() so failures can be reported via Status.
GraphQueue::GraphQueue(const ::mediapipe::CalculatorGraphConfig& config, std::shared_ptr<PythonNodeResourcesMap> pythonNodeResourcesMap, std::shared_ptr<GenAiServableMap> genAiServableMap, int streamsLength) :
    Queue(streamsLength),
    config(std::make_shared<const ::mediapipe::CalculatorGraphConfig>(config)),
    pythonNodeResourcesMap(pythonNodeResourcesMap),
    genAiServableMap(genAiServableMap) {
    SPDLOG_ERROR("ER GraphQueue():{}", (void*)this);
    inferRequests.reserve(streamsLength);
    // TODO FIXME split constructor to init to handle retCodes?
    for (int i = 0; i < streamsLength; ++i) {
        SPDLOG_ERROR("ER");
        // constructGraphHelper returns a prvalue; emplace_back moves it
        // directly — no explicit std::move needed.
        inferRequests.emplace_back(constructGraphHelper(*this->config, *pythonNodeResourcesMap, *genAiServableMap));
        SPDLOG_ERROR("ER");
    }
}

// Orderly shutdown of the owned CalculatorGraph: drain in-flight work,
// close all packet sources, wait for completion, then cancel as a last
// resort. Errors are logged but never thrown — destructors must not throw.
GraphHelper::~GraphHelper() {
    SPDLOG_TRACE("GraphHelper wait until idle graph");
    auto absStatus = this->graph->WaitUntilIdle();
    if (!absStatus.ok()) {
        SPDLOG_ERROR("ER issue:{} {}", absStatus.ToString(), (void*)this);
    }
    absStatus = this->graph->CloseAllPacketSources();
    if (!absStatus.ok()) {
        SPDLOG_ERROR("ER issue:{} {}", absStatus.ToString(), (void*)this);
    }
    SPDLOG_TRACE("GraphHelper wait until done graph");
    absStatus = this->graph->WaitUntilDone();
    if (!absStatus.ok()) {
        SPDLOG_ERROR("ER issue:{} {}", absStatus.ToString(), (void*)this);
    }
    // Cancel() has no status to check; the original code re-tested the stale
    // WaitUntilDone() status here, duplicating its error log.
    this->graph->Cancel();
    SPDLOG_ERROR("ER");
    this->graph.reset();
    SPDLOG_ERROR("ER ~GraphHelper:{}", (void*)this);
}

// Releases every slot's GraphHelper. The per-graph teardown
// (WaitUntilIdle / CloseAllPacketSources / WaitUntilDone / Cancel) lives in
// ~GraphHelper, so dropping the shared_ptr is sufficient here.
// The original body carried a large unreachable block after `continue` that
// additionally dereferenced graphHelper->graph AFTER graphHelper.reset();
// that dead (and unsafe-if-ever-reached) code is removed.
GraphQueue::~GraphQueue() {
    SPDLOG_ERROR("ER ~GraphQueue:{}", (void*)this);
    for (auto& graphHelper : inferRequests) {
        SPDLOG_TRACE("GraphQueue releasing graph helper");
        graphHelper.reset();
        SPDLOG_ERROR("ER");
    }
    SPDLOG_ERROR("ER ~GraphQueue:{}", (void*)this);
}
// TODO FIXME @atobisze move to destructor
} // namespace ovms
Loading