Skip to content

Commit aa617c3

Browse files
committed
POC part 2
1 parent 7e33143 commit aa617c3

18 files changed

+503
-111
lines changed

ci/build_test_OnCommit.groovy

+4-4
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ pipeline {
116116
stage("Release image and tests in parallel") {
117117
when { expression { image_build_needed == "true" } }
118118
parallel {
119-
stage("Run unit tests") {
119+
/*stage("Run unit tests") {
120120
agent {
121121
label "${agent_name_linux}"
122122
}
@@ -132,7 +132,7 @@ pipeline {
132132
}
133133
}
134134
}
135-
}
135+
}*/
136136
stage("Internal tests") {
137137
agent {
138138
label "${agent_name_linux}"
@@ -152,7 +152,7 @@ pipeline {
152152
}
153153
}
154154
}
155-
stage('Test windows') {
155+
/*stage('Test windows') {
156156
agent {
157157
label "${agent_name_windows}"
158158
}
@@ -176,7 +176,7 @@ pipeline {
176176
}
177177
}
178178
}
179-
}
179+
}*/
180180
}
181181
}
182182
}

src/BUILD

+16-7
Original file line numberDiff line numberDiff line change
@@ -157,13 +157,22 @@ cc_library(
157157
)
158158
cc_library(
159159
name = "mediapipe_internal_graphqueue",
160-
hdrs = ["mediapipe_internal/graphqueue.hpp"],
161-
# srcs = ["mediapipe_internal/graphqueue.cpp"],
160+
hdrs = [
161+
"mediapipe_internal/graphqueue.hpp",
162+
"mediapipe_internal/outputstreamobserver.hpp",
163+
], # TODO FIXME
164+
srcs = ["mediapipe_internal/graphqueue.cpp"],
162165
deps = [
163166
"libovms_queue",
164167
"libovmslogging",
168+
"execution_context",
169+
"libovmstimer",
170+
"libovmsmetrics",
171+
"model_metric_reporter",
165172
"//third_party:openvino",
166173
"@mediapipe//mediapipe/framework:calculator_graph",
174+
"//src/python:libovmspythonmodule", # TODO not splitted
175+
"//src/llm:genai_servables", # TODO split!
167176
],
168177
copts = [],
169178
visibility = ["//visibility:public",],
@@ -381,7 +390,7 @@ cc_library(
381390
srcs = ["model_metric_reporter.cpp"],
382391
deps = [
383392
"libovmsmodelversion",
384-
"libovms_execution_context",
393+
"execution_context",
385394
"libovmslogging",
386395
"libovmsmetrics",
387396
],
@@ -652,7 +661,7 @@ cc_library(
652661
"libovms_dags_nodesessionresult",
653662
"libovms_dags_nodeinputhandler",
654663
"custom_node_output_allocator",
655-
"libovms_execution_context",
664+
"execution_context",
656665
"executingstreamidguard",
657666
"libovmsfilesystem",
658667
"libovmsfilesystemfactory_h",
@@ -1866,7 +1875,7 @@ cc_library(
18661875
],
18671876
deps = [
18681877
"libovmslogging",
1869-
"libovms_execution_context",
1878+
"execution_context",
18701879
"libovms_dags_session_id",
18711880
],
18721881
visibility = ["//visibility:public"],
@@ -2276,7 +2285,7 @@ cc_library(
22762285
linkopts = LINKOPTS_ADJUSTED,
22772286
)
22782287
cc_library(
2279-
name = "libovms_execution_context",
2288+
name = "execution_context",
22802289
hdrs = ["execution_context.hpp",],
22812290
deps = [],
22822291
visibility = ["//visibility:public"],
@@ -2373,7 +2382,7 @@ cc_library(
23732382
deps = [
23742383
"libovmslogging",
23752384
"libovmsmodelversionstatus",
2376-
"libovms_execution_context",
2385+
"execution_context",
23772386
"libovms_dags_aliases",
23782387
"libovms_dags_pipelineeventqueue",
23792388
"libovms_dags_session_id",

src/kfs_frontend/kfs_graph_executor_impl.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -925,6 +925,7 @@ static Status createPacketAndPushIntoGraph(const std::string& name, std::shared_
925925
}
926926
std::unique_ptr<T> inputTensor;
927927
OVMS_RETURN_ON_FAIL(deserializeTensor(name, *request, inputTensor, pythonBackend));
928+
SPDLOG_ERROR("Current Timestamp before actual pushing:{}", timestamp.Value());
928929
MP_RETURN_ON_FAIL(graph.AddPacketToInputStream(
929930
name,
930931
::mediapipe::packet_internal::Create(
@@ -1040,8 +1041,10 @@ static Status deserializeTimestampIfAvailable(
10401041
return status;
10411042
}
10421043
} else {
1044+
SPDLOG_ERROR("Current Timestamp before setting:{}", timestamp.Value());
10431045
auto now = std::chrono::system_clock::now();
10441046
timestamp = ::mediapipe::Timestamp(std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count());
1047+
SPDLOG_ERROR("Current Timestamp setting:{}", timestamp.Value());
10451048
}
10461049
return StatusCode::OK;
10471050
}

src/logging.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ std::shared_ptr<spdlog::logger> rerank_calculator_logger = std::make_shared<spdl
3939
#if (OV_TRACE == 1)
4040
std::shared_ptr<spdlog::logger> ov_logger = std::make_shared<spdlog::logger>("openvino");
4141
#endif
42-
const std::string default_pattern = "[%i] [%Y-%m-%d %T.%f][%t][%n][%l][%s:%#] %v";
42+
//const std::string default_pattern = "[%i] [%Y-%m-%d %T.%f][%t][%n][%l][%s:%#] %v";
43+
const std::string default_pattern = "[%Y-%m-%d %T.%f][%t][%n][%l][%s:%#] %v";
4344

4445
static void set_log_level(const std::string log_level, std::shared_ptr<spdlog::logger> logger) {
4546
logger->set_level(spdlog::level::info);

src/mediapipe_internal/graphqueue.cpp

+120
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
//*****************************************************************************
2+
// Copyright 2025 Intel Corporation
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//*****************************************************************************
16+
#include "graphqueue.hpp"
17+
18+
#include <atomic>
19+
#include <condition_variable>
20+
#include <future>
21+
#include <memory>
22+
#include <mutex>
23+
#include <optional>
24+
#include <queue>
25+
#include <thread>
26+
#include <utility>
27+
#include <vector>
28+
29+
#include "../queue.hpp"
30+
#include "src/python/pythonnoderesources.hpp"
31+
#include "src/llm/servable.hpp"
32+
33+
#include "mediapipe/framework/calculator_graph.h"
34+
#include "mediapipe/framework/port/status.h"
35+
36+
#include "outputstreamobserver.hpp"
37+
namespace {
38+
//const ::mediapipe::Timestamp STARTING_TIMESTAMP = ::mediapipe::Timestamp(0); // TODO @atobisze common
39+
const std::string PYTHON_SESSION_SIDE_PACKET_NAME = "py";
40+
const std::string LLM_SESSION_SIDE_PACKET_NAME = "llm";
41+
} // namespace
42+
namespace ovms {
43+
GraphQueue::GraphQueue(const ::mediapipe::CalculatorGraphConfig& config, std::shared_ptr<PythonNodeResourcesMap> pythonNodeResourcesMap, std::shared_ptr<GenAiServableMap> genAiServableMap, int streamsLength) :
44+
Queue(streamsLength),
45+
pythonNodeResourcesMap(pythonNodeResourcesMap),
46+
genAiServableMap(genAiServableMap) {
47+
SPDLOG_ERROR("ER Constr graph queue:{}", (void*)this);
48+
inferRequests.reserve(streamsLength);
49+
// TODO FIXME split constructor to init to handle retCodes?
50+
for (auto i = 0; i < streamsLength; ++i) {
51+
auto gh = std::make_shared<GraphHelper>();
52+
gh->graph = std::make_shared<::mediapipe::CalculatorGraph>();
53+
gh->currentTimestamp = ::mediapipe::Timestamp(0);
54+
55+
auto absStatus = gh->graph->Initialize(config);
56+
if (!absStatus.ok()) {
57+
SPDLOG_ERROR("ER issue:{} {}", absStatus.ToString(), (void*)this);
58+
throw 42;
59+
}
60+
for (auto& name : config.output_stream()) {
61+
std::string streamName = getStreamName(name);
62+
gh->outStreamObservers[streamName] = std::shared_ptr<OutputStreamObserverI>(new NullOutputStreamObserver()); // TODO use at() FIXME
63+
auto& perGraphObserverFunctor = gh->outStreamObservers[streamName];
64+
absStatus = gh->graph->ObserveOutputStream(streamName, [&perGraphObserverFunctor](const ::mediapipe::Packet& packet) -> absl::Status { return perGraphObserverFunctor->handlePacket(packet); }); // TODO FIXME throw?
65+
if (!absStatus.ok()) {
66+
SPDLOG_ERROR("ER issue:{} {}", absStatus.ToString(), (void*)this);
67+
throw 42;
68+
}
69+
}
70+
std::map<std::string, mediapipe::Packet> inputSidePackets;
71+
inputSidePackets[PYTHON_SESSION_SIDE_PACKET_NAME] = mediapipe::MakePacket<PythonNodeResourcesMap>(*pythonNodeResourcesMap)
72+
.At(STARTING_TIMESTAMP);
73+
inputSidePackets[LLM_SESSION_SIDE_PACKET_NAME] = mediapipe::MakePacket<GenAiServableMap>(*genAiServableMap).At(STARTING_TIMESTAMP);
74+
for (auto [k, v] : inputSidePackets) {
75+
SPDLOG_ERROR("k:{} v", k);
76+
}
77+
SPDLOG_ERROR("ER");
78+
absStatus = gh->graph->StartRun(inputSidePackets);
79+
SPDLOG_ERROR("ER");
80+
if (!absStatus.ok()) {
81+
SPDLOG_ERROR("Input sidePackets size:{}, python map size:{} key:{} side packet name:{}", inputSidePackets.size(), pythonNodeResourcesMap->size(), pythonNodeResourcesMap->begin()->first, PYTHON_SESSION_SIDE_PACKET_NAME);
82+
SPDLOG_ERROR("ER issue:{} {}", absStatus.ToString(), (void*)this);
83+
throw 42;
84+
}
85+
86+
SPDLOG_ERROR("ER");
87+
inferRequests.emplace_back(std::move(gh));
88+
SPDLOG_ERROR("ER");
89+
}
90+
}
91+
GraphQueue::~GraphQueue() {
92+
SPDLOG_ERROR("ER Destroy graph queue:{}", (void*)this);
93+
for (auto& graphHelper : inferRequests) {
94+
auto absStatus = graphHelper->graph->WaitUntilIdle();
95+
if (!absStatus.ok()) {
96+
SPDLOG_ERROR("ER issue:{} {}", absStatus.ToString(), (void*)this);
97+
// throw 42.2;
98+
}
99+
absStatus = graphHelper->graph->CloseAllPacketSources();
100+
if (!absStatus.ok()) {
101+
SPDLOG_ERROR("ER issue:{} {}", absStatus.ToString(), (void*)this);
102+
// throw "as";
103+
}
104+
absStatus = graphHelper->graph->WaitUntilDone();
105+
if (!absStatus.ok()) {
106+
SPDLOG_ERROR("ER issue:{} {}", absStatus.ToString(), (void*)this);
107+
// throw 42.2;
108+
}
109+
graphHelper->graph->Cancel();
110+
if (!absStatus.ok()) {
111+
SPDLOG_ERROR("ER issue:{} {}", absStatus.ToString(), (void*)this);
112+
// throw 42.2;
113+
}
114+
SPDLOG_ERROR("ER");
115+
graphHelper->graph.reset();
116+
SPDLOG_ERROR("ER");
117+
}
118+
SPDLOG_ERROR("ER Destroy graph queue:{}", (void*)this);
119+
}
120+
} // namespace ovms

src/mediapipe_internal/graphqueue.hpp

+36-15
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
//*****************************************************************************
2-
// Copyright 2021 Intel Corporation
2+
// Copyright 2025 Intel Corporation
33
//
44
// Licensed under the Apache License, Version 2.0 (the "License");
55
// you may not use this file except in compliance with the License.
@@ -27,35 +27,55 @@
2727
#include <vector>
2828

2929
#include "../queue.hpp"
30+
#include "src/python/pythonnoderesources.hpp"
31+
#include "src/llm/servable.hpp"
3032

3133
#include "mediapipe/framework/calculator_graph.h"
3234
#include "mediapipe/framework/port/status.h"
35+
36+
#include "outputstreamobserver.hpp"
37+
// TODO FIXME HEADERS
38+
const ::mediapipe::Timestamp STARTING_TIMESTAMP = ::mediapipe::Timestamp(0); // TODO @atobisze common
39+
const std::string PYTHON_SESSION_SIDE_PACKET_TAG = "PYTHON_NODE_RESOURCES";
40+
const std::string LLM_SESSION_SIDE_PACKET_TAG = "LLM_NODE_RESOURCES";
3341
namespace ovms {
42+
class OutputStreamObserverI;
43+
class NullOutputStreamObserver;
44+
struct GraphHelper {
45+
std::shared_ptr<::mediapipe::CalculatorGraph> graph; // TODO FIXME this does not have to be shared_ptr
46+
std::unordered_map<std::string, std::shared_ptr<OutputStreamObserverI>> outStreamObservers;
47+
::mediapipe::Timestamp currentTimestamp; // TODO FIXME const
48+
// TODO FIXME move constr/=
49+
GraphHelper() = default;
50+
GraphHelper(const GraphHelper&) = delete;
51+
GraphHelper& operator=(const GraphHelper&) = delete;
52+
GraphHelper(GraphHelper&& gh) : graph(std::move(gh.graph)), outStreamObservers(std::move(gh.outStreamObservers)), currentTimestamp(gh.currentTimestamp) {}
53+
GraphHelper& operator=(GraphHelper&& gh) = default;
54+
};
3455
// we need to keep Graph alive during MP reload hence shared_ptr
35-
class GraphQueue : public Queue<std::shared_ptr<::mediapipe::CalculatorGraph>> {
56+
//class GraphQueue : public Queue<std::shared_ptr<::mediapipe::CalculatorGraph>> {
57+
class GraphQueue : public Queue<std::shared_ptr<GraphHelper>> {
58+
std::shared_ptr<PythonNodeResourcesMap> pythonNodeResourcesMap;
59+
std::shared_ptr<GenAiServableMap> genAiServableMap;
60+
3661
public:
37-
GraphQueue(const ::mediapipe::CalculatorGraphConfig& config, int streamsLength) :
38-
Queue(streamsLength) {
39-
SPDLOG_ERROR("ER Constr graph queue:{}", (void*)this);
40-
inferRequests.reserve(streamsLength);
41-
for (auto i = 0; i < streamsLength; ++i) {
42-
inferRequests.emplace_back(std::make_shared<::mediapipe::CalculatorGraph>());
43-
std::ignore = inferRequests.back()->Initialize(config); // TODO FIXME
44-
}
45-
}
46-
~GraphQueue() {
47-
SPDLOG_ERROR("ER Destroy graph queue:{}", (void*)this);
48-
}
62+
GraphQueue(const ::mediapipe::CalculatorGraphConfig& config, std::shared_ptr<PythonNodeResourcesMap> pythonNodeResourcesMap, std::shared_ptr<GenAiServableMap> genAiServableMap, int streamsLength);
63+
~GraphQueue();
4964
};
5065

5166
struct GraphIdGuard {
5267
std::weak_ptr<GraphQueue> weakQueue;
5368
const int id;
69+
std::shared_ptr<GraphHelper> gh;
70+
// TODO FIXME shared_ptr
5471
::mediapipe::CalculatorGraph& graph;
5572
GraphIdGuard(std::shared_ptr<GraphQueue>& queue) :
5673
weakQueue(queue),
5774
id(queue->getIdleStream().get()),
58-
graph(*(queue->getInferRequest(id).get())) {}
75+
gh((queue->getInferRequest(id))),
76+
graph(*gh->graph) {
77+
SPDLOG_ERROR("ER Guard construct this:{}", (void*)this);
78+
}
5979
GraphIdGuard(GraphIdGuard&&) = default;
6080
GraphIdGuard(const GraphIdGuard&) = delete;
6181
~GraphIdGuard() {
@@ -64,6 +84,7 @@ struct GraphIdGuard {
6484
if (existingQueue)
6585
existingQueue->returnStream(this->id);
6686
SPDLOG_ERROR("ER Destroy Guard end qu:{}", (void*)existingQueue.get());
87+
SPDLOG_ERROR("ER Guard destroy this:{}", (void*)this);
6788
}
6889
};
6990
} // namespace ovms

0 commit comments

Comments
 (0)