Skip to content

Commit

Permalink
Add Heal command (#468)
Browse files Browse the repository at this point in the history
  • Loading branch information
pietrodimarco-dfinity authored Jun 18, 2024
1 parent 9baa876 commit 099fc6d
Show file tree
Hide file tree
Showing 12 changed files with 318 additions and 6 deletions.
9 changes: 9 additions & 0 deletions rs/cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ pub enum Commands {
/// Path to the DER file
path: String,
},

Heal {
/// Max number of nodes to be replaced per subnet.
/// Optimization will be performed automatically maximizing the decentralization
/// and minimizing the number of replaced nodes per subnet
#[clap(short, long)]
max_replaceable_nodes_per_sub: Option<usize>,
},

/// Manage an existing subnet
Subnet(subnet::Cmd),
/// Get a value using ic-admin CLI
Expand Down
11 changes: 10 additions & 1 deletion rs/cli/src/clients.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use async_trait::async_trait;
use decentralization::HealResponse;
use decentralization::SubnetChangeResponse;
use ic_base_types::PrincipalId;
use ic_management_types::{
requests::{MembershipReplaceRequest, NodesRemoveRequest, NodesRemoveResponse, SubnetCreateRequest, SubnetResizeRequest},
requests::{HealRequest, MembershipReplaceRequest, NodesRemoveRequest, NodesRemoveResponse, SubnetCreateRequest, SubnetResizeRequest},
Artifact, Network, NetworkError, Release, TopologyChangeProposal,
};
use log::error;
Expand Down Expand Up @@ -97,6 +98,14 @@ impl DashboardBackendClient {
.rest_send()
.await
}

pub(crate) async fn network_heal(&self, request: HealRequest) -> anyhow::Result<HealResponse> {
reqwest::Client::new()
.post(self.url.join("network/heal").map_err(|e| anyhow::anyhow!(e))?)
.json(&request)
.rest_send()
.await
}
}

#[async_trait]
Expand Down
14 changes: 14 additions & 0 deletions rs/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,20 @@ async fn async_main() -> Result<(), anyhow::Error> {
Ok(())
}

cli::Commands::Heal {
max_replaceable_nodes_per_sub,
} => {
runner_instance
.network_heal(
ic_management_types::requests::HealRequest {
max_replaceable_nodes_per_sub: *max_replaceable_nodes_per_sub,
},
cli_opts.verbose,
simulate,
)
.await
}

cli::Commands::Subnet(subnet) => {
// Check if required arguments are provided
match &subnet.subcommand {
Expand Down
34 changes: 34 additions & 0 deletions rs/cli/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::operations::hostos_rollout::{HostosRollout, HostosRolloutResponse, No
use crate::ops_subnet_node_replace;
use crate::{ic_admin, local_unused_port};
use decentralization::SubnetChangeResponse;
use futures::future::join_all;
use ic_base_types::PrincipalId;
use ic_management_backend::proposal::ProposalAgent;
use ic_management_backend::public_dashboard::query_ic_dashboard_list;
Expand Down Expand Up @@ -444,4 +445,37 @@ impl Runner {
.await?;
Ok(())
}

pub async fn network_heal(
&self,
request: ic_management_types::requests::HealRequest,
_verbose: bool,
simulate: bool,
) -> Result<(), anyhow::Error> {
let change = self.dashboard_backend_client.network_heal(request).await?;
println!("{}", change);

let errors = join_all(change.subnets_change_response.iter().map(|subnet_change_response| async move {
self.run_membership_change(
subnet_change_response.clone(),
ops_subnet_node_replace::replace_proposal_options(subnet_change_response)?,
simulate,
)
.await
.map_err(|e| {
println!("{}", e);
e
})
}))
.await
.into_iter()
.filter_map(|f| f.err())
.collect::<Vec<_>>();

if !errors.is_empty() {
anyhow::bail!("Errors: {:?}", errors);
}

Ok(())
}
}
16 changes: 15 additions & 1 deletion rs/decentralization/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl From<&network::SubnetChange> for SubnetChangeResponse {

impl Display for SubnetChangeResponse {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
writeln!(f, "Decentralization score changes:\n")?;
writeln!(f, "Decentralization score changes for subnet {}:\n", self.subnet_id.unwrap_or_default())?;
let before_individual = self.score_before.scores_individual();
let after_individual = self.score_after.scores_individual();
self.score_before
Expand Down Expand Up @@ -183,3 +183,17 @@ impl Display for SubnetChangeResponse {
Ok(())
}
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct HealResponse {
pub subnets_change_response: Vec<SubnetChangeResponse>,
}

impl Display for HealResponse {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
for change in &self.subnets_change_response {
writeln!(f, "{}", change)?;
}
Ok(())
}
}
81 changes: 80 additions & 1 deletion rs/decentralization/src/nakamoto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ impl Display for NakamotoScore {
mod tests {
use std::str::FromStr;

use crate::network::{DecentralizedSubnet, SubnetChangeRequest};
use crate::network::{DecentralizedSubnet, NetworkHealRequest, NetworkHealSubnets, SubnetChangeRequest};
use ic_base_types::PrincipalId;
use itertools::Itertools;
use regex::Regex;
Expand Down Expand Up @@ -871,4 +871,83 @@ mod tests {
(5000, vec!["European subnet has 5 non-European node(s)".to_string()])
);
}

#[test]
fn test_network_heal_subnets_ord() {
let not_important_small = new_test_subnet(0, 13, 0)
.with_subnet_id(PrincipalId::from_str("k44fs-gm4pv-afozh-rs7zw-cg32n-u7xov-xqyx3-2pw5q-eucnu-cosd4-uqe").unwrap());
let not_important_small = NetworkHealSubnets {
name: String::from("App 20"),
decentralized_subnet: not_important_small,
unhealthy_nodes: vec![],
};

let not_important_large = new_test_subnet(0, 28, 0)
.with_subnet_id(PrincipalId::from_str("bkfrj-6k62g-dycql-7h53p-atvkj-zg4to-gaogh-netha-ptybj-ntsgw-rqe").unwrap());
let not_important_large = NetworkHealSubnets {
name: String::from("European"),
decentralized_subnet: not_important_large,
unhealthy_nodes: vec![],
};

let important =
serde_json::from_str::<ic_management_types::Subnet>(include_str!("../../test_data/subnet-uzr34.json")).expect("failed to read test data");
let important = NetworkHealSubnets {
name: important.metadata.name.clone(),
decentralized_subnet: DecentralizedSubnet::from(important),
unhealthy_nodes: vec![],
};

let unordered = vec![not_important_small.clone(), important.clone(), not_important_large.clone()];
let healing_order = unordered.clone().into_iter().sorted_by(|a, b| a.cmp(b).reverse()).collect_vec();

assert_eq!(vec![important, not_important_large, not_important_small], healing_order);
}

#[test]
fn test_network_heal() {
let nodes_available = new_test_nodes("spare", 10, 2);
let nodes_available_principals = nodes_available.iter().map(|n| n.id).collect_vec();

let important =
serde_json::from_str::<ic_management_types::Subnet>(include_str!("../../test_data/subnet-uzr34.json")).expect("failed to read test data");
let important_decentralized = DecentralizedSubnet::from(important.clone());
let important_unhealthy_principals = vec![
PrincipalId::from_str("e4ysi-xp4fs-5ckcv-7e76q-edydw-ak6le-2acyt-k7udb-lj2vo-fqhhx-vqe").unwrap(),
PrincipalId::from_str("aefqq-d7ldg-ljk5s-cmnxk-qqu7c-tw52l-74g3m-xxl5d-ag4ia-dxubz-wae").unwrap(),
];
let unhealthy_nodes = important_decentralized
.nodes
.clone()
.into_iter()
.filter(|n| important_unhealthy_principals.contains(&n.id))
.collect_vec();
let important = NetworkHealSubnets {
name: important.metadata.name.clone(),
decentralized_subnet: important_decentralized,
unhealthy_nodes: unhealthy_nodes.clone(),
};

let max_replaceable_nodes = None;
let network_heal_response = NetworkHealRequest::new(vec![important.clone()])
.heal_and_optimize(nodes_available.clone(), max_replaceable_nodes)
.unwrap();
let result = network_heal_response.first().unwrap().clone();

assert_eq!(important_unhealthy_principals, result.removed.clone());

assert_eq!(important_unhealthy_principals.len(), result.added.len());

let max_replaceable_nodes = Some(1);
let network_heal_response = NetworkHealRequest::new(vec![important.clone()])
.heal_and_optimize(nodes_available.clone(), max_replaceable_nodes)
.unwrap();
let result = network_heal_response.first().unwrap().clone();

assert_eq!(important_unhealthy_principals.into_iter().take(1).collect_vec(), result.removed.clone());

assert_eq!(1, result.added.len());

result.added.iter().for_each(|n| assert!(nodes_available_principals.contains(n)));
}
}
86 changes: 84 additions & 2 deletions rs/decentralization/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use async_trait::async_trait;
use ic_base_types::PrincipalId;
use ic_management_types::{MinNakamotoCoefficients, NetworkError, NodeFeature};
use itertools::Itertools;
use log::{debug, info};
use log::{debug, info, warn};
use rand::{seq::SliceRandom, SeedableRng};
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
Expand Down Expand Up @@ -122,7 +122,7 @@ impl From<&ic_management_types::Node> for Node {
}
}

#[derive(Clone, Default, Debug, Serialize, Deserialize)]
#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct DecentralizedSubnet {
pub id: PrincipalId,
pub nodes: Vec<Node>,
Expand Down Expand Up @@ -985,3 +985,85 @@ impl Display for SubnetChange {
write!(f, "{}", SubnetChangeResponse::from(self))
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NetworkHealSubnets {
pub name: String,
pub decentralized_subnet: DecentralizedSubnet,
pub unhealthy_nodes: Vec<Node>,
}

impl NetworkHealSubnets {
const IMPORTANT_SUBNETS: &'static [&'static str] = &["NNS", "SNS", "Bitcoin", "Internet Identity", "tECDSA signing"];
}

impl PartialOrd for NetworkHealSubnets {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for NetworkHealSubnets {
fn cmp(&self, other: &Self) -> Ordering {
let self_is_important = NetworkHealSubnets::IMPORTANT_SUBNETS.contains(&self.name.as_str());
let other_is_important = NetworkHealSubnets::IMPORTANT_SUBNETS.contains(&other.name.as_str());

match (self_is_important, other_is_important) {
(true, false) => Ordering::Greater,
(false, true) => Ordering::Less,
_ => self.decentralized_subnet.nodes.len().cmp(&other.decentralized_subnet.nodes.len()),
}
}
}

pub struct NetworkHealRequest {
pub subnets: Vec<NetworkHealSubnets>,
}

impl NetworkHealRequest {
pub fn new(subnets: Vec<NetworkHealSubnets>) -> Self {
Self { subnets }
}

pub fn heal_and_optimize(
&self,
mut available_nodes: Vec<Node>,
max_replaceable_nodes: Option<usize>,
) -> Result<Vec<SubnetChangeResponse>, NetworkError> {
let mut subnets_changed = Vec::new();
let subnets_to_heal = self.subnets.iter().sorted_by(|a, b| a.cmp(b).reverse()).collect_vec();

for subnet in subnets_to_heal {
let mut unhealthy_nodes = subnet.unhealthy_nodes.clone();
let unhealthy_nodes_len = unhealthy_nodes.len();

if let Some(max_replaceable_nodes) = max_replaceable_nodes {
if unhealthy_nodes_len > max_replaceable_nodes {
unhealthy_nodes = subnet.unhealthy_nodes.clone().into_iter().take(max_replaceable_nodes).collect_vec();

warn!(
"Subnet {}: replacing {} of {} unhealthy nodes: {:?}",
subnet.decentralized_subnet.id,
max_replaceable_nodes,
unhealthy_nodes_len,
unhealthy_nodes.iter().map(|node| node.id).collect_vec()
);
}
}
let unhealthy_nodes_len = unhealthy_nodes.len();
let optimize_limit = max_replaceable_nodes.unwrap_or(unhealthy_nodes_len) - unhealthy_nodes_len;

let change = SubnetChangeRequest {
subnet: subnet.decentralized_subnet.clone(),
available_nodes: available_nodes.clone(),
..Default::default()
};
let change = change.optimize(optimize_limit, &unhealthy_nodes)?;

available_nodes.retain(|node| !change.added().contains(node));
subnets_changed.push(SubnetChangeResponse::from(&change));
}

Ok(subnets_changed)
}
}
2 changes: 2 additions & 0 deletions rs/ic-management-backend/src/endpoints/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod governance_canister;
pub mod network;
pub mod nodes_ops;
pub mod query_decentralization;
pub mod release;
Expand Down Expand Up @@ -86,6 +87,7 @@ pub async fn run_backend(
.service(self::subnet::create_subnet)
.service(self::subnet::resize)
.service(self::subnet::change_preview)
.service(self::network::heal)
.service(self::nodes_ops::remove)
.service(self::query_decentralization::decentralization_subnet_query)
.service(self::query_decentralization::decentralization_whatif_query)
Expand Down
36 changes: 36 additions & 0 deletions rs/ic-management-backend/src/endpoints/network.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use super::*;
use crate::subnets;
use decentralization::network::{DecentralizedSubnet, NetworkHealRequest, NetworkHealSubnets, Node};
use ic_management_types::{requests::HealRequest, NetworkError};
use itertools::Itertools;

#[post("/network/heal")]
async fn heal(request: web::Json<HealRequest>, registry: web::Data<Arc<RwLock<RegistryState>>>) -> Result<HttpResponse, Error> {
let registry = registry.read().await;
let health_client = health::HealthClient::new(registry.network());
let nodes_health = health_client
.nodes()
.await
.map_err(|_| actix_web::error::ErrorInternalServerError("failed to fetch subnet health".to_string()))?;
let subnets: BTreeMap<PrincipalId, ic_management_types::Subnet> = registry.subnets();
let unhealthy_subnets: BTreeMap<PrincipalId, Vec<ic_management_types::Node>> = subnets::unhealthy_with_nodes(&subnets, nodes_health).await;

let subnets_to_heal = unhealthy_subnets
.iter()
.flat_map(|(id, unhealthy_nodes)| {
let unhealthy_nodes = unhealthy_nodes.iter().map(Node::from).collect::<Vec<_>>();
let unhealthy_subnet = subnets.get(id).ok_or(NetworkError::SubnetNotFound(*id))?;

Ok::<NetworkHealSubnets, NetworkError>(NetworkHealSubnets {
name: unhealthy_subnet.metadata.name.clone(),
decentralized_subnet: DecentralizedSubnet::from(unhealthy_subnet),
unhealthy_nodes,
})
})
.collect_vec();

let subnets_change_response =
NetworkHealRequest::new(subnets_to_heal).heal_and_optimize(registry.available_nodes().await?, request.max_replaceable_nodes_per_sub)?;

Ok(HttpResponse::Ok().json(decentralization::HealResponse { subnets_change_response }))
}
Loading

0 comments on commit 099fc6d

Please sign in to comment.