diff --git a/Cargo.lock b/Cargo.lock index 25e03dd8fe4..79afbc72de3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3183,6 +3183,51 @@ dependencies = [ "ustr", ] +[[package]] +name = "nautilus-demo" +version = "0.47.0" +dependencies = [ + "anyhow", + "arrow", + "axum", + "chrono", + "criterion", + "csv", + "derive_builder", + "flate2", + "futures", + "futures-util", + "heck 0.5.0", + "log", + "nautilus-common", + "nautilus-core", + "nautilus-data", + "nautilus-model", + "nautilus-network", + "nautilus-serialization", + "nautilus-testkit", + "pyo3", + "pyo3-async-runtimes", + "reqwest", + "rstest", + "rust_decimal", + "rust_decimal_macros", + "serde", + "serde_json", + "strum", + "thiserror 2.0.12", + "thousands", + "tokio", + "tokio-stream", + "tokio-tungstenite", + "tracing", + "tracing-subscriber", + "tracing-test", + "urlencoding", + "ustr", + "uuid", +] + [[package]] name = "nautilus-execution" version = "0.47.0" diff --git a/Cargo.toml b/Cargo.toml index 83661616830..79e61fbe094 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,8 +41,6 @@ homepage = "https://nautilustrader.io" [workspace.dependencies] nautilus-analysis = { path = "crates/analysis", version = "0.47.0" } -nautilus-backtest = { path = "crates/backtest", version = "0.47.0" } -nautilus-cli = { path = "crates/cli", version = "0.47.0" } nautilus-common = { path = "crates/common", version = "0.47.0" } nautilus-core = { path = "crates/core", version = "0.47.0" } nautilus-cryptography = { path = "crates/cryptography", version = "0.47.0" } @@ -50,12 +48,10 @@ nautilus-data = { path = "crates/data", version = "0.47.0" } nautilus-execution = { path = "crates/execution", version = "0.47.0" } nautilus-indicators = { path = "crates/indicators", version = "0.47.0" } nautilus-infrastructure = { path = "crates/infrastructure", version = "0.47.0" } -nautilus-live = { path = "crates/live", version = "0.47.0" } nautilus-model = { path = "crates/model", version = "0.47.0" } nautilus-network = { path = "crates/network", version = "0.47.0" } nautilus-persistence = { path = "crates/persistence", version = "0.47.0" } nautilus-portfolio = { path = "crates/portfolio", version = "0.47.0" } -nautilus-pyo3 = { path = "crates/pyo3", version = "0.47.0" } nautilus-risk = { path = "crates/risk", version = "0.47.0" } nautilus-serialization = { path = "crates/serialization", version = "0.47.0" } nautilus-system = { path = "crates/system", version = "0.47.0" } @@ -76,7 +72,6 @@ base64 = "0.22.1" bytes = { version = "1.10.1", features = ["serde"] } chrono = { version = "0.4.40", features = ["serde"] } chrono-tz = "0.10.3" -deranged = "=0.4.0" derive_builder = "0.20.2" enum_dispatch = "0.3.13" futures = "0.3.31" diff --git a/crates/adapters/demo/Cargo.toml b/crates/adapters/demo/Cargo.toml new file mode 100644 index 00000000000..97ea8c7ec55 --- /dev/null +++ b/crates/adapters/demo/Cargo.toml @@ -0,0 +1,88 @@ +[package] +name = "nautilus-demo" +readme = "README.md" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +authors.workspace = true +license.workspace = true +description.workspace = true +categories.workspace = true +keywords.workspace = true +documentation.workspace = true +repository.workspace = true +homepage.workspace = true + +[lib] +name = "nautilus_demo" +crate-type = ["rlib", "cdylib"] + +[features] +default = [] +extension-module = [ + "pyo3/extension-module", + "nautilus-core/extension-module", + "nautilus-model/extension-module", + "nautilus-serialization/extension-module", + "nautilus-network/extension-module", + "nautilus-common/extension-module", + "nautilus-data/extension-module", +] +python = [ + "pyo3", + "pyo3-async-runtimes", + "nautilus-core/python", + "nautilus-model/python", + "nautilus-network/python", + "nautilus-common/python", + "nautilus-serialization/python", + "nautilus-data/python", +] + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + +[dependencies] +nautilus-network = { workspace = true } +nautilus-common = { workspace = true } +nautilus-core = { workspace = true } +nautilus-model = { workspace = true } +nautilus-serialization = { workspace = true } +nautilus-data = { workspace = true } + +anyhow = { workspace = true } +arrow = { workspace = true } +chrono = { workspace = true } +derive_builder = { workspace = true } +futures-util = { workspace = true } +heck = { workspace = true } +pyo3 = { workspace = true, optional = true } +pyo3-async-runtimes = { workspace = true, optional = true } +reqwest = { workspace = true } +rust_decimal = { workspace = true } +rust_decimal_macros = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +strum = { workspace = true } +thiserror = { workspace = true } +thousands = { workspace = true } +tokio = { workspace = true } +tokio-tungstenite = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +ustr = { workspace = true } +uuid = { workspace = true } +csv = "1.3.1" +flate2 = "1.1.1" +urlencoding = "2.1.3" +futures.workspace = true +axum.workspace = true +tokio-stream = "0.1.17" +log.workspace = true + +[dev-dependencies] +nautilus-testkit = { workspace = true } +criterion = { workspace = true } +rstest = { workspace = true } +tracing-test = { workspace = true } diff --git a/crates/adapters/demo/src/big_brain_actor.rs b/crates/adapters/demo/src/big_brain_actor.rs new file mode 100644 index 00000000000..3f681136394 --- /dev/null +++ b/crates/adapters/demo/src/big_brain_actor.rs @@ -0,0 +1,145 @@ +use nautilus_common::actor::Actor; +use nautilus_common::actor::registry::get_actor_unchecked; +use nautilus_common::messages::data::{ + DataCommand, RequestCommand, RequestData, SubscribeCommand, SubscribeData, UnsubscribeCommand, + UnsubscribeData, +}; +use nautilus_common::msgbus::handler::TypedMessageHandler; +use nautilus_common::msgbus::handler::{MessageHandler, ShareableMessageHandler}; +use nautilus_common::msgbus::send; +use nautilus_common::msgbus::{register, register_response_handler}; +use nautilus_core::{UUID4, UnixNanos}; +use nautilus_model::data::DataType; +use nautilus_model::identifiers::ClientId; +use std::any::Any; +use std::rc::Rc; +use ustr::Ustr; + +/// Big brain actor receives positive and negative streams of numbers +/// +/// The negative drives the positive stream and the postive after reaching +/// 10 issues a stop command. The actor should ideally behave like this +/// +/// -1 -> get request +/// 1 +/// -2 -> get request +/// 2 +/// -3 -> skip request +/// 7 -> skip command +/// -8 -> get request +/// 8 -> stop command +pub struct BigBrainActor { + pub pos_val: i32, + pub neg_val: i32, +} + +impl Default for BigBrainActor { + fn default() -> Self { + Self::new() + } +} + +impl BigBrainActor { + pub fn new() -> Self { + Self { + pos_val: 0, + neg_val: 0, + } + } + + pub fn register_message_handlers() { + let handler = TypedMessageHandler::from(negative_handler); + let handler = ShareableMessageHandler::from(Rc::new(handler) as Rc); + register("negative_stream", handler); + } +} + +impl Actor for BigBrainActor { + fn id(&self) -> Ustr { + Ustr::from("big_brain_actor") + } + + fn handle(&mut self, msg: &dyn Any) { + todo!() + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +/// Negative integer stream handler +/// +/// It prints each positive number it receives. For each negative number +/// it makes requests a positive number. When negative number is equal to -3 +/// it issues a skipped positive number request instead. +pub fn negative_handler(msg: &i32) { + let actor_id = Ustr::from("big_brain_actor"); + let big_brain_actor = get_actor_unchecked::(&actor_id); + big_brain_actor.neg_val = *msg; + + println!("Received negative value: {}", big_brain_actor.neg_val); + + let correlation_id = UUID4::new(); + let handler = TypedMessageHandler::from(positive_handler); + let handler = ShareableMessageHandler::from(Rc::new(handler) as Rc); + register_response_handler(&correlation_id, handler); + + let data_type = if big_brain_actor.neg_val == -3 { + DataType::new("skip", None) + } else { + DataType::new("get", None) + }; + + let request = RequestData { + client_id: ClientId::new("mock_data_client"), + data_type, + request_id: correlation_id, + ts_init: UnixNanos::new(0), + params: None, + }; + let cmd = DataCommand::Request(RequestCommand::Data(request)); + + send(&Ustr::from("data_engine"), &cmd); +} + +/// Positive integer stream handler +/// +/// It prints each positive number it receives. When the positive value +/// exceeds 3, it issues a skip command for the negative stream. When it exceeds +/// 8 it issues a stop command for the negative stream +pub fn positive_handler(msg: &i32) { + let actor_id = Ustr::from("big_brain_actor"); + let big_brain_actor = get_actor_unchecked::(&actor_id); + big_brain_actor.pos_val = *msg; + + println!("Received positive value: {}", big_brain_actor.pos_val); + + let data_type = DataType::new("blah", None); + + if big_brain_actor.pos_val == 3 { + let data = SubscribeData::new( + Some(ClientId::new("mock_data_client")), + None, + data_type.clone(), + UUID4::new(), + UnixNanos::new(0), + None, + ); + let cmd = DataCommand::Subscribe(SubscribeCommand::Data(data)); + send(&Ustr::from("data_engine"), &cmd); + } + + if big_brain_actor.pos_val > 8 { + let data = UnsubscribeData::new( + Some(ClientId::new("mock_data_client")), + None, + data_type, + UUID4::new(), + UnixNanos::new(0), + None, + ); + let cmd = DataCommand::Unsubscribe(UnsubscribeCommand::Data(data)); + send(&Ustr::from("data_engine"), &cmd); + } +} diff --git a/crates/adapters/demo/src/bin/network_stream.rs b/crates/adapters/demo/src/bin/network_stream.rs new file mode 100644 index 00000000000..9a3299cd96f --- /dev/null +++ b/crates/adapters/demo/src/bin/network_stream.rs @@ -0,0 +1,36 @@ +use std::cell::UnsafeCell; +use std::rc::Rc; + +use nautilus_common::actor::registry::register_actor; +use nautilus_common::testing::init_logger_for_testing; +use nautilus_demo::LiveRunner; +use nautilus_demo::big_brain_actor::BigBrainActor; +use nautilus_demo::http_server::start_positive_stream_http_server; +use nautilus_demo::init_data_engine; +use nautilus_demo::websocket_server::NegativeStreamServer; + +async fn main_logic() { + let http_address = start_positive_stream_http_server().await.unwrap(); + let websocket_server = NegativeStreamServer::setup().await; + + // Initialize data client with http and websocket streams + let (http_stream, websocket_stream) = + init_data_engine(http_address, websocket_server.address).await; + + // Initialize big brain actor + let big_brain_actor = BigBrainActor::new(); + let big_brain_actor = Rc::new(UnsafeCell::new(big_brain_actor)); + register_actor(big_brain_actor); + BigBrainActor::register_message_handlers(); + + let mut runner = LiveRunner::default(); + runner.new_add_data_response_stream(http_stream); + runner.new_message_stream(websocket_stream); + runner.run().await; +} + +pub fn main() { + init_logger_for_testing(None).unwrap(); + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(main_logic()); +} diff --git a/crates/adapters/demo/src/data_client.rs b/crates/adapters/demo/src/data_client.rs new file mode 100644 index 00000000000..422de9ffacf --- /dev/null +++ b/crates/adapters/demo/src/data_client.rs @@ -0,0 +1,209 @@ +use nautilus_common::messages::data::{self, DataResponse, RequestData}; +use nautilus_common::runtime; +use nautilus_core::UnixNanos; +use nautilus_data::client::DataClient; +use nautilus_model::data::DataType; +use nautilus_model::identifiers::{ClientId, Venue}; +use nautilus_network::http::HttpClient; +use nautilus_network::websocket::{Consumer, WebSocketClient, WebSocketConfig}; +use reqwest::Method; +use std::net::SocketAddr; +use std::sync::Arc; +use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream}; + +pub struct MockDataClient { + http_address: SocketAddr, + http_client: HttpClient, + websocket_client: Arc, + http_tx: tokio::sync::mpsc::UnboundedSender, +} + +impl MockDataClient { + pub async fn start( + http_address: SocketAddr, + websocket_address: SocketAddr, + ) -> ( + Self, + tokio_stream::wrappers::UnboundedReceiverStream, + tokio_stream::wrappers::ReceiverStream, + ) { + // Create HTTP client with default settings + let http_client = HttpClient::new( + std::collections::HashMap::new(), // empty headers + Vec::new(), // no header keys + Vec::new(), // no keyed quotas + None, // no default quota + Some(5), // 30 second timeout + ); + + println!( + "Started mock data client with HTTP endpoint: {:?}", + http_address + ); + println!("WebSocket endpoint: {:?}", websocket_address); + + let (tx, rx) = tokio::sync::mpsc::channel(100); + let (http_tx, http_rx) = tokio::sync::mpsc::unbounded_channel(); + + let config = WebSocketConfig { + url: format!("ws://{}", websocket_address).to_string(), + headers: vec![], + handler: Consumer::Rust(tx), + heartbeat: None, + heartbeat_msg: None, + ping_handler: None, + reconnect_timeout_ms: None, + reconnect_delay_initial_ms: None, + reconnect_delay_max_ms: None, + reconnect_backoff_factor: None, + reconnect_jitter_ms: None, + }; + + let websocket_client = WebSocketClient::connect(config, None, None, None, Vec::new(), None) + .await + .unwrap(); + + let http_stream = UnboundedReceiverStream::new(http_rx); + let websocket_stream = ReceiverStream::new(rx); + + ( + Self { + http_address, + http_client, + http_tx, + websocket_client: Arc::new(websocket_client), + }, + http_stream, + websocket_stream, + ) + } + + fn get_request(&self, req: &RequestData) { + let req = req.clone(); + let http_client = self.http_client.clone(); + let http_tx = self.http_tx.clone(); + let http_address = self.http_address.clone(); + runtime::get_runtime().spawn(async move { + let response = http_client + .request( + Method::GET, + format!("http://{}/get", http_address), + None, + None, + None, + None, + ) + .await + .unwrap(); + + let value = String::from_utf8(response.body.to_vec()) + .unwrap() + .parse::() + .unwrap(); + println!("Received positive value: {}", value); + let response = DataResponse::new( + req.request_id, + req.client_id, + Venue::new("http positive stream"), + DataType::new("positive_stream", None), + value, + UnixNanos::new(0), + None, + ); + http_tx.send(response).unwrap(); + }); + } + + fn skip_request(&self, req: &RequestData) { + let req = req.clone(); + let http_client = self.http_client.clone(); + let http_tx = self.http_tx.clone(); + let http_address = self.http_address.clone(); + runtime::get_runtime().spawn(async move { + let response = http_client + .request( + Method::GET, + format!("http://{}/skip", http_address), + None, + None, + None, + None, + ) + .await + .unwrap(); + + let value = String::from_utf8(response.body.to_vec()) + .unwrap() + .parse::() + .unwrap(); + println!("Received positive value: {}", value); + + let response = DataResponse::new( + req.request_id, + req.client_id, + Venue::new("http positive stream"), + DataType::new("positive_stream", None), + value, + UnixNanos::new(0), + None, + ); + http_tx.send(response).unwrap(); + }); + } +} + +impl DataClient for MockDataClient { + fn client_id(&self) -> nautilus_model::identifiers::ClientId { + ClientId::new("mock_data_client") + } + + fn request_data(&self, request: RequestData) -> anyhow::Result<()> { + if request.data_type.type_name() == "get" { + println!("Received get data request"); + self.get_request(&request); + } else if request.data_type.type_name() == "skip" { + println!("Received skip data request"); + self.skip_request(&request); + } + + Ok(()) + } + + fn subscribe(&mut self, cmd: data::SubscribeData) -> anyhow::Result<()> { + println!("Received subscribe command"); + let websocket_client = self.websocket_client.clone(); + runtime::get_runtime().spawn(async move { + websocket_client.send_text("SKIP".to_string(), None).await; + }); + Ok(()) + } + + fn unsubscribe(&mut self, cmd: data::UnsubscribeData) -> anyhow::Result<()> { + println!("Received unsubscribe command"); + let websocket_client = self.websocket_client.clone(); + runtime::get_runtime().spawn(async move { + websocket_client.send_text("STOP".to_string(), None).await; + }); + Ok(()) + } + + fn venue(&self) -> Option { + None + } + + fn start(&self) {} + + fn stop(&self) {} + + fn reset(&self) {} + + fn dispose(&self) {} + + fn is_connected(&self) -> bool { + true + } + + fn is_disconnected(&self) -> bool { + false + } +} diff --git a/crates/adapters/demo/src/http_server.rs b/crates/adapters/demo/src/http_server.rs new file mode 100644 index 00000000000..8c177998ab9 --- /dev/null +++ b/crates/adapters/demo/src/http_server.rs @@ -0,0 +1,58 @@ +use std::net::{SocketAddr, TcpListener}; + +use axum::{Router, routing::get, serve}; + +fn get_unique_port() -> u16 { + // Create a temporary TcpListener to get an available port + let listener = TcpListener::bind("127.0.0.1:0").expect("Failed to bind temporary TcpListener"); + let port = listener.local_addr().unwrap().port(); + + // Close the listener to free up the port + drop(listener); + + port +} + +pub async fn start_positive_stream_http_server() +-> Result> { + let port = get_unique_port(); + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{port}")) + .await + .unwrap(); + let addr = listener.local_addr().unwrap(); + + tokio::spawn(async move { + serve(listener, create_positive_stream_router()) + .await + .unwrap(); + }); + + Ok(addr) +} + +fn create_positive_stream_router() -> Router { + // Create a counter state that will be shared across requests + let counter = std::sync::Arc::new(std::sync::atomic::AtomicI32::new(0)); + + // Clone the counter for the handler + let counter_clone = counter.clone(); + let counter_clone_2 = counter.clone(); + + Router::new() + .route( + "/get", + get(async move || { + // Increment the counter and return the new value + let value = counter_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + format!("{}", value) + }), + ) + .route( + "/skip", + get(async move || { + // Increment the counter and return the new value + let value = counter_clone_2.fetch_add(5, std::sync::atomic::Ordering::SeqCst); + format!("{}", value) + }), + ) +} diff --git a/crates/adapters/demo/src/lib.rs b/crates/adapters/demo/src/lib.rs new file mode 100644 index 00000000000..4905542f33e --- /dev/null +++ b/crates/adapters/demo/src/lib.rs @@ -0,0 +1,104 @@ +use std::{cell::RefCell, net::SocketAddr, rc::Rc}; +use tokio_stream::StreamExt; + +use data_client::MockDataClient; +use futures::stream::SelectAll; +use nautilus_common::{ + cache::Cache, + clock::{Clock, LiveClock}, + messages::data::DataResponse, + msgbus::{ + handler::{MessageHandler, ShareableMessageHandler}, + register, + }, +}; +use nautilus_data::{ + client::{DataClient, DataClientAdapter}, + engine::{DataEngine, SubscriptionCommandHandler}, +}; +use nautilus_model::identifiers::Venue; +use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream}; +use tokio_tungstenite::tungstenite::Message; +use ustr::Ustr; + +pub mod big_brain_actor; +pub mod data_client; +pub mod http_server; +pub mod websocket_server; + +pub async fn init_data_engine( + http_address: SocketAddr, + websocket_address: SocketAddr, +) -> ( + tokio_stream::wrappers::UnboundedReceiverStream, + tokio_stream::wrappers::ReceiverStream, +) { + let (client, http_stream, websocket_stream) = + MockDataClient::start(http_address, websocket_address).await; + let client: Box = Box::new(client); + let clock: Rc> = Rc::new(RefCell::new(LiveClock::new())); + + let adapter = DataClientAdapter::new( + client.client_id(), + Venue::from_str_unchecked("yooohooo"), + false, + false, + client, + clock.clone(), + ); + let cache = Rc::new(RefCell::new(Cache::new(None, None))); + + let mut engine = DataEngine::new(clock, cache, None); + engine.register_client(adapter, None); + + let engine = Rc::new(RefCell::new(engine)); + let handler = SubscriptionCommandHandler { + id: Ustr::from("data_engine_handler"), + engine_ref: engine.clone(), + }; + + let handler = ShareableMessageHandler::from(Rc::new(handler) as Rc); + register("data_engine", handler); + + (http_stream, websocket_stream) +} + +#[derive(Default)] +pub struct LiveRunner { + data_response_stream: SelectAll>, + message_stream: SelectAll>, +} + +impl LiveRunner { + pub fn new_add_data_response_stream(&mut self, stream: UnboundedReceiverStream) { + self.data_response_stream.push(stream); + } + + pub fn new_message_stream(&mut self, stream: ReceiverStream) { + self.message_stream.push(stream); + } + + pub async fn run(&mut self) { + loop { + // TODO: push decoding logic into data client + tokio::select! { + data_response = self.data_response_stream.next() => { + if let Some(data_response) = data_response { + println!("Received data response: {:?}", data_response); + let value = data_response.data.downcast_ref::().copied().unwrap(); + nautilus_common::msgbus::response(&data_response.correlation_id, &value); + } + } + message = self.message_stream.next() => { + if let Some(message) = message { + if let Message::Text(text) = message { + println!("Received text message: {}", text); + let data = text.parse::().unwrap(); + nautilus_common::msgbus::send(&Ustr::from("negative_stream"), &data); + } + } + } + } + } + } +} diff --git a/crates/adapters/demo/src/websocket_server.rs b/crates/adapters/demo/src/websocket_server.rs new file mode 100644 index 00000000000..d07c2ce3edf --- /dev/null +++ b/crates/adapters/demo/src/websocket_server.rs @@ -0,0 +1,77 @@ +use futures::SinkExt; +use futures::StreamExt; +use std::net::SocketAddr; +use tokio::task; +use tokio::time::Duration; + +pub struct NegativeStreamServer { + task: tokio::task::JoinHandle<()>, + port: u16, + pub address: SocketAddr, +} + +impl NegativeStreamServer { + pub async fn setup() -> Self { + let server = tokio::net::TcpListener::bind("127.0.0.1:0".to_string()) + .await + .unwrap(); + let port = server.local_addr().unwrap().port(); + let address = server.local_addr().unwrap(); + + let task = task::spawn(async move { + let (conn, _) = server.accept().await.unwrap(); + let websocket = tokio_tungstenite::accept_async(conn).await.unwrap(); + let (mut sender, mut receiver) = websocket.split(); + + // Create a counter for negative values + let counter = std::sync::Arc::new(std::sync::atomic::AtomicI32::new(0)); + let counter_clone = counter.clone(); + let counter_clone_2 = counter.clone(); + + // Task to send negative numbers every second + let sender_task = task::spawn(async move { + loop { + let value = counter_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let message = tokio_tungstenite::tungstenite::protocol::Message::Text( + format!("{}", -value).into(), + ); + + if let Err(err) = sender.send(message).await { + eprintln!("Error sending message: {}", err); + break; + } + + tokio::time::sleep(Duration::from_secs(1)).await; + } + }); + + // Task to handle incoming messages + task::spawn(async move { + while let Some(Ok(msg)) = receiver.next().await { + if let tokio_tungstenite::tungstenite::protocol::Message::Text(txt) = msg { + if txt == "SKIP" { + counter_clone_2.fetch_add(5, std::sync::atomic::Ordering::SeqCst); + } else if txt == "STOP" { + break; + } + } + } + + // Cancel the sender task when we're done + sender_task.abort(); + }); + }); + + Self { + task, + address, + port, + } + } +} + +impl Drop for NegativeStreamServer { + fn drop(&mut self) { + self.task.abort(); + } +} diff --git a/crates/common/src/msgbus/handler.rs b/crates/common/src/msgbus/handler.rs index 2d0746368c7..d75bf5fbb73 100644 --- a/crates/common/src/msgbus/handler.rs +++ b/crates/common/src/msgbus/handler.rs @@ -74,6 +74,9 @@ impl MessageHandler for TypedMessageHandler() { (self.callback)(typed_msg); + } else { + // TODO: better error message + log::error!("Failed to downcast message") } } diff --git a/crates/common/src/msgbus/mod.rs b/crates/common/src/msgbus/mod.rs index 78758fc3f19..5a159dde391 100644 --- a/crates/common/src/msgbus/mod.rs +++ b/crates/common/src/msgbus/mod.rs @@ -102,6 +102,15 @@ pub fn response(correlation_id: &UUID4, message: &dyn Any) { } } +pub fn register_response_handler(correlation_id: &UUID4, handler: ShareableMessageHandler) { + if let Err(e) = get_message_bus() + .borrow_mut() + .register_response_handler(correlation_id, handler) + { + log::error!("Failed to register request handler: {e}"); + } +} + /// Publishes the `message` to the `topic`. pub fn publish(topic: &Ustr, message: &dyn Any) { log::trace!("Publishing topic '{topic}' {message:?}"); diff --git a/crates/data/src/engine/mod.rs b/crates/data/src/engine/mod.rs index 81e618a3c4d..4a92aea5f56 100644 --- a/crates/data/src/engine/mod.rs +++ b/crates/data/src/engine/mod.rs @@ -1046,6 +1046,7 @@ impl DataEngine { } } +// TODO: Rename to Data command handler pub struct SubscriptionCommandHandler { pub id: Ustr, pub engine_ref: Rc>, @@ -1057,7 +1058,14 @@ impl MessageHandler for SubscriptionCommandHandler { } fn handle(&self, msg: &dyn Any) { - self.engine_ref.borrow_mut().enqueue(msg); + println!("Received message"); + // TODO: Directly execute. Enqueueing changes order + // self.engine_ref.borrow_mut().enqueue(msg); + if let Some(cmd) = msg.downcast_ref::() { + self.engine_ref.borrow_mut().execute(cmd.clone()); + } else { + log::error!("Invalid message type received: {msg:?}"); + } } fn as_any(&self) -> &dyn Any {