refactor(network): simplify local state mini-protocol implementation (#326)

This commit is contained in:
Santiago Carmuega 2023-11-09 21:47:24 -03:00 committed by GitHub
parent aae7d92b44
commit e0f9f14dea
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 599 additions and 547 deletions

View file

@ -1,19 +1,30 @@
use pallas::network::{
facades::NodeClient,
miniprotocols::{chainsync, localstate, Point, MAINNET_MAGIC},
miniprotocols::{chainsync, localstate::queries_v16, Point, PRE_PRODUCTION_MAGIC},
};
use tracing::info;
async fn do_localstate_query(client: &mut NodeClient) {
client.statequery().acquire(None).await.unwrap();
let client = client.statequery();
let result = client
.statequery()
.query(localstate::queries::Request::GetSystemStart)
client.acquire(None).await.unwrap();
let result = queries_v16::get_chain_point(client).await.unwrap();
info!("result: {:?}", result);
let result = queries_v16::get_system_start(client).await.unwrap();
info!("result: {:?}", result);
let era = queries_v16::get_current_era(client).await.unwrap();
info!("result: {:?}", era);
let result = queries_v16::get_block_epoch_number(client, era)
.await
.unwrap();
info!("system start result: {:?}", result);
info!("result: {:?}", result);
client.send_release().await.unwrap();
}
async fn do_chainsync(client: &mut NodeClient) {
@ -43,6 +54,10 @@ async fn do_chainsync(client: &mut NodeClient) {
}
}
// change the following to match the Cardano node socket in your local
// environment
const SOCKET_PATH: &str = "/tmp/node.socket";
#[cfg(target_family = "unix")]
#[tokio::main]
async fn main() {
@ -55,15 +70,7 @@ async fn main() {
// we connect to the unix socket of the local node. Make sure you have the right
// path for your environment
let socket_path = "/tmp/node.socket";
// we connect to the unix socket of the local node and perform a handshake query
let version_table = NodeClient::handshake_query(socket_path, MAINNET_MAGIC)
.await
.unwrap();
info!("handshake query result: {:?}", version_table);
let mut client = NodeClient::connect(socket_path, MAINNET_MAGIC)
let mut client = NodeClient::connect(SOCKET_PATH, PRE_PRODUCTION_MAGIC)
.await
.unwrap();
@ -75,7 +82,6 @@ async fn main() {
}
#[cfg(not(target_family = "unix"))]
fn main() {
panic!("can't use n2c unix socket on non-unix systems");
}

View file

@ -282,7 +282,7 @@ impl NodeServer {
plexer_handle,
version: ver,
chainsync: server_cs,
statequery: server_sq
statequery: server_sq,
})
} else {
plexer_handle.abort();

View file

@ -1,11 +1,8 @@
use pallas_codec::utils::AnyCbor;
use std::fmt::Debug;
use pallas_codec::Fragment;
use std::marker::PhantomData;
use thiserror::*;
use super::{AcquireFailure, Message, Query, State};
use super::{AcquireFailure, Message, State};
use crate::miniprotocols::Point;
use crate::multiplexer;
@ -13,16 +10,25 @@ use crate::multiplexer;
pub enum ClientError {
#[error("attempted to receive message while agency is ours")]
AgencyIsOurs,
#[error("attempted to send message while agency is theirs")]
AgencyIsTheirs,
#[error("inbound message is not valid for current state")]
InvalidInbound,
#[error("outbound message is not valid for current state")]
InvalidOutbound,
#[error("failure acquiring point, not found")]
AcquirePointNotFound,
#[error("failure acquiring point, too old")]
AcquirePointTooOld,
#[error("failure decoding CBOR data")]
InvalidCbor(pallas_codec::minicbor::decode::Error),
#[error("error while sending or receiving data through the channel")]
Plexer(multiplexer::Error),
}
@ -36,22 +42,11 @@ impl From<AcquireFailure> for ClientError {
}
}
pub struct GenericClient<Q>(State, multiplexer::ChannelBuffer, PhantomData<Q>)
where
Q: Query,
Message<Q>: Fragment;
pub struct GenericClient(State, multiplexer::ChannelBuffer);
impl<Q> GenericClient<Q>
where
Q: Query,
Message<Q>: Fragment,
{
impl GenericClient {
pub fn new(channel: multiplexer::AgentChannel) -> Self {
Self(
State::Idle,
multiplexer::ChannelBuffer::new(channel),
PhantomData {},
)
Self(State::Idle, multiplexer::ChannelBuffer::new(channel))
}
pub fn state(&self) -> &State {
@ -87,7 +82,7 @@ where
}
}
fn assert_outbound_state(&self, msg: &Message<Q>) -> Result<(), ClientError> {
fn assert_outbound_state(&self, msg: &Message) -> Result<(), ClientError> {
match (&self.0, msg) {
(State::Idle, Message::Acquire(_)) => Ok(()),
(State::Idle, Message::Done) => Ok(()),
@ -98,7 +93,7 @@ where
}
}
fn assert_inbound_state(&self, msg: &Message<Q>) -> Result<(), ClientError> {
fn assert_inbound_state(&self, msg: &Message) -> Result<(), ClientError> {
match (&self.0, msg) {
(State::Acquiring, Message::Acquired) => Ok(()),
(State::Acquiring, Message::Failure(_)) => Ok(()),
@ -107,15 +102,18 @@ where
}
}
pub async fn send_message(&mut self, msg: &Message<Q>) -> Result<(), ClientError> {
pub async fn send_message(&mut self, msg: &Message) -> Result<(), ClientError> {
self.assert_agency_is_ours()?;
self.assert_outbound_state(msg)?;
self.1.send_msg_chunks(msg).await.map_err(ClientError::Plexer)?;
self.1
.send_msg_chunks(msg)
.await
.map_err(ClientError::Plexer)?;
Ok(())
}
pub async fn recv_message(&mut self) -> Result<Message<Q>, ClientError> {
pub async fn recv_message(&mut self) -> Result<Message, ClientError> {
self.assert_agency_is_theirs()?;
let msg = self.1.recv_full_msg().await.map_err(ClientError::Plexer)?;
self.assert_inbound_state(&msg)?;
@ -124,7 +122,7 @@ where
}
pub async fn send_acquire(&mut self, point: Option<Point>) -> Result<(), ClientError> {
let msg = Message::<Q>::Acquire(point);
let msg = Message::Acquire(point);
self.send_message(&msg).await?;
self.0 = State::Acquiring;
@ -132,7 +130,7 @@ where
}
pub async fn send_reacquire(&mut self, point: Option<Point>) -> Result<(), ClientError> {
let msg = Message::<Q>::ReAcquire(point);
let msg = Message::ReAcquire(point);
self.send_message(&msg).await?;
self.0 = State::Acquiring;
@ -140,7 +138,7 @@ where
}
pub async fn send_release(&mut self) -> Result<(), ClientError> {
let msg = Message::<Q>::Release;
let msg = Message::Release;
self.send_message(&msg).await?;
self.0 = State::Idle;
@ -148,7 +146,7 @@ where
}
pub async fn send_done(&mut self) -> Result<(), ClientError> {
let msg = Message::<Q>::Done;
let msg = Message::Done;
self.send_message(&msg).await?;
self.0 = State::Done;
@ -174,28 +172,38 @@ where
self.recv_while_acquiring().await
}
pub async fn send_query(&mut self, request: Q::Request) -> Result<(), ClientError> {
let msg = Message::<Q>::Query(request);
pub async fn send_query(&mut self, request: AnyCbor) -> Result<Message, ClientError> {
let msg = Message::Query(request);
self.send_message(&msg).await?;
self.0 = State::Querying;
Ok(())
Ok(msg)
}
pub async fn recv_while_querying(&mut self) -> Result<Q::Response, ClientError> {
pub async fn recv_while_querying(&mut self) -> Result<AnyCbor, ClientError> {
match self.recv_message().await? {
Message::Result(x) => {
Message::Result(result) => {
self.0 = State::Acquired;
Ok(x)
Ok(result)
}
_ => Err(ClientError::InvalidInbound),
}
}
pub async fn query(&mut self, request: Q::Request) -> Result<Q::Response, ClientError> {
pub async fn query_any(&mut self, request: AnyCbor) -> Result<AnyCbor, ClientError> {
self.send_query(request).await?;
self.recv_while_querying().await
}
pub async fn query<Q, R>(&mut self, request: Q) -> Result<R, ClientError>
where
Q: pallas_codec::minicbor::Encode<()>,
for<'b> R: pallas_codec::minicbor::Decode<'b, ()>,
{
let request = AnyCbor::from_encode(request);
let response = self.query_any(request).await?;
response.into_decode().map_err(ClientError::InvalidCbor)
}
}
pub type Client = GenericClient<super::queries::QueryV16>;
pub type Client = GenericClient;

View file

@ -1,6 +1,6 @@
use pallas_codec::minicbor::{decode, encode, Decode, Encode, Encoder};
use super::{AcquireFailure, Message, Query};
use super::{AcquireFailure, Message};
impl Encode<()> for AcquireFailure {
fn encode<W: encode::Write>(
@ -36,12 +36,7 @@ impl<'b> Decode<'b, ()> for AcquireFailure {
}
}
impl<Q> Encode<()> for Message<Q>
where
Q: Query,
Q::Request: Encode<()>,
Q::Response: Encode<()>,
{
impl Encode<()> for Message {
fn encode<W: encode::Write>(
&self,
e: &mut Encoder<W>,
@ -97,12 +92,7 @@ where
}
}
impl<'b, Q> Decode<'b, ()> for Message<Q>
where
Q: Query,
Q::Request: Decode<'b, ()>,
Q::Response: Decode<'b, ()>,
{
impl<'b> Decode<'b, ()> for Message {
fn decode(
d: &mut pallas_codec::minicbor::Decoder<'b>,
_ctx: &mut (),

View file

@ -1,9 +1,10 @@
mod client;
mod codec;
mod protocol;
pub mod queries;
mod server;
pub mod queries_v16;
pub use client::*;
pub use codec::*;
pub use protocol::*;

View file

@ -1,5 +1,7 @@
use std::fmt::Debug;
use pallas_codec::utils::AnyCbor;
use crate::miniprotocols::Point;
#[derive(Debug, PartialEq, Eq, Clone)]
@ -17,18 +19,13 @@ pub enum AcquireFailure {
PointNotOnChain,
}
pub trait Query: Debug {
type Request: Clone + Debug;
type Response: Clone + Debug;
}
#[derive(Debug)]
pub enum Message<Q: Query> {
pub enum Message {
Acquire(Option<Point>),
Failure(AcquireFailure),
Acquired,
Query(Q::Request),
Result(Q::Response),
Query(AnyCbor),
Result(AnyCbor),
ReAcquire(Option<Point>),
Release,
Done,

View file

@ -1,293 +0,0 @@
use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder};
use super::Query;
// https://github.com/input-output-hk/ouroboros-consensus/blob/main/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/Query.hs
#[derive(Debug, Clone, PartialEq)]
#[repr(u16)]
pub enum BlockQuery {
GetLedgerTip,
GetEpochNo,
// GetNonMyopicMemberRewards(()),
GetCurrentPParams,
GetProposedPParamsUpdates,
GetStakeDistribution,
// GetUTxOByAddress(()),
// GetUTxOWhole, (Response too large for now)
// DebugEpochState, (Response too large for now)
// GetCBOR(()),
// GetFilteredDelegationsAndRewardAccounts(()),
GetGenesisConfig,
// DebugNewEpochState, (Response too large for now)
DebugChainDepState,
GetRewardProvenance,
// GetUTxOByTxIn(()),
GetStakePools,
// GetStakePoolParams(()),
GetRewardInfoPools,
// GetPoolState(()),
// GetStakeSnapshots(()),
// GetPoolDistr(()),
// GetStakeDelegDeposits(()),
// GetConstitutionHash,
}
impl Encode<()> for BlockQuery {
fn encode<W: encode::Write>(
&self,
e: &mut Encoder<W>,
_ctx: &mut (),
) -> Result<(), encode::Error<W::Error>> {
e.array(2)?;
e.u16(0)?;
e.array(2)?;
/*
TODO: Think this is era or something? First fetch era with
[3, [0, [2, [1]]]], then use it here?
*/
e.u16(5)?;
match self {
BlockQuery::GetLedgerTip => {
e.array(1)?;
e.u16(0)?;
}
BlockQuery::GetEpochNo => {
e.array(1)?;
e.u16(1)?;
}
// BlockQuery::GetNonMyopicMemberRewards(()) => {
// e.array(X)?;
// e.u16(2)?;
// }
BlockQuery::GetCurrentPParams => {
e.array(1)?;
e.u16(3)?;
}
BlockQuery::GetProposedPParamsUpdates => {
e.array(1)?;
e.u16(4)?;
}
BlockQuery::GetStakeDistribution => {
e.array(1)?;
e.u16(5)?;
}
// BlockQuery::GetUTxOByAddress(()) => {
// e.array(X)?;
// e.u16(6)?;
// }
// BlockQuery::GetUTxOWhole => {
// e.array(1)?;
// e.u16(7)?;
// }
// BlockQuery::DebugEpochState => {
// e.array(1)?;
// e.u16(8)?;
// }
// BlockQuery::GetCBOR(()) => {
// e.array(X)?;
// e.u16(9)?;
// }
// BlockQuery::GetFilteredDelegationsAndRewardAccounts(()) => {
// e.array(X)?;
// e.u16(10)?;
// }
BlockQuery::GetGenesisConfig => {
e.array(1)?;
e.u16(11)?;
}
// BlockQuery::DebugNewEpochState => {
// e.array(1)?;
// e.u16(12)?;
// }
BlockQuery::DebugChainDepState => {
e.array(1)?;
e.u16(13)?;
}
BlockQuery::GetRewardProvenance => {
e.array(1)?;
e.u16(14)?;
}
// BlockQuery::GetUTxOByTxIn(()) => {
// e.array(X)?;
// e.u16(15)?;
// }
BlockQuery::GetStakePools => {
e.array(1)?;
e.u16(16)?;
}
// BlockQuery::GetStakePoolParams(()) => {
// e.array(X)?;
// e.u16(17)?;
// }
BlockQuery::GetRewardInfoPools => {
e.array(1)?;
e.u16(18)?;
}
// BlockQuery::GetPoolState(()) => {
// e.array(X)?;
// e.u16(19)?;
// }
// BlockQuery::GetStakeSnapshots(()) => {
// e.array(X)?;
// e.u16(20)?;
// }
// BlockQuery::GetPoolDistr(()) => {
// e.array(X)?;
// e.u16(21)?;
// }
// BlockQuery::GetStakeDelegDeposits(()) => {
// e.array(X)?;
// e.u16(22)?;
// }
// BlockQuery::GetConstitutionHash => {
// e.array(1)?;
// e.u16(23)?;
// }
}
Ok(())
}
}
impl<'b> Decode<'b, ()> for BlockQuery {
fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result<Self, decode::Error> {
d.array()?;
d.u16()?;
d.array()?;
d.u16()?;
d.array()?;
match d.u16()? {
0 => Ok(Self::GetLedgerTip),
1 => Ok(Self::GetEpochNo),
// 2 => Ok(Self::GetNonMyopicMemberRewards(())),
3 => Ok(Self::GetCurrentPParams),
4 => Ok(Self::GetProposedPParamsUpdates),
5 => Ok(Self::GetStakeDistribution),
// 6 => Ok(Self::GetUTxOByAddress(())),
// 7 => Ok(Self::GetUTxOWhole),
// 8 => Ok(Self::DebugEpochState),
// 9 => Ok(Self::GetCBOR(())),
// 10 => Ok(Self::GetFilteredDelegationsAndRewardAccounts(())),
11 => Ok(Self::GetGenesisConfig),
// 12 => Ok(Self::DebugNewEpochState),
13 => Ok(Self::DebugChainDepState),
14 => Ok(Self::GetRewardProvenance),
// 15 => Ok(Self::GetUTxOByTxIn(())),
16 => Ok(Self::GetStakePools),
// 17 => Ok(Self::GetStakePoolParams(())),
18 => Ok(Self::GetRewardInfoPools),
// 19 => Ok(Self::GetPoolState(())),
// 20 => Ok(Self::GetStakeSnapshots(())),
// 21 => Ok(Self::GetPoolDistr(())),
// 22 => Ok(Self::GetStakeDelegDeposits(())),
// 23 => Ok(Self::GetConstitutionHash),
_ => unreachable!(),
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum Request {
BlockQuery(BlockQuery),
GetSystemStart,
GetChainBlockNo,
GetChainPoint,
}
impl Encode<()> for Request {
fn encode<W: encode::Write>(
&self,
e: &mut Encoder<W>,
_ctx: &mut (),
) -> Result<(), encode::Error<W::Error>> {
match self {
Self::BlockQuery(q) => {
e.array(2)?;
e.u16(0)?;
e.encode(q)?;
Ok(())
}
Self::GetSystemStart => {
e.array(1)?;
e.u16(1)?;
Ok(())
}
Self::GetChainBlockNo => {
e.array(1)?;
e.u16(2)?;
Ok(())
}
Self::GetChainPoint => {
e.array(1)?;
e.u16(3)?;
Ok(())
}
}
}
}
impl<'b> Decode<'b, ()> for Request {
fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result<Self, decode::Error> {
let size = match d.array()? {
Some(l) => l,
None => return Err(decode::Error::message("unexpected indefinite len list")),
};
let tag = d.u16()?;
match (size, tag) {
(2, 0) => Ok(Self::BlockQuery(d.decode()?)),
(1, 1) => Ok(Self::GetSystemStart),
(1, 2) => Ok(Self::GetChainBlockNo),
(1, 3) => Ok(Self::GetChainPoint),
_ => {
return Err(decode::Error::message(
"invalid (size, tag) for lsq request",
))
}
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct GenericResponse(Vec<u8>);
impl GenericResponse {
/// "bytes" must be valid CBOR
pub fn new(bytes: Vec<u8>) -> Self {
Self(bytes)
}
}
impl Encode<()> for GenericResponse {
fn encode<W: encode::Write>(
&self,
e: &mut Encoder<W>,
_ctx: &mut (),
) -> Result<(), encode::Error<W::Error>> {
e.writer_mut()
.write_all(&self.0)
.map_err(|e| encode::Error::write(e))
}
}
impl<'b> Decode<'b, ()> for GenericResponse {
fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result<Self, decode::Error> {
let start = d.position();
d.skip()?;
let end = d.position();
let slice = &d.input()[start..end];
let vec = slice.to_vec();
Ok(GenericResponse(vec))
}
}
/// Queries available as of N2C V16
#[derive(Debug, Clone)]
pub struct QueryV16 {}
impl Query for QueryV16 {
type Request = Request;
type Response = GenericResponse;
}

View file

@ -0,0 +1,248 @@
use super::*;
use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder};
impl Encode<()> for BlockQuery {
fn encode<W: encode::Write>(
&self,
e: &mut Encoder<W>,
_ctx: &mut (),
) -> Result<(), encode::Error<W::Error>> {
match self {
BlockQuery::GetLedgerTip => {
e.array(1)?;
e.u16(0)?;
}
BlockQuery::GetEpochNo => {
e.array(1)?;
e.u16(1)?;
}
BlockQuery::GetNonMyopicMemberRewards(x) => {
e.array(2)?;
e.u16(2)?;
e.encode(x)?;
}
BlockQuery::GetCurrentPParams => {
e.array(1)?;
e.u16(3)?;
}
BlockQuery::GetProposedPParamsUpdates => {
e.array(1)?;
e.u16(4)?;
}
BlockQuery::GetStakeDistribution => {
e.array(1)?;
e.u16(5)?;
}
BlockQuery::GetUTxOByAddress(x) => {
e.array(2)?;
e.u16(6)?;
e.encode(x)?;
}
BlockQuery::GetUTxOWhole => {
e.encode((7,))?;
}
BlockQuery::DebugEpochState => {
e.array(1)?;
e.u16(8)?;
}
BlockQuery::GetCBOR(x) => {
e.array(2)?;
e.u16(9)?;
e.encode(x)?;
}
BlockQuery::GetFilteredDelegationsAndRewardAccounts(x) => {
e.array(2)?;
e.u16(10)?;
e.encode(x)?;
}
BlockQuery::GetGenesisConfig => {
e.array(1)?;
e.u16(11)?;
}
BlockQuery::DebugNewEpochState => {
e.array(1)?;
e.u16(12)?;
}
BlockQuery::DebugChainDepState => {
e.array(1)?;
e.u16(13)?;
}
BlockQuery::GetRewardProvenance => {
e.array(1)?;
e.u16(14)?;
}
BlockQuery::GetUTxOByTxIn(_) => {
e.array(2)?;
e.u16(15)?;
e.encode(2)?;
}
BlockQuery::GetStakePools => {
e.array(1)?;
e.u16(16)?;
}
BlockQuery::GetStakePoolParams(x) => {
e.array(2)?;
e.u16(17)?;
e.encode(x)?;
}
BlockQuery::GetRewardInfoPools => {
e.array(1)?;
e.u16(18)?;
}
BlockQuery::GetPoolState(x) => {
e.array(2)?;
e.u16(19)?;
e.encode(x)?;
}
BlockQuery::GetStakeSnapshots(x) => {
e.array(2)?;
e.u16(20)?;
e.encode(x)?;
}
BlockQuery::GetPoolDistr(x) => {
e.array(2)?;
e.u16(21)?;
e.encode(x)?;
}
BlockQuery::GetStakeDelegDeposits(x) => {
e.array(2)?;
e.u16(22)?;
e.encode(x)?;
}
BlockQuery::GetConstitutionHash => {
e.array(1)?;
e.u16(23)?;
}
}
Ok(())
}
}
impl<'b> Decode<'b, ()> for BlockQuery {
fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result<Self, decode::Error> {
d.array()?;
match d.u16()? {
0 => Ok(Self::GetLedgerTip),
1 => Ok(Self::GetEpochNo),
// 2 => Ok(Self::GetNonMyopicMemberRewards(())),
3 => Ok(Self::GetCurrentPParams),
4 => Ok(Self::GetProposedPParamsUpdates),
5 => Ok(Self::GetStakeDistribution),
// 6 => Ok(Self::GetUTxOByAddress(())),
// 7 => Ok(Self::GetUTxOWhole),
// 8 => Ok(Self::DebugEpochState),
// 9 => Ok(Self::GetCBOR(())),
// 10 => Ok(Self::GetFilteredDelegationsAndRewardAccounts(())),
11 => Ok(Self::GetGenesisConfig),
// 12 => Ok(Self::DebugNewEpochState),
13 => Ok(Self::DebugChainDepState),
14 => Ok(Self::GetRewardProvenance),
// 15 => Ok(Self::GetUTxOByTxIn(())),
16 => Ok(Self::GetStakePools),
// 17 => Ok(Self::GetStakePoolParams(())),
18 => Ok(Self::GetRewardInfoPools),
// 19 => Ok(Self::GetPoolState(())),
// 20 => Ok(Self::GetStakeSnapshots(())),
// 21 => Ok(Self::GetPoolDistr(())),
// 22 => Ok(Self::GetStakeDelegDeposits(())),
// 23 => Ok(Self::GetConstitutionHash),
_ => unreachable!(),
}
}
}
impl Encode<()> for HardForkQuery {
fn encode<W: encode::Write>(
&self,
e: &mut Encoder<W>,
_ctx: &mut (),
) -> Result<(), encode::Error<W::Error>> {
match self {
HardForkQuery::GetInterpreter => {
e.encode((0,))?;
}
HardForkQuery::GetCurrentEra => {
e.encode((1,))?;
}
}
Ok(())
}
}
impl<'b> Decode<'b, ()> for HardForkQuery {
fn decode(_d: &mut Decoder<'b>, _: &mut ()) -> Result<Self, decode::Error> {
todo!()
}
}
impl Encode<()> for LedgerQuery {
fn encode<W: encode::Write>(
&self,
e: &mut Encoder<W>,
_: &mut (),
) -> Result<(), encode::Error<W::Error>> {
match self {
LedgerQuery::BlockQuery(era, q) => {
e.encode((0, (era, q)))?;
}
LedgerQuery::HardForkQuery(q) => {
e.encode((2, q))?;
}
}
Ok(())
}
}
impl<'b> Decode<'b, ()> for LedgerQuery {
fn decode(_d: &mut Decoder<'b>, _: &mut ()) -> Result<Self, decode::Error> {
todo!()
}
}
impl Encode<()> for Request {
fn encode<W: encode::Write>(
&self,
e: &mut Encoder<W>,
_ctx: &mut (),
) -> Result<(), encode::Error<W::Error>> {
match self {
Self::LedgerQuery(q) => {
e.encode((0, q))?;
Ok(())
}
Self::GetSystemStart => {
e.array(1)?;
e.u16(1)?;
Ok(())
}
Self::GetChainBlockNo => {
e.array(1)?;
e.u16(2)?;
Ok(())
}
Self::GetChainPoint => {
e.array(1)?;
e.u16(3)?;
Ok(())
}
}
}
}
impl<'b> Decode<'b, ()> for Request {
fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result<Self, decode::Error> {
d.array()?;
let tag = d.u16()?;
match tag {
0 => Ok(Self::LedgerQuery(d.decode()?)),
1 => Ok(Self::GetSystemStart),
2 => Ok(Self::GetChainBlockNo),
3 => Ok(Self::GetChainPoint),
_ => Err(decode::Error::message("invalid tag")),
}
}
}

View file

@ -0,0 +1,113 @@
// TODO: this should move to pallas::ledger crate at some point
// required for derive attrs to work
use pallas_codec::minicbor;
use pallas_codec::{
minicbor::{Decode, Encode},
utils::AnyCbor,
};
use crate::miniprotocols::Point;
use super::{Client, ClientError};
mod codec;
// https://github.com/input-output-hk/ouroboros-consensus/blob/main/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/Query.hs
#[derive(Debug, Clone, PartialEq)]
#[repr(u16)]
pub enum BlockQuery {
GetLedgerTip,
GetEpochNo,
GetNonMyopicMemberRewards(AnyCbor),
GetCurrentPParams,
GetProposedPParamsUpdates,
GetStakeDistribution,
GetUTxOByAddress(AnyCbor),
GetUTxOWhole,
DebugEpochState,
GetCBOR(AnyCbor),
GetFilteredDelegationsAndRewardAccounts(AnyCbor),
GetGenesisConfig,
DebugNewEpochState,
DebugChainDepState,
GetRewardProvenance,
GetUTxOByTxIn(AnyCbor),
GetStakePools,
GetStakePoolParams(AnyCbor),
GetRewardInfoPools,
GetPoolState(AnyCbor),
GetStakeSnapshots(AnyCbor),
GetPoolDistr(AnyCbor),
GetStakeDelegDeposits(AnyCbor),
GetConstitutionHash,
}
#[derive(Debug, Clone, PartialEq)]
#[repr(u16)]
pub enum HardForkQuery {
GetInterpreter,
GetCurrentEra,
}
pub type Proto = u16;
pub type Era = u16;
#[derive(Debug, Clone, PartialEq)]
pub enum LedgerQuery {
BlockQuery(Era, BlockQuery),
HardForkQuery(HardForkQuery),
}
#[derive(Debug, Clone, PartialEq)]
pub enum Request {
LedgerQuery(LedgerQuery),
GetSystemStart,
GetChainBlockNo,
GetChainPoint,
}
#[derive(Debug, Encode, Decode, PartialEq)]
pub struct SystemStart {
#[n(0)]
pub year: u32,
#[n(1)]
pub day_of_year: u32,
#[n(2)]
pub picoseconds_of_day: u64,
}
pub async fn get_chain_point(client: &mut Client) -> Result<Point, ClientError> {
let query = Request::GetChainPoint;
let result = client.query(query).await?;
Ok(result)
}
pub async fn get_current_era(client: &mut Client) -> Result<Era, ClientError> {
let query = HardForkQuery::GetCurrentEra;
let query = LedgerQuery::HardForkQuery(query);
let query = Request::LedgerQuery(query);
let result = client.query(query).await?;
Ok(result)
}
pub async fn get_system_start(client: &mut Client) -> Result<SystemStart, ClientError> {
let query = Request::GetSystemStart;
let result = client.query(query).await?;
Ok(result)
}
pub async fn get_block_epoch_number(client: &mut Client, era: u16) -> Result<u32, ClientError> {
let query = BlockQuery::GetEpochNo;
let query = LedgerQuery::BlockQuery(era, query);
let query = Request::LedgerQuery(query);
let (result,): (_,) = client.query(query).await?;
Ok(result)
}

View file

@ -1,11 +1,8 @@
use pallas_codec::utils::AnyCbor;
use std::fmt::Debug;
use pallas_codec::Fragment;
use std::marker::PhantomData;
use thiserror::*;
use super::{AcquireFailure, Message, Query, State};
use super::{AcquireFailure, Message, State};
use crate::miniprotocols::Point;
use crate::multiplexer;
@ -28,28 +25,17 @@ pub struct ClientAcquireRequest(pub Option<Point>);
/// Request received from the client when in the Acquired state
#[derive(Debug)]
pub enum ClientQueryRequest<Q: Query> {
pub enum ClientQueryRequest {
ReAcquire(Option<Point>),
Query(Q::Request),
Query(AnyCbor),
Release,
}
pub struct GenericServer<Q>(State, multiplexer::ChannelBuffer, PhantomData<Q>)
where
Q: Query,
Message<Q>: Fragment;
pub struct GenericServer(State, multiplexer::ChannelBuffer);
impl<Q> GenericServer<Q>
where
Q: Query,
Message<Q>: Fragment,
{
impl GenericServer {
pub fn new(channel: multiplexer::AgentChannel) -> Self {
Self(
State::Idle,
multiplexer::ChannelBuffer::new(channel),
PhantomData {},
)
Self(State::Idle, multiplexer::ChannelBuffer::new(channel))
}
pub fn state(&self) -> &State {
@ -84,7 +70,7 @@ where
}
}
fn assert_outbound_state(&self, msg: &Message<Q>) -> Result<(), Error> {
fn assert_outbound_state(&self, msg: &Message) -> Result<(), Error> {
match (&self.0, msg) {
(State::Acquiring, Message::Acquired) => Ok(()),
(State::Acquiring, Message::Failure(_)) => Ok(()),
@ -93,7 +79,7 @@ where
}
}
fn assert_inbound_state(&self, msg: &Message<Q>) -> Result<(), Error> {
fn assert_inbound_state(&self, msg: &Message) -> Result<(), Error> {
match (&self.0, msg) {
(State::Idle, Message::Acquire(_)) => Ok(()),
(State::Idle, Message::Done) => Ok(()),
@ -104,7 +90,7 @@ where
}
}
pub async fn send_message(&mut self, msg: &Message<Q>) -> Result<(), Error> {
pub async fn send_message(&mut self, msg: &Message) -> Result<(), Error> {
self.assert_agency_is_ours()?;
self.assert_outbound_state(msg)?;
self.1.send_msg_chunks(msg).await.map_err(Error::Plexer)?;
@ -112,7 +98,7 @@ where
Ok(())
}
pub async fn recv_message(&mut self) -> Result<Message<Q>, Error> {
pub async fn recv_message(&mut self) -> Result<Message, Error> {
self.assert_agency_is_theirs()?;
let msg = self.1.recv_full_msg().await.map_err(Error::Plexer)?;
self.assert_inbound_state(&msg)?;
@ -121,7 +107,7 @@ where
}
pub async fn send_failure(&mut self, reason: AcquireFailure) -> Result<(), Error> {
let msg = Message::<Q>::Failure(reason);
let msg = Message::Failure(reason);
self.send_message(&msg).await?;
self.0 = State::Idle;
@ -129,15 +115,15 @@ where
}
pub async fn send_acquired(&mut self) -> Result<(), Error> {
let msg = Message::<Q>::Acquired;
let msg = Message::Acquired;
self.send_message(&msg).await?;
self.0 = State::Acquired;
Ok(())
}
pub async fn send_result(&mut self, response: Q::Response) -> Result<(), Error> {
let msg = Message::<Q>::Result(response);
pub async fn send_result(&mut self, response: AnyCbor) -> Result<(), Error> {
let msg = Message::Result(response);
self.send_message(&msg).await?;
self.0 = State::Acquired;
@ -162,7 +148,7 @@ where
}
}
pub async fn recv_while_acquired(&mut self) -> Result<ClientQueryRequest<Q>, Error> {
pub async fn recv_while_acquired(&mut self) -> Result<ClientQueryRequest, Error> {
match self.recv_message().await? {
Message::ReAcquire(point) => {
self.0 = State::Acquiring;
@ -181,4 +167,4 @@ where
}
}
pub type Server = GenericServer<super::queries::QueryV16>;
pub type Server = GenericServer;

View file

@ -2,19 +2,21 @@ use std::fs;
use std::net::{Ipv4Addr, SocketAddrV4};
use std::time::Duration;
use pallas_codec::utils::AnyCbor;
use pallas_network::facades::{NodeClient, PeerClient, PeerServer};
use pallas_network::miniprotocols::blockfetch::BlockRequest;
use pallas_network::miniprotocols::chainsync::{ClientRequest, HeaderContent, Tip};
use pallas_network::miniprotocols::handshake::n2c;
use pallas_network::miniprotocols::handshake::n2n::VersionData;
use pallas_network::miniprotocols::localstate::queries::{GenericResponse, Request};
use pallas_network::miniprotocols::localstate::{ClientAcquireRequest, ClientQueryRequest};
use pallas_network::miniprotocols::chainsync::{ClientRequest, HeaderContent, Tip};
use pallas_network::miniprotocols::{
blockfetch,
chainsync::{self, NextResponse},
Point,
};
use pallas_network::miniprotocols::{handshake, localstate};
use pallas_network::miniprotocols::{
handshake, localstate, PROTOCOL_N2C_HANDSHAKE, PROTOCOL_N2C_STATE_QUERY,
};
use pallas_network::multiplexer::{Bearer, Plexer};
use std::path::Path;
use tokio::net::{TcpListener, UnixListener};
@ -255,150 +257,6 @@ pub async fn blockfetch_server_and_client_happy_path() {
_ = tokio::join!(client, server);
}
#[tokio::test]
#[ignore]
pub async fn local_state_query_server_and_client_happy_path() {
let server = tokio::spawn({
async move {
// server setup
let socket_path = Path::new("node.socket");
if socket_path.exists() {
fs::remove_file(&socket_path).unwrap();
}
let unix_listener = UnixListener::bind(socket_path).unwrap();
let (bearer, _) = Bearer::accept_unix(&unix_listener).await.unwrap();
let mut server_plexer = Plexer::new(bearer);
let mut server_hs: handshake::Server<n2c::VersionData> =
handshake::Server::new(server_plexer.subscribe_server(0));
let mut server_sq: localstate::Server =
localstate::Server::new(server_plexer.subscribe_server(7));
tokio::spawn(async move { server_plexer.run().await });
server_hs.receive_proposed_versions().await.unwrap();
server_hs
.accept_version(10, n2c::VersionData::new(0, Some(false)))
.await
.unwrap();
// server receives range from client, sends blocks
let ClientAcquireRequest(maybe_point) =
server_sq.recv_while_idle().await.unwrap().unwrap();
assert_eq!(maybe_point, Some(Point::Origin));
assert_eq!(*server_sq.state(), localstate::State::Acquiring);
// server_bf.send_block_range(bodies).await.unwrap();
server_sq.send_acquired().await.unwrap();
assert_eq!(*server_sq.state(), localstate::State::Acquired);
// server receives query from client
let query = match server_sq.recv_while_acquired().await.unwrap() {
ClientQueryRequest::Query(q) => q,
x => panic!("unexpected message from client: {x:?}"),
};
assert_eq!(
query,
Request::BlockQuery(localstate::queries::BlockQuery::GetStakePools)
);
assert_eq!(*server_sq.state(), localstate::State::Querying);
server_sq
.send_result(GenericResponse::new(hex::decode("82011A008BD423").unwrap()))
.await
.unwrap();
assert_eq!(*server_sq.state(), localstate::State::Acquired);
// server receives reaquire from the client
let maybe_point = match server_sq.recv_while_acquired().await.unwrap() {
ClientQueryRequest::ReAcquire(p) => p,
x => panic!("unexpected message from client: {x:?}"),
};
assert_eq!(maybe_point, Some(Point::Specific(1337, vec![1, 2, 3])));
assert_eq!(*server_sq.state(), localstate::State::Acquiring);
server_sq.send_acquired().await.unwrap();
// server receives release from the client
match server_sq.recv_while_acquired().await.unwrap() {
ClientQueryRequest::Release => (),
x => panic!("unexpected message from client: {x:?}"),
};
assert!(server_sq.recv_while_idle().await.unwrap().is_none());
assert_eq!(*server_sq.state(), localstate::State::Done);
}
});
let client = tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
// client setup
let socket_path = "node.socket";
let mut client_to_server_conn = NodeClient::connect(socket_path, 0).await.unwrap();
let client_sq = client_to_server_conn.statequery();
// client sends acquire
client_sq.send_acquire(Some(Point::Origin)).await.unwrap();
client_sq.recv_while_acquiring().await.unwrap();
assert_eq!(*client_sq.state(), localstate::State::Acquired);
// client sends a BlockQuery
client_sq
.send_query(Request::BlockQuery(
localstate::queries::BlockQuery::GetStakePools,
))
.await
.unwrap();
let resp = client_sq.recv_while_querying().await.unwrap();
assert_eq!(
resp,
GenericResponse::new(hex::decode("82011A008BD423").unwrap())
);
// client sends a ReAquire
client_sq
.send_reacquire(Some(Point::Specific(1337, vec![1, 2, 3])))
.await
.unwrap();
client_sq.recv_while_acquiring().await.unwrap();
client_sq.send_release().await.unwrap();
client_sq.send_done().await.unwrap();
});
_ = tokio::join!(client, server);
}
#[tokio::test]
#[ignore]
pub async fn chainsync_server_and_client_happy_path_n2n() {
@ -596,4 +454,142 @@ pub async fn chainsync_server_and_client_happy_path_n2n() {
});
_ = tokio::join!(client, server);
}
}
#[tokio::test]
#[ignore]
pub async fn local_state_query_server_and_client_happy_path() {
let server = tokio::spawn({
async move {
// server setup
let socket_path = Path::new("node.socket");
if socket_path.exists() {
fs::remove_file(&socket_path).unwrap();
}
let unix_listener = UnixListener::bind(socket_path).unwrap();
let mut server = pallas_network::facades::NodeServer::accept(&unix_listener, 0)
.await
.unwrap();
// wait for acquire request from client
let maybe_acquire = server.statequery().recv_while_idle().await.unwrap();
assert!(maybe_acquire.is_some());
assert_eq!(*server.statequery().state(), localstate::State::Acquiring);
server.statequery().send_acquired().await.unwrap();
assert_eq!(*server.statequery().state(), localstate::State::Acquired);
// server receives query from client
let query: localstate::queries_v16::Request =
match server.statequery().recv_while_acquired().await.unwrap() {
ClientQueryRequest::Query(q) => q.into_decode().unwrap(),
x => panic!("unexpected message from client: {x:?}"),
};
assert_eq!(query, localstate::queries_v16::Request::GetSystemStart);
assert_eq!(*server.statequery().state(), localstate::State::Querying);
let result = AnyCbor::from_encode(localstate::queries_v16::SystemStart {
year: 2020,
day_of_year: 1,
picoseconds_of_day: 999999999,
});
server.statequery().send_result(result).await.unwrap();
assert_eq!(*server.statequery().state(), localstate::State::Acquired);
// server receives re-acquire from the client
let maybe_point = match server.statequery().recv_while_acquired().await.unwrap() {
ClientQueryRequest::ReAcquire(p) => p,
x => panic!("unexpected message from client: {x:?}"),
};
assert_eq!(maybe_point, Some(Point::Specific(1337, vec![1, 2, 3])));
assert_eq!(*server.statequery().state(), localstate::State::Acquiring);
server.statequery().send_acquired().await.unwrap();
// server receives release from the client
match server.statequery().recv_while_acquired().await.unwrap() {
ClientQueryRequest::Release => (),
x => panic!("unexpected message from client: {x:?}"),
};
let next_request = server.statequery().recv_while_idle().await.unwrap();
assert!(next_request.is_none());
assert_eq!(*server.statequery().state(), localstate::State::Done);
}
});
let client = tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
// client setup
let socket_path = "node.socket";
let mut client = NodeClient::connect(socket_path, 0).await.unwrap();
// client sends acquire
client
.statequery()
.send_acquire(Some(Point::Origin))
.await
.unwrap();
client.statequery().recv_while_acquiring().await.unwrap();
assert_eq!(*client.statequery().state(), localstate::State::Acquired);
// client sends a BlockQuery
let request = AnyCbor::from_encode(localstate::queries_v16::Request::GetSystemStart);
client.statequery().send_query(request).await.unwrap();
let result: localstate::queries_v16::SystemStart = client
.statequery()
.recv_while_querying()
.await
.unwrap()
.into_decode()
.unwrap();
assert_eq!(
result,
localstate::queries_v16::SystemStart {
year: 2020,
day_of_year: 1,
picoseconds_of_day: 999999999,
}
);
// client sends a ReAquire
client
.statequery()
.send_reacquire(Some(Point::Specific(1337, vec![1, 2, 3])))
.await
.unwrap();
client.statequery().recv_while_acquiring().await.unwrap();
client.statequery().send_release().await.unwrap();
client.statequery().send_done().await.unwrap();
});
_ = tokio::join!(client, server);
}