Skip to content

Add simplified live system demo #2486

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 15 commits into
base: develop
Choose a base branch
from
Draft
45 changes: 45 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 0 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,17 @@ homepage = "https://nautilustrader.io"

[workspace.dependencies]
nautilus-analysis = { path = "crates/analysis", version = "0.47.0" }
nautilus-backtest = { path = "crates/backtest", version = "0.47.0" }
nautilus-cli = { path = "crates/cli", version = "0.47.0" }
nautilus-common = { path = "crates/common", version = "0.47.0" }
nautilus-core = { path = "crates/core", version = "0.47.0" }
nautilus-cryptography = { path = "crates/cryptography", version = "0.47.0" }
nautilus-data = { path = "crates/data", version = "0.47.0" }
nautilus-execution = { path = "crates/execution", version = "0.47.0" }
nautilus-indicators = { path = "crates/indicators", version = "0.47.0" }
nautilus-infrastructure = { path = "crates/infrastructure", version = "0.47.0" }
nautilus-live = { path = "crates/live", version = "0.47.0" }
nautilus-model = { path = "crates/model", version = "0.47.0" }
nautilus-network = { path = "crates/network", version = "0.47.0" }
nautilus-persistence = { path = "crates/persistence", version = "0.47.0" }
nautilus-portfolio = { path = "crates/portfolio", version = "0.47.0" }
nautilus-pyo3 = { path = "crates/pyo3", version = "0.47.0" }
nautilus-risk = { path = "crates/risk", version = "0.47.0" }
nautilus-serialization = { path = "crates/serialization", version = "0.47.0" }
nautilus-system = { path = "crates/system", version = "0.47.0" }
Expand All @@ -76,7 +72,6 @@ base64 = "0.22.1"
bytes = { version = "1.10.1", features = ["serde"] }
chrono = { version = "0.4.40", features = ["serde"] }
chrono-tz = "0.10.3"
deranged = "=0.4.0"
derive_builder = "0.20.2"
enum_dispatch = "0.3.13"
futures = "0.3.31"
Expand Down
88 changes: 88 additions & 0 deletions crates/adapters/demo/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
[package]
name = "nautilus-demo"
readme = "README.md"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
authors.workspace = true
license.workspace = true
description.workspace = true
categories.workspace = true
keywords.workspace = true
documentation.workspace = true
repository.workspace = true
homepage.workspace = true

[lib]
name = "nautilus_demo"
crate-type = ["rlib", "cdylib"]

[features]
default = []
extension-module = [
"pyo3/extension-module",
"nautilus-core/extension-module",
"nautilus-model/extension-module",
"nautilus-serialization/extension-module",
"nautilus-network/extension-module",
"nautilus-common/extension-module",
"nautilus-data/extension-module",
]
python = [
"pyo3",
"pyo3-async-runtimes",
"nautilus-core/python",
"nautilus-model/python",
"nautilus-network/python",
"nautilus-common/python",
"nautilus-serialization/python",
"nautilus-data/python",
]

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[dependencies]
nautilus-network = { workspace = true }
nautilus-common = { workspace = true }
nautilus-core = { workspace = true }
nautilus-model = { workspace = true }
nautilus-serialization = { workspace = true }
nautilus-data = { workspace = true }

anyhow = { workspace = true }
arrow = { workspace = true }
chrono = { workspace = true }
derive_builder = { workspace = true }
futures-util = { workspace = true }
heck = { workspace = true }
pyo3 = { workspace = true, optional = true }
pyo3-async-runtimes = { workspace = true, optional = true }
reqwest = { workspace = true }
rust_decimal = { workspace = true }
rust_decimal_macros = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
strum = { workspace = true }
thiserror = { workspace = true }
thousands = { workspace = true }
tokio = { workspace = true }
tokio-tungstenite = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
ustr = { workspace = true }
uuid = { workspace = true }
csv = "1.3.1"
flate2 = "1.1.1"
urlencoding = "2.1.3"
futures.workspace = true
axum.workspace = true
tokio-stream = "0.1.17"
log.workspace = true

[dev-dependencies]
nautilus-testkit = { workspace = true }
criterion = { workspace = true }
rstest = { workspace = true }
tracing-test = { workspace = true }
145 changes: 145 additions & 0 deletions crates/adapters/demo/src/big_brain_actor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
use nautilus_common::actor::Actor;
use nautilus_common::actor::registry::get_actor_unchecked;
use nautilus_common::messages::data::{
DataCommand, RequestCommand, RequestData, SubscribeCommand, SubscribeData, UnsubscribeCommand,
UnsubscribeData,
};
use nautilus_common::msgbus::handler::TypedMessageHandler;
use nautilus_common::msgbus::handler::{MessageHandler, ShareableMessageHandler};
use nautilus_common::msgbus::send;
use nautilus_common::msgbus::{register, register_response_handler};
use nautilus_core::{UUID4, UnixNanos};
use nautilus_model::data::DataType;
use nautilus_model::identifiers::ClientId;
use std::any::Any;
use std::rc::Rc;
use ustr::Ustr;

/// Big brain actor receives positive and negative streams of numbers
///
/// The negative drives the positive stream and the postive after reaching
/// 10 issues a stop command. The actor should ideally behave like this
///
/// -1 -> get request
/// 1
/// -2 -> get request
/// 2
/// -3 -> skip request
/// 7 -> skip command
/// -8 -> get request
/// 8 -> stop command
pub struct BigBrainActor {
pub pos_val: i32,
pub neg_val: i32,
}

impl Default for BigBrainActor {
fn default() -> Self {
Self::new()
}
}

impl BigBrainActor {
pub fn new() -> Self {
Self {
pos_val: 0,
neg_val: 0,
}
}

pub fn register_message_handlers() {
let handler = TypedMessageHandler::from(negative_handler);
let handler = ShareableMessageHandler::from(Rc::new(handler) as Rc<dyn MessageHandler>);
register("negative_stream", handler);
}
}

impl Actor for BigBrainActor {
fn id(&self) -> Ustr {
Ustr::from("big_brain_actor")
}

fn handle(&mut self, msg: &dyn Any) {

Check failure on line 62 in crates/adapters/demo/src/big_brain_actor.rs

View workflow job for this annotation

GitHub Actions / pre-commit

unused variable: `msg`
todo!()
}

fn as_any(&self) -> &dyn Any {
self
}
}

/// Negative integer stream handler
///
/// It prints each positive number it receives. For each negative number
/// it makes requests a positive number. When negative number is equal to -3
/// it issues a skipped positive number request instead.
pub fn negative_handler(msg: &i32) {
let actor_id = Ustr::from("big_brain_actor");
let big_brain_actor = get_actor_unchecked::<BigBrainActor>(&actor_id);
big_brain_actor.neg_val = *msg;

println!("Received negative value: {}", big_brain_actor.neg_val);

let correlation_id = UUID4::new();
let handler = TypedMessageHandler::from(positive_handler);
let handler = ShareableMessageHandler::from(Rc::new(handler) as Rc<dyn MessageHandler>);
register_response_handler(&correlation_id, handler);

let data_type = if big_brain_actor.neg_val == -3 {
DataType::new("skip", None)
} else {
DataType::new("get", None)
};

let request = RequestData {
client_id: ClientId::new("mock_data_client"),
data_type,
request_id: correlation_id,
ts_init: UnixNanos::new(0),
params: None,
};
let cmd = DataCommand::Request(RequestCommand::Data(request));

send(&Ustr::from("data_engine"), &cmd);
}

/// Positive integer stream handler
///
/// It prints each positive number it receives. When the positive value
/// exceeds 3, it issues a skip command for the negative stream. When it exceeds
/// 8 it issues a stop command for the negative stream
pub fn positive_handler(msg: &i32) {
let actor_id = Ustr::from("big_brain_actor");
let big_brain_actor = get_actor_unchecked::<BigBrainActor>(&actor_id);
big_brain_actor.pos_val = *msg;

println!("Received positive value: {}", big_brain_actor.pos_val);

let data_type = DataType::new("blah", None);

if big_brain_actor.pos_val == 3 {
let data = SubscribeData::new(
Some(ClientId::new("mock_data_client")),
None,
data_type.clone(),
UUID4::new(),
UnixNanos::new(0),
None,
);
let cmd = DataCommand::Subscribe(SubscribeCommand::Data(data));
send(&Ustr::from("data_engine"), &cmd);
}

if big_brain_actor.pos_val > 8 {
let data = UnsubscribeData::new(
Some(ClientId::new("mock_data_client")),
None,
data_type,
UUID4::new(),
UnixNanos::new(0),
None,
);
let cmd = DataCommand::Unsubscribe(UnsubscribeCommand::Data(data));
send(&Ustr::from("data_engine"), &cmd);
}
}
36 changes: 36 additions & 0 deletions crates/adapters/demo/src/bin/network_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use std::cell::UnsafeCell;
use std::rc::Rc;

use nautilus_common::actor::registry::register_actor;
use nautilus_common::testing::init_logger_for_testing;
use nautilus_demo::LiveRunner;
use nautilus_demo::big_brain_actor::BigBrainActor;
use nautilus_demo::http_server::start_positive_stream_http_server;
use nautilus_demo::init_data_engine;
use nautilus_demo::websocket_server::NegativeStreamServer;

async fn main_logic() {
let http_address = start_positive_stream_http_server().await.unwrap();
let websocket_server = NegativeStreamServer::setup().await;

// Initialize data client with http and websocket streams
let (http_stream, websocket_stream) =
init_data_engine(http_address, websocket_server.address).await;

// Initialize big brain actor
let big_brain_actor = BigBrainActor::new();
let big_brain_actor = Rc::new(UnsafeCell::new(big_brain_actor));
register_actor(big_brain_actor);
BigBrainActor::register_message_handlers();

let mut runner = LiveRunner::default();
runner.new_add_data_response_stream(http_stream);
runner.new_message_stream(websocket_stream);
runner.run().await;
}

pub fn main() {
init_logger_for_testing(None).unwrap();
let runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(main_logic());
}
Loading
Loading