feat(network): implement stake snapshot local state query (#394)

This commit is contained in:
Alexsander Falcucci 2024-02-06 13:17:44 +01:00 committed by GitHub
parent 3a0514c92b
commit 43cdb74ee0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 165 additions and 6 deletions

View file

@ -54,6 +54,14 @@ async fn do_localstate_query(client: &mut NodeClient) {
let result = queries_v16::get_current_pparams(client, era).await.unwrap();
println!("result: {:?}", result);
// Stake pool ID/verification key hash (either Bech32-decoded or hex-decoded).
// Empty Vec means all pools.
let pools = vec![];
let result = queries_v16::get_stake_snapshots(client, era, pools)
.await
.unwrap();
println!("result: {:?}", result);
client.send_release().await.unwrap();
}

View file

@ -94,10 +94,16 @@ impl Encode<()> for BlockQuery {
e.u16(19)?;
e.encode(x)?;
}
BlockQuery::GetStakeSnapshots(x) => {
BlockQuery::GetStakeSnapshots(pools) => {
e.array(2)?;
e.u16(20)?;
e.encode(x)?;
if !pools.is_empty() {
e.array(Vec::len(pools) as u64)?;
e.tag(Tag::Unassigned(258))?;
}
e.encode(pools)?;
}
BlockQuery::GetPoolDistr(x) => {
e.array(2)?;
@ -143,7 +149,7 @@ impl<'b> Decode<'b, ()> for BlockQuery {
// 17 => Ok(Self::GetStakePoolParams(())),
18 => Ok(Self::GetRewardInfoPools),
// 19 => Ok(Self::GetPoolState(())),
// 20 => Ok(Self::GetStakeSnapshots(())),
20 => Ok(Self::GetStakeSnapshots(d.decode()?)),
// 21 => Ok(Self::GetPoolDistr(())),
// 22 => Ok(Self::GetStakeDelegDeposits(())),
// 23 => Ok(Self::GetConstitutionHash),

View file

@ -41,7 +41,7 @@ pub enum BlockQuery {
GetStakePoolParams(AnyCbor),
GetRewardInfoPools,
GetPoolState(AnyCbor),
GetStakeSnapshots(AnyCbor),
GetStakeSnapshots(Pools),
GetPoolDistr(AnyCbor),
GetStakeDelegDeposits(AnyCbor),
GetConstitutionHash,
@ -210,6 +210,8 @@ pub type Addr = Bytes;
pub type Addrs = Vec<Addr>;
pub type Pools = Vec<Option<Bytes>>;
pub type Coin = AnyUInt;
pub type PolicyId = Hash<28>;
@ -270,6 +272,40 @@ pub struct UTxO {
pub index: AnyUInt,
}
#[derive(Debug, Encode, Decode, PartialEq)]
pub struct StakeSnapshot {
#[n(0)]
pub snapshots: Snapshots,
}
#[derive(Debug, Encode, Decode, PartialEq, Clone)]
pub struct Snapshots {
#[n(0)]
pub stake_snapshots: KeyValuePairs<Bytes, Stakes>,
#[n(1)]
pub snapshot_stake_mark_total: u64,
#[n(2)]
pub snapshot_stake_set_total: u64,
#[n(3)]
pub snapshot_stake_go_total: u64,
}
#[derive(Debug, Encode, Decode, PartialEq, Clone)]
pub struct Stakes {
#[n(0)]
pub snapshot_mark_pool: u64,
#[n(1)]
pub snapshot_set_pool: u64,
#[n(2)]
pub snapshot_go_pool: u64,
}
/// Get the current tip of the ledger.
pub async fn get_chain_point(client: &mut Client) -> Result<Point, ClientError> {
let query = Request::GetChainPoint;
let result = client.query(query).await?;
@ -277,6 +313,7 @@ pub async fn get_chain_point(client: &mut Client) -> Result<Point, ClientError>
Ok(result)
}
/// Get the current era.
pub async fn get_current_era(client: &mut Client) -> Result<Era, ClientError> {
let query = HardForkQuery::GetCurrentEra;
let query = LedgerQuery::HardForkQuery(query);
@ -286,6 +323,7 @@ pub async fn get_current_era(client: &mut Client) -> Result<Era, ClientError> {
Ok(result)
}
/// Get the system start time.
pub async fn get_system_start(client: &mut Client) -> Result<SystemStart, ClientError> {
let query = Request::GetSystemStart;
let result = client.query(query).await?;
@ -293,6 +331,7 @@ pub async fn get_system_start(client: &mut Client) -> Result<SystemStart, Client
Ok(result)
}
/// Get the current protocol parameters.
pub async fn get_current_pparams(
client: &mut Client,
era: u16,
@ -305,6 +344,7 @@ pub async fn get_current_pparams(
Ok(result)
}
/// Get the block number for the current tip.
pub async fn get_block_epoch_number(client: &mut Client, era: u16) -> Result<u32, ClientError> {
let query = BlockQuery::GetEpochNo;
let query = LedgerQuery::BlockQuery(era, query);
@ -314,6 +354,7 @@ pub async fn get_block_epoch_number(client: &mut Client, era: u16) -> Result<u32
Ok(result)
}
/// Get the current stake distribution for the given era.
pub async fn get_stake_distribution(
client: &mut Client,
era: u16,
@ -326,6 +367,7 @@ pub async fn get_stake_distribution(
Ok(result)
}
/// Get the UTxO set for the given era.
pub async fn get_utxo_by_address(
client: &mut Client,
era: u16,
@ -338,3 +380,20 @@ pub async fn get_utxo_by_address(
Ok(result)
}
/// Get stake snapshots for the given era and stake pools.
/// If `pools` are empty, all pools are queried.
/// Otherwise, only the specified pool is queried.
/// Note: This Query is limited by 1 pool per request.
pub async fn get_stake_snapshots(
client: &mut Client,
era: u16,
pools: Vec<Option<Bytes>>,
) -> Result<StakeSnapshot, ClientError> {
let query = BlockQuery::GetStakeSnapshots(pools);
let query = LedgerQuery::BlockQuery(era, query);
let query = Request::LedgerQuery(query);
let result = client.query(query).await?;
Ok(result)
}

View file

@ -2,13 +2,15 @@ use std::fs;
use std::net::{Ipv4Addr, SocketAddrV4};
use std::time::Duration;
use pallas_codec::utils::{AnyCbor, AnyUInt, KeyValuePairs, TagWrap};
use pallas_codec::utils::{AnyCbor, AnyUInt, Bytes, KeyValuePairs, TagWrap};
use pallas_crypto::hash::Hash;
use pallas_network::facades::{NodeClient, PeerClient, PeerServer};
use pallas_network::miniprotocols::blockfetch::BlockRequest;
use pallas_network::miniprotocols::chainsync::{ClientRequest, HeaderContent, Tip};
use pallas_network::miniprotocols::handshake::n2n::VersionData;
use pallas_network::miniprotocols::localstate::queries_v16::{Addr, Addrs, UnitInterval, Value};
use pallas_network::miniprotocols::localstate::queries_v16::{
Addr, Addrs, Snapshots, Stakes, UnitInterval, Value,
};
use pallas_network::miniprotocols::localstate::ClientQueryRequest;
use pallas_network::miniprotocols::txsubmission::{EraTxBody, TxIdAndSize};
use pallas_network::miniprotocols::{
@ -658,6 +660,50 @@ pub async fn local_state_query_server_and_client_happy_path() {
server.statequery().send_result(result).await.unwrap();
// server receives query from client
let query: localstate::queries_v16::Request =
match server.statequery().recv_while_acquired().await.unwrap() {
ClientQueryRequest::Query(q) => q.into_decode().unwrap(),
x => panic!("unexpected message from client: {x:?}"),
};
assert_eq!(
query,
localstate::queries_v16::Request::LedgerQuery(
localstate::queries_v16::LedgerQuery::BlockQuery(
5,
localstate::queries_v16::BlockQuery::GetStakeSnapshots(vec![]),
),
)
);
assert_eq!(*server.statequery().state(), localstate::State::Querying);
let pool_id: Bytes =
hex::decode("fdb5834ba06eb4baafd50550d2dc9b3742d2c52cc5ee65bf8673823b")
.unwrap()
.into();
let stake_snapshots = KeyValuePairs::from(vec![(
pool_id,
Stakes {
snapshot_mark_pool: 0,
snapshot_set_pool: 0,
snapshot_go_pool: 0,
},
)]);
let snapshots = Snapshots {
stake_snapshots,
snapshot_stake_mark_total: 0,
snapshot_stake_set_total: 0,
snapshot_stake_go_total: 0,
};
let result = AnyCbor::from_encode(localstate::queries_v16::StakeSnapshot { snapshots });
server.statequery().send_result(result).await.unwrap();
assert_eq!(*server.statequery().state(), localstate::State::Acquired);
// server receives re-acquire from the client
@ -874,6 +920,46 @@ pub async fn local_state_query_server_and_client_happy_path() {
}]
);
let request = AnyCbor::from_encode(localstate::queries_v16::Request::LedgerQuery(
localstate::queries_v16::LedgerQuery::BlockQuery(
5,
localstate::queries_v16::BlockQuery::GetStakeSnapshots(vec![]),
),
));
client.statequery().send_query(request).await.unwrap();
let result: localstate::queries_v16::StakeSnapshot = client
.statequery()
.recv_while_querying()
.await
.unwrap()
.into_decode()
.unwrap();
let pool_id: Bytes =
hex::decode("fdb5834ba06eb4baafd50550d2dc9b3742d2c52cc5ee65bf8673823b")
.unwrap()
.into();
let stake_snapshots = KeyValuePairs::from(vec![(
pool_id,
Stakes {
snapshot_mark_pool: 0,
snapshot_set_pool: 0,
snapshot_go_pool: 0,
},
)]);
let snapshots = Snapshots {
stake_snapshots,
snapshot_stake_mark_total: 0,
snapshot_stake_set_total: 0,
snapshot_stake_go_total: 0,
};
assert_eq!(result, localstate::queries_v16::StakeSnapshot { snapshots });
// client sends a ReAquire
client
.statequery()