Skip to content

Commit c6e7dfa

Browse files
author
rmqtt
committed
Add average rate and intermediate rate statistics
1 parent 7cbdc0d commit c6e7dfa

File tree

2 files changed

+126
-35
lines changed

2 files changed

+126
-35
lines changed

src/stats.rs

+106-27
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
use std::sync::atomic::{AtomicIsize, Ordering};
1+
use std::fmt;
2+
use std::fmt::Debug;
23

34
use dashmap::DashMap;
45
use once_cell::sync::OnceCell;
@@ -55,12 +56,23 @@ impl Stats {
5556

5657
pub fn to_string(&self) -> String {
5758
let stats = Stats::instance();
59+
5860
let conns = stats.conns.value();
5961
let conns_rate = stats.conns.rate();
62+
let conns_avg_rate = stats.conns.avg_rate();
63+
let conns_medi_rate = stats.conns.medi_rate();
64+
6065
let subs = stats.subs.value();
66+
let subs_rate = stats.subs.rate();
67+
let subs_avg_rate = stats.subs.avg_rate();
68+
let subs_medi_rate = stats.subs.medi_rate();
69+
6170
let conn_fails = stats.conn_fails.value();
6271
let recvs = stats.recvs.value();
6372
let recvs_rate = stats.recvs.rate();
73+
let recvs_avg_rate = stats.recvs.avg_rate();
74+
let recvs_medi_rate = stats.recvs.medi_rate();
75+
6476
let sends = stats.sends.value();
6577
let sends_rate = stats.sends.rate();
6678
let closeds = stats.closeds.value();
@@ -69,65 +81,132 @@ impl Stats {
6981
.iter()
7082
.map(|entry| (entry.key().clone(), entry.value().value()))
7183
.collect::<Vec<(String, isize)>>();
72-
format!("* Connecteds:{} {:0.2?}/s, conn_fails:{}, subs:{}, sends:{} {:0.2?}/s, recvs:{} {:0.2?}/s, closeds:{}, ifaddrs: {:?}, last err: {:?}",
73-
conns, conns_rate, conn_fails, subs, sends, sends_rate, recvs, recvs_rate, closeds, ifaddrs, stats.last_err.write().take())
84+
format!("* Connecteds:{} ({:0.2?}/s, {:0.2?}/s, {:0.2?}/s), conn_fails:{}, subs:{} ({:0.2?}/s, {:0.2?}/s, {:0.2?}/s), sends:{} {:0.2?}/s, recvs:{} ({:0.2?}/s, {:0.2?}/s, {:0.2?}/s), closeds:{}, ifaddrs: {:?}, last err: {:?}",
85+
conns, conns_rate, conns_avg_rate, conns_medi_rate, conn_fails, subs, subs_rate, subs_avg_rate,subs_medi_rate, sends, sends_rate, recvs, recvs_rate, recvs_avg_rate, recvs_medi_rate, closeds, ifaddrs, stats.last_err.write().take())
7486
}
7587
}
7688

77-
#[derive(Debug)]
78-
pub struct Counter(AtomicIsize, RwLock<DiscreteRateCounter>);
89+
pub type StartMillis = i64;
90+
pub type EndMillis = i64;
91+
92+
pub struct Counter{
93+
inner: RwLock<CounterInner>
94+
}
95+
96+
pub struct CounterInner{
97+
nums: isize,
98+
rate: DiscreteRateCounter,
99+
cost_times: Vec<(StartMillis, EndMillis)>,
100+
}
101+
102+
impl Debug for Counter {
103+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
104+
let inner = self.inner.read();
105+
write!(f, "(nums: {}, rate: {:?}, cost_times_len: {})", inner.nums, inner.rate.rate(), inner.cost_times.len())
106+
}
107+
}
79108

80109
impl Counter {
81110
#[inline]
82111
fn new() -> Self {
83-
Counter(
84-
AtomicIsize::new(0),
85-
RwLock::new(DiscreteRateCounter::new(10)),
86-
)
112+
Counter {
113+
inner: RwLock::new(
114+
CounterInner {
115+
nums: 0,
116+
rate: DiscreteRateCounter::new(100),
117+
cost_times: Vec::new()
118+
})
119+
}
87120
}
88121

89122
#[inline]
90123
pub fn inc(&self) {
91-
self.0.fetch_add(1, Ordering::SeqCst);
92-
self.1.write().update();
124+
let mut inner = self.inner.write();
125+
inner.nums += 1;
126+
inner.rate.update();
93127
}
94128

95129
#[inline]
130+
pub fn inc2(&self, start: StartMillis, end: EndMillis) {
131+
let mut inner = self.inner.write();
132+
inner.nums += 1;
133+
inner.rate.update();
134+
inner.cost_times.push((start, end));
135+
}
136+
96137
pub fn inc_limit(&self, limit: isize) -> bool {
97-
let res = self
98-
.0
99-
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
100-
if x < limit {
101-
Some(x + 1)
102-
} else {
103-
None
104-
}
105-
});
106-
if res.is_ok() {
107-
self.1.write().update();
138+
let mut inner = self.inner.write();
139+
if inner.nums < limit {
140+
inner.nums += 1;
141+
inner.rate.update();
108142
true
109-
} else {
143+
}else{
110144
false
111145
}
112146
}
113147

148+
#[inline]
149+
pub fn inc_limit_cost_times(&self, limit_cost_times: usize, start: StartMillis, end: EndMillis) {
150+
let mut inner = self.inner.write();
151+
inner.nums += 1;
152+
inner.rate.update();
153+
if inner.cost_times.len() > limit_cost_times {
154+
inner.cost_times.remove(0);
155+
156+
}
157+
inner.cost_times.push((start, end));
158+
}
159+
114160
#[inline]
115161
pub fn dec(&self) {
116-
self.0.fetch_sub(1, Ordering::SeqCst);
162+
self.inner.write().nums += 1;
117163
}
118164

119165
#[inline]
120166
pub fn decs(&self, v: isize) {
121-
self.0.fetch_sub(v, Ordering::SeqCst);
167+
self.inner.write().nums += v;
122168
}
123169

124170
#[inline]
125171
pub fn value(&self) -> isize {
126-
self.0.load(Ordering::SeqCst)
172+
self.inner.read().nums
127173
}
128174

129175
#[inline]
130176
pub fn rate(&self) -> f64 {
131-
self.1.read().rate()
177+
self.inner.read().rate.rate()
178+
}
179+
180+
#[inline]
181+
pub fn avg_rate(&self) -> f64 {
182+
let inner = self.inner.read();
183+
if inner.cost_times.len() > 1 {
184+
//average rate
185+
let (first, _) = inner.cost_times.first().unwrap();
186+
let (_, last) = inner.cost_times.last().unwrap();
187+
//println!("avg_rate inner.cost_times.len(): {}, {:?}, first: {}, last: {}", inner.cost_times.len(), ((last - first) as f64 / 1000.0), first, last);
188+
inner.cost_times.len() as f64 / ((last - first) as f64 / 1000.0)
189+
}else{
190+
1.0
191+
}
192+
193+
}
194+
195+
#[inline]
196+
pub fn medi_rate(&self) -> f64 {
197+
let inner = self.inner.read();
198+
if inner.cost_times.len() > 1 {
199+
//intermediate rate
200+
let len = inner.cost_times.len();
201+
let start_idx = (len as f64 * 0.15) as usize;
202+
let end_idx = len - start_idx - 1;
203+
let (first, _) = inner.cost_times[start_idx];
204+
let (_, last) = inner.cost_times[end_idx];
205+
//println!("medi_rate inner.cost_times.len(): {}, {:?}", end_idx - start_idx, ((last - first) as f64 / 1000.0));
206+
(end_idx - start_idx) as f64 / ((last - first) as f64 / 1000.0)
207+
}else{
208+
1.0
209+
}
210+
132211
}
133212
}

src/v3.rs

+20-8
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::borrow::Cow;
12
use futures::channel::mpsc;
23
use futures::SinkExt;
34
use futures::StreamExt;
@@ -17,6 +18,7 @@ use std::sync::Arc;
1718
use std::time::Duration;
1819
use tokio::task::spawn_local;
1920
use uuid::Uuid;
21+
use crate::stats::{EndMillis, StartMillis};
2022

2123
use super::connector::ConnectorFactory;
2224
use super::Stats;
@@ -139,8 +141,8 @@ impl Client {
139141
self.msg_tx.read().as_ref().cloned()
140142
}
141143

142-
fn subs_inc(&self) {
143-
Stats::instance().subs.inc();
144+
fn subs_inc(&self, start: StartMillis, end: EndMillis) {
145+
Stats::instance().subs.inc2(start, end);
144146
self.subs.fetch_add(1, Ordering::SeqCst);
145147
}
146148

@@ -233,15 +235,16 @@ impl Client {
233235
let connect_enable = mgr.on_connect.fire(client.clone());
234236

235237
if connect_enable {
238+
let start = chrono::Local::now().timestamp_millis();
236239
match builder.connect().await {
237240
Ok(c) => {
238241
mgr.on_connected.fire(client.clone());
239242

240243
let sink = c.sink();
241244

242245
client.set_sink(sink.clone());
243-
244-
Stats::instance().conns.inc();
246+
let end = chrono::Local::now().timestamp_millis();
247+
Stats::instance().conns.inc2(start, end);
245248

246249
//subscribe
247250
if opts.sub_switch {
@@ -306,21 +309,23 @@ impl Client {
306309
client_id: String,
307310
) {
308311
'subscribe: loop {
312+
let start = chrono::Local::now().timestamp_millis();
309313
match sink
310314
.subscribe()
311315
.topic_filter(sub_topic.clone(), qos)
312316
.send()
313317
.await
314318
{
315319
Ok(rets) => {
320+
let end = chrono::Local::now().timestamp_millis();
316321
for ret in rets {
317322
if let SubscribeReturnCode::Failure = ret {
318323
log::debug!("{:?} subscribe failure", client_id);
319324
Stats::instance().set_last_err("subscribe failure".into());
320325
time::sleep(Duration::from_secs(5)).await;
321326
continue 'subscribe;
322327
} else {
323-
c.subs_inc();
328+
c.subs_inc(start, end);
324329
}
325330
}
326331
c.mgr
@@ -370,11 +375,13 @@ impl Client {
370375
);
371376

372377
let payload = if let Some(payload) = &opts.pub_payload {
373-
ntex::util::Bytes::from(payload.clone())
378+
Cow::Borrowed(payload)
374379
} else {
375-
ntex::util::Bytes::from("0".repeat(opts.pub_payload_len))
380+
Cow::Owned("0".repeat(opts.pub_payload_len))
376381
};
377382

383+
let payload = format!("{{ \"data\": \"{}\", \"create_time\": {} }}", payload.as_str(), chrono::Local::now().timestamp_millis());
384+
let payload = ntex::util::Bytes::from(payload);
378385
if sink.is_open() {
379386
let packet_id = c.gen_packet_id();
380387

@@ -428,7 +435,12 @@ impl Client {
428435
.start(
429436
move |control: v3::client::ControlMessage<()>| match control {
430437
v3::client::ControlMessage::Publish(publish) => {
431-
Stats::instance().recvs.inc();
438+
let now = chrono::Local::now().timestamp_millis();
439+
let payload = serde_json::from_slice::<serde_json::Value>(publish.packet().payload.as_ref()).unwrap();
440+
let create_time = payload.as_object().and_then(|obj|{
441+
obj.get("create_time").map(|v|v.as_i64())
442+
}).unwrap().unwrap();
443+
Stats::instance().recvs.inc_limit_cost_times((Stats::instance().conns.value() * 2) as usize, create_time, now);
432444
client
433445
.mgr
434446
.on_message

0 commit comments

Comments
 (0)