Unable to fetch a value from Kadmelia #3058
-
I'm having trouble fetching a value from Kademlia using the js-libp2p, however, my example works in Rust. For my example (for now) the Rust code launches two Kad servers, the first does not execute bootstrap (there's nothing to connect to), the second one connects to the first. I'm using a Quorum of 1 for testing purposes, I can't (or don't have the patience to) launch N>3 nodes! Details
use std::time::Duration;
use clap::Parser;
use libp2p::{
core,
futures::StreamExt,
identify,
identity::{self, Keypair},
kad::{self, InboundRequest, QueryResult, Record},
noise, ping,
swarm::{self, NetworkBehaviour, SwarmEvent},
tcp, websocket, yamux, Multiaddr, Swarm, Transport,
};
use lp2p::extract_peer_id;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
#[derive(Clone, Debug, clap::Parser)]
struct App {
#[arg(short='l', value_delimiter=',', num_args=1..)]
listen_addrs: Vec<Multiaddr>,
#[arg(short='b', value_delimiter=',', num_args=1..)]
bootnodes: Vec<Multiaddr>,
}
#[tokio::main]
async fn main() {
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
.with(
EnvFilter::builder()
.with_default_directive(LevelFilter::DEBUG.into())
.from_env()
.unwrap(),
)
.init();
let app = App::parse();
let mut swarm = create_swarm(app.bootnodes);
for addr in app.listen_addrs {
swarm.listen_on(addr).unwrap();
}
loop {
tokio::select! {
event = swarm.select_next_some() => on_swarm_event(&mut swarm, event)
}
}
}
#[derive(NetworkBehaviour)]
struct Behaviour {
identify: identify::Behaviour,
kad: kad::Behaviour<kad::store::MemoryStore>,
}
impl Behaviour {
fn new(keypair: Keypair, bootnodes: Vec<Multiaddr>) -> Self {
let identify = identify::Behaviour::new(identify::Config::new(
"/polka-test/identify/1.0.0".to_string(),
keypair.public(),
));
let local_peer_id = keypair.public().to_peer_id();
let mut kad =
kad::Behaviour::new(local_peer_id, kad::store::MemoryStore::new(local_peer_id));
kad.set_mode(Some(kad::Mode::Server));
for node in bootnodes {
tracing::info!("Adding address to Kademlia: {node}");
kad.add_address(&extract_peer_id(&node).unwrap(), node);
}
Self { identify, kad }
}
}
fn create_swarm(bootnodes: Vec<Multiaddr>) -> Swarm<Behaviour> {
let identity = identity::Keypair::generate_ed25519();
let local_peer_id = identity.public().to_peer_id();
tracing::info!("Local peer id: {local_peer_id}");
let noise_config = noise::Config::new(&identity).unwrap(); // TODO: proper error handling
let muxer_config = yamux::Config::default();
let tcp_config = tcp::Config::new();
let tcp_transport = tcp::tokio::Transport::new(tcp_config.clone());
let ws = websocket::WsConfig::new(tcp::tokio::Transport::new(tcp_config));
let tcp_ws_transport = tcp_transport
.or_transport(ws)
.upgrade(core::upgrade::Version::V1Lazy)
.authenticate(noise_config)
.multiplex(muxer_config)
.boxed();
let local_peer_id = identity.public().to_peer_id();
Swarm::new(
tcp_ws_transport,
Behaviour::new(identity, bootnodes),
local_peer_id,
swarm::Config::with_tokio_executor().with_idle_connection_timeout(Duration::from_secs(10)),
)
}
fn on_swarm_event(swarm: &mut Swarm<Behaviour>, event: SwarmEvent<BehaviourEvent>) {
match event {
SwarmEvent::NewListenAddr { address, .. } => {
tracing::debug!("New listen address: {address}");
}
SwarmEvent::ExternalAddrConfirmed { address } => {
tracing::debug!("Local external address confirmed: {address}")
}
SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => {
tracing::debug!("External address confirmed: {address} for {peer_id}")
}
SwarmEvent::Behaviour(event) => on_behaviour_event(swarm, event),
_ => tracing::debug!("Received unhandled event: {event:?}"),
}
}
fn on_behaviour_event(swarm: &mut Swarm<Behaviour>, event: BehaviourEvent) {
match event {
BehaviourEvent::Identify(event) => {
match event {
identify::Event::Received { peer_id, info, .. } => {
tracing::info!("Received identify event with info: {info:?}");
if info.listen_addrs.is_empty() {
tracing::warn!("No listen addresses for peer {}, skipping...", peer_id);
return;
}
let is_kad_capable = info
.protocols
.iter()
.any(|stream_protocol| kad::PROTOCOL_NAME.eq(stream_protocol));
if is_kad_capable {
for addr in info.listen_addrs.clone() {
tracing::info!("Adding address to Kademlia: {addr}");
swarm.behaviour_mut().kad.add_address(&peer_id, addr);
}
} else {
tracing::warn!("No {} protocol found, skipping...", kad::PROTOCOL_NAME);
return;
}
tracing::info!("Putting listen addresses for peer: {}", peer_id);
let buffer: Vec<u8> = vec![];
let bytes = cbor4ii::serde::to_vec(buffer, &info.listen_addrs).unwrap();
let record = Record::new(peer_id.as_ref().to_owned(), bytes);
swarm
.behaviour_mut()
.kad
.put_record(record, kad::Quorum::One)
.unwrap();
}
_ => tracing::debug!("Received unhandled identify event: {event:?}"),
};
}
BehaviourEvent::Kad(event) => match event {
kad::Event::OutboundQueryProgressed { result, .. } => on_query_result(result),
kad::Event::InboundRequest { request } => on_inbound_request(request),
_ => tracing::debug!("Received unhandled kadmelia event: {event:?}"),
},
_ => tracing::debug!("Received unhandled behaviour event: {event:?}"),
}
}
fn on_query_result(result: QueryResult) {
match result {
kad::QueryResult::GetRecord(get_record_ok) => match get_record_ok {
Ok(ok) => tracing::info!("Successful GetRecord: {ok:?}"),
Err(err) => tracing::error!("Failed GetRecord: {err:?}"),
},
kad::QueryResult::PutRecord(put_record_ok) => match put_record_ok {
Ok(ok) => tracing::info!("Successful PutRecord: {ok:?}"),
Err(err) => tracing::error!("Failed PutRecord: {err:?}"),
},
_ => tracing::debug!("Received unhandled QueryResult: {result:?}"),
}
}
fn on_inbound_request(request: InboundRequest) {
match request {
request @ kad::InboundRequest::GetRecord { .. } => {
tracing::info!("Received GetRecord request: {request:?}")
}
request @ kad::InboundRequest::PutRecord { .. } => {
tracing::info!("Received PutRecord request: {request:?}")
}
_ => tracing::debug!("Received unhandled InboundRequest: {request:?}"),
}
} On the JS side, I'm currently running a simple example that sets up a libp2p node and should connect to the Kad DHT the Rust nodes provide. I'm running it in client mode as it isn't supposed to be a server, it really is just supposed to connect ephemerally to the DHT to fetch a value. import { noise } from "@chainsafe/libp2p-noise"
import { yamux } from "@chainsafe/libp2p-yamux"
import { bootstrap } from "@libp2p/bootstrap"
import { identify, identifyPush } from "@libp2p/identify"
import { kadDHT } from "@libp2p/kad-dht"
import { peerIdFromString } from "@libp2p/peer-id"
import { webSockets } from "@libp2p/websockets"
import { createLibp2p } from "libp2p"
async function createNode(bootnodes: string[]) {
return await createLibp2p(
{
addresses: {
listen: [
"/ip4/0.0.0.0/tcp/0/ws"
]
},
transports: [webSockets()],
connectionEncrypters: [noise()],
streamMuxers: [yamux()],
peerDiscovery: [
bootstrap({
list: bootnodes,
}),
],
services: {
identify: identify(),
identifyPush: identifyPush(),
dht: kadDHT({
clientMode: true,
})
},
connectionMonitor: {
enabled: false
},
},
)
}
async function main() {
const [_node, _script, bootnodes, query] = process.argv
if (!bootnodes) {
throw new Error("Missing \"bootnodes\"")
}
if (!query) {
throw new Error("Missing \"query\"")
}
const node = await createNode(bootnodes.split(","))
const peerId = peerIdFromString(query)
console.log(await node.contentRouting.get(peerId.toMultihash().bytes))
}
main() This code successfully connects to the Rust server and runs an One thing I'm observing are the following logs:
Though I've configured the bootstrap peers... I've searched far and wide for proper documentation for what should be a simple use case and can't find any 😭, as such I am resorting to asking for help here. The full code for the repo is in https://github.com/jmg-duarte/libp2p-rs-js-dht, Thanks in advance! 🙏 |
Beta Was this translation helpful? Give feedback.
Replies: 4 comments 5 replies
-
I've done further work on this, still to no avail. I found that the addresses were being removed under my nose because they were all private (I didn't find a single doc on this, how was I supposed to know?), I replaced the filter so I could try to test this locally. I added AutoNAT too to double ensure that the exposed addresses would be correct. I also found that the query fails locally (here) (shocker, it's a client after all) and never attempts to do remote one... My honest next step is moving to Rust and use WASM instead. cc @achingbrain, maybe you'll be able to help |
Beta Was this translation helpful? Give feedback.
-
Spent a few hours looking at this, think I've found the issue. When the JS peer starts, it first needs to populate its DHT routing table. However before adding an entry to its table, it first does a DHT ping which is unsupported in rust https://github.com/libp2p/rust-libp2p/blob/master/protocols/kad/src/handler.rs#L884 As the JS peers routing table is empty when bootstrapping from only rust peers, it never gets to a state where it can query the network. You can also test using vole
Hope that helps |
Beta Was this translation helpful? Give feedback.
-
@jmg-duarte Added an issue to track it - #3072 |
Beta Was this translation helpful? Give feedback.
-
Amazing stuff, thanks for the investigation @dozyio! @jmg-duarte please can you try again with |
Beta Was this translation helpful? Give feedback.
Amazing stuff, thanks for the investigation @dozyio!
@jmg-duarte please can you try again with
@libp2p/[email protected]
- you'll also need to add@libp2p/ping
to your service map.