diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 158112bb..00316dc3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,7 +17,7 @@ jobs: check_features: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Run check features run: | cd omnipaxos @@ -39,7 +39,7 @@ jobs: toolchain: ${{ matrix.rust }} override: true - name: Use the cache to share dependencies # keyed by Cargo.lock - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: | ~/.cargo/registry @@ -71,7 +71,7 @@ jobs: components: rustfmt, clippy override: true - name: Use the cache to share dependencies # keyed by Cargo.lock - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: | ~/.cargo/registry @@ -89,7 +89,7 @@ jobs: clippy_check: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Run Clippy run: cargo clippy -- -D warnings -W clippy::all @@ -97,7 +97,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout Repository - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Install dependencies run: npm install js-yaml fs - name: Run Document Structure Check diff --git a/examples/dashboard/src/main.rs b/examples/dashboard/src/main.rs index adfcc9bc..86432b71 100644 --- a/examples/dashboard/src/main.rs +++ b/examples/dashboard/src/main.rs @@ -1,3 +1,5 @@ +#![allow(clippy::uninlined_format_args)] + use crate::{entry::LogEntry, server::OmniPaxosServer, util::*}; use omnipaxos::{ClusterConfig, OmniPaxos, OmniPaxosConfig, ServerConfig}; use omnipaxos_storage::memory_storage::MemoryStorage; diff --git a/examples/kv_store/src/main.rs b/examples/kv_store/src/main.rs index 58078b02..a2084c16 100644 --- a/examples/kv_store/src/main.rs +++ b/examples/kv_store/src/main.rs @@ -1,3 +1,5 @@ +#![allow(clippy::uninlined_format_args)] + use crate::{kv::KeyValue, server::OmniPaxosServer, util::*}; use omnipaxos::{ messages::Message, diff --git a/omnipaxos/Cargo.toml b/omnipaxos/Cargo.toml index cb5a93ef..40ef4f61 100644 --- a/omnipaxos/Cargo.toml +++ b/omnipaxos/Cargo.toml @@ -23,6 +23,7 @@ omnipaxos_macros = { version = "0.1.3", path = "../omnipaxos_macros", optional = lru = { version = "0.11.0", optional = true } num-traits = { version = "0.2.16", optional = true } linked_hash_set = { version = "0.1.4", optional = true } +nohash-hasher = "0.2.0" [dev-dependencies] kompact = { git = "https://github.com/kompics/kompact", rev = "4fd1fdc", features = ["silent_logging"] } diff --git a/omnipaxos/src/lib.rs b/omnipaxos/src/lib.rs index 9df84cec..f98d55a8 100644 --- a/omnipaxos/src/lib.rs +++ b/omnipaxos/src/lib.rs @@ -12,6 +12,7 @@ #![cfg_attr(docsrs, feature(doc_auto_cfg))] #![deny(missing_docs)] +#![allow(clippy::uninlined_format_args)] /// Trait and struct related to the leader election in Omni-Paxos. pub mod ballot_leader_election; /// OmniPaxos error definitions diff --git a/omnipaxos/src/sequence_paxos/leader.rs b/omnipaxos/src/sequence_paxos/leader.rs index fd9379fa..d0f0e693 100644 --- a/omnipaxos/src/sequence_paxos/leader.rs +++ b/omnipaxos/src/sequence_paxos/leader.rs @@ -20,8 +20,7 @@ where #[cfg(feature = "logging")] debug!(self.logger, "Newly elected leader: {:?}", n); if self.pid == n.pid { - self.leader_state = - LeaderState::with(n, self.leader_state.max_pid, self.leader_state.quorum); + self.leader_state = LeaderState::with(n, &self.peers, self.leader_state.quorum); // Flush any pending writes // Don't have to handle flushed entries here because we will sync with followers let _ = self.internal_storage.flush_batch().expect(WRITE_ERROR_MSG); diff --git a/omnipaxos/src/sequence_paxos/mod.rs b/omnipaxos/src/sequence_paxos/mod.rs index 3692820b..5c856a9c 100644 --- a/omnipaxos/src/sequence_paxos/mod.rs +++ b/omnipaxos/src/sequence_paxos/mod.rs @@ -55,8 +55,6 @@ where let peers = config.peers; let num_nodes = &peers.len() + 1; let quorum = Quorum::with(config.flexible_quorum, num_nodes); - let max_peer_pid = peers.iter().max().unwrap(); - let max_pid = *std::cmp::max(max_peer_pid, &pid) as usize; let mut outgoing = Vec::with_capacity(config.buffer_size); let (state, leader) = match storage .get_promise() @@ -88,12 +86,12 @@ where pid, ), pid, - peers, + peers: peers.clone(), state, buffered_proposals: vec![], buffered_stopsign: None, outgoing, - leader_state: LeaderState::::with(leader, max_pid, quorum), + leader_state: LeaderState::::with(leader, &peers, quorum), latest_accepted_meta: None, current_seq_num: SequenceNumber::default(), cached_promise_message: None, @@ -137,24 +135,24 @@ where /// Initiates the trim process. /// # Arguments - /// * `trim_idx` - Deletes all entries up to [`trim_idx`], if the [`trim_idx`] is `None` then the minimum index accepted by **ALL** servers will be used as the [`trim_idx`]. + /// * `trim_idx` - Deletes all entries up to [`trim_idx`], if the [`trim_idx`] is `None` then `decided_idx` will be used as the [`trim_idx`]. pub(crate) fn trim(&mut self, trim_idx: Option) -> Result<(), CompactionErr> { match self.state { (Role::Leader, _) => { - let min_all_accepted_idx = self.leader_state.get_min_all_accepted_idx(); + let decided_idx = self.get_decided_idx(); let trimmed_idx = match trim_idx { - Some(idx) if idx <= *min_all_accepted_idx => idx, + Some(idx) if idx <= decided_idx => idx, None => { #[cfg(feature = "logging")] trace!( self.logger, "No trim index provided, using min_las_idx: {:?}", - min_all_accepted_idx + decided_idx ); - *min_all_accepted_idx + decided_idx } _ => { - return Err(CompactionErr::NotAllDecided(*min_all_accepted_idx)); + return Err(CompactionErr::NotAllDecided(decided_idx)); } }; let result = self.internal_storage.try_trim(trimmed_idx); diff --git a/omnipaxos/src/storage/mod.rs b/omnipaxos/src/storage/mod.rs index a79fd567..310982e3 100644 --- a/omnipaxos/src/storage/mod.rs +++ b/omnipaxos/src/storage/mod.rs @@ -95,7 +95,7 @@ where } /// The Result type returned by the storage API. -pub type StorageResult = Result>; +pub type StorageResult = Result>; /// The write operations of the storge implementation. #[derive(Debug)] diff --git a/omnipaxos/src/util.rs b/omnipaxos/src/util.rs index 561c3493..beedefcd 100644 --- a/omnipaxos/src/util.rs +++ b/omnipaxos/src/util.rs @@ -3,9 +3,10 @@ use super::{ messages::sequence_paxos::Promise, storage::{Entry, SnapshotType, StopSign}, }; +use nohash_hasher::IntMap; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; -use std::{cmp::Ordering, fmt::Debug, marker::PhantomData}; +use std::{cmp::Ordering, marker::PhantomData}; /// Struct used to help another server synchronize their log with the current state of our own log. #[derive(Clone, Debug)] @@ -70,20 +71,22 @@ enum PromiseState { PromisedHigher, } +/// type alias for a map from NodeId to a value of type T +pub type NodeMap = IntMap; + #[derive(Debug, Clone)] pub(crate) struct LeaderState where T: Entry, { pub n_leader: Ballot, - promises_meta: Vec, + promises_meta: NodeMap, // the sequence number of accepts for each follower where AcceptSync has sequence number = 1 - follower_seq_nums: Vec, - pub accepted_indexes: Vec, + follower_seq_nums: NodeMap, + pub accepted_indexes: NodeMap, max_promise_meta: PromiseMetaData, max_promise_sync: Option>, - latest_accept_meta: Vec>, // index in outgoing - pub max_pid: usize, + latest_accept_meta: NodeMap>, // index in outgoing // The number of promises needed in the prepare phase to become synced and // the number of accepteds needed in the accept phase to decide an entry. pub quorum: Quorum, @@ -93,41 +96,60 @@ impl LeaderState where T: Entry, { - pub fn with(n_leader: Ballot, max_pid: usize, quorum: Quorum) -> Self { + pub fn with(n_leader: Ballot, peers: &[NodeId], quorum: Quorum) -> Self { + let mut promises_meta = NodeMap::default(); + let mut follower_seq_nums = NodeMap::default(); + let mut accepted_indexes = NodeMap::default(); + let mut latest_accept_meta = NodeMap::default(); + + // Initialize maps for all peers + for &peer in peers.iter() { + promises_meta.insert(peer, PromiseState::NotPromised); + follower_seq_nums.insert(peer, SequenceNumber::default()); + accepted_indexes.insert(peer, 0); + latest_accept_meta.insert(peer, None); + } + Self { n_leader, - promises_meta: vec![PromiseState::NotPromised; max_pid], - follower_seq_nums: vec![SequenceNumber::default(); max_pid], - accepted_indexes: vec![0; max_pid], + promises_meta, + follower_seq_nums, + accepted_indexes, max_promise_meta: PromiseMetaData::default(), max_promise_sync: None, - latest_accept_meta: vec![None; max_pid], - max_pid, + latest_accept_meta, quorum, } } - fn pid_to_idx(pid: NodeId) -> usize { - (pid - 1) as usize - } - - // Resets `pid`'s accept sequence to indicate they are in the next session of accepts pub fn increment_seq_num_session(&mut self, pid: NodeId) { - let idx = Self::pid_to_idx(pid); - self.follower_seq_nums[idx].session += 1; - self.follower_seq_nums[idx].counter = 0; + if let Some(seq_num) = self.follower_seq_nums.get_mut(&pid) { + seq_num.session += 1; + seq_num.counter = 0; + } } pub fn next_seq_num(&mut self, pid: NodeId) -> SequenceNumber { - let idx = Self::pid_to_idx(pid); - self.follower_seq_nums[idx].counter += 1; - self.follower_seq_nums[idx] + if let Some(seq_num) = self.follower_seq_nums.get_mut(&pid) { + seq_num.counter += 1; + *seq_num + } else { + // Handle case where pid is not in the map + let new_seq = SequenceNumber { + counter: 1, + ..Default::default() + }; + self.follower_seq_nums.insert(pid, new_seq); + new_seq + } } - pub fn get_seq_num(&mut self, pid: NodeId) -> SequenceNumber { - self.follower_seq_nums[Self::pid_to_idx(pid)] + pub fn get_seq_num(&self, pid: NodeId) -> SequenceNumber { + self.follower_seq_nums + .get(&pid) + .copied() + .unwrap_or_default() } - pub fn set_promise(&mut self, prom: Promise, from: NodeId, check_max_prom: bool) -> bool { let promise_meta = PromiseMetaData { n_accepted: prom.n_accepted, @@ -139,22 +161,24 @@ where self.max_promise_meta = promise_meta.clone(); self.max_promise_sync = prom.log_sync; } - self.promises_meta[Self::pid_to_idx(from)] = PromiseState::Promised(promise_meta); + self.promises_meta + .insert(from, PromiseState::Promised(promise_meta)); + let num_promised = self .promises_meta - .iter() + .values() .filter(|p| matches!(p, PromiseState::Promised(_))) .count(); self.quorum.is_prepare_quorum(num_promised) } pub fn reset_promise(&mut self, pid: NodeId) { - self.promises_meta[Self::pid_to_idx(pid)] = PromiseState::NotPromised; + self.promises_meta.insert(pid, PromiseState::NotPromised); } /// Node `pid` seen with ballot greater than my ballot pub fn lost_promise(&mut self, pid: NodeId) { - self.promises_meta[Self::pid_to_idx(pid)] = PromiseState::PromisedHigher; + self.promises_meta.insert(pid, PromiseState::PromisedHigher); } pub fn take_max_promise_sync(&mut self) -> Option> { @@ -167,7 +191,7 @@ where pub fn get_max_decided_idx(&self) -> usize { self.promises_meta - .iter() + .values() .filter_map(|p| match p { PromiseState::Promised(m) => Some(m.decided_idx), _ => None, @@ -177,31 +201,23 @@ where } pub fn get_promise_meta(&self, pid: NodeId) -> &PromiseMetaData { - match &self.promises_meta[Self::pid_to_idx(pid)] { - PromiseState::Promised(metadata) => metadata, + match self.promises_meta.get(&pid) { + Some(PromiseState::Promised(metadata)) => metadata, _ => panic!("No Metadata found for promised follower"), } } - pub fn get_min_all_accepted_idx(&self) -> &usize { - self.accepted_indexes - .iter() - .min() - .expect("Should be all initialised to 0!") - } - pub fn reset_latest_accept_meta(&mut self) { - self.latest_accept_meta = vec![None; self.max_pid]; + for value in self.latest_accept_meta.values_mut() { + *value = None; + } } pub fn get_promised_followers(&self) -> Vec { self.promises_meta .iter() - .enumerate() - .filter_map(|(idx, x)| match x { - PromiseState::Promised(_) if idx != Self::pid_to_idx(self.n_leader.pid) => { - Some((idx + 1) as NodeId) - } + .filter_map(|(pid, x)| match x { + PromiseState::Promised(_) if *pid != self.n_leader.pid => Some(*pid), _ => None, }) .collect() @@ -211,48 +227,41 @@ where pub(crate) fn get_preparable_peers(&self, peers: &[NodeId]) -> Vec { peers .iter() - .filter_map(|pid| { - let idx = Self::pid_to_idx(*pid); - match self.promises_meta.get(idx).unwrap() { - PromiseState::NotPromised => Some(*pid), - _ => None, - } + .filter_map(|pid| match self.promises_meta.get(pid).unwrap() { + PromiseState::NotPromised => Some(*pid), + _ => None, }) .collect() } pub fn set_latest_accept_meta(&mut self, pid: NodeId, idx: Option) { let meta = idx.map(|x| (self.n_leader, x)); - self.latest_accept_meta[Self::pid_to_idx(pid)] = meta; + self.latest_accept_meta.insert(pid, meta); } pub fn set_accepted_idx(&mut self, pid: NodeId, idx: usize) { - self.accepted_indexes[Self::pid_to_idx(pid)] = idx; + self.accepted_indexes.insert(pid, idx); } pub fn get_latest_accept_meta(&self, pid: NodeId) -> Option<(Ballot, usize)> { - self.latest_accept_meta - .get(Self::pid_to_idx(pid)) - .unwrap() - .as_ref() - .copied() + self.latest_accept_meta.get(&pid).and_then(|x| *x) } pub fn get_decided_idx(&self, pid: NodeId) -> Option { - match self.promises_meta.get(Self::pid_to_idx(pid)).unwrap() { - PromiseState::Promised(metadata) => Some(metadata.decided_idx), + match self.promises_meta.get(&pid) { + Some(PromiseState::Promised(metadata)) => Some(metadata.decided_idx), _ => None, } } pub fn get_accepted_idx(&self, pid: NodeId) -> usize { - *self.accepted_indexes.get(Self::pid_to_idx(pid)).unwrap() + self.accepted_indexes.get(&pid).copied().unwrap_or(0) } pub fn is_chosen(&self, idx: usize) -> bool { let num_accepted = self .accepted_indexes - .iter() + .values() .filter(|la| **la >= idx) .count(); self.quorum.is_accept_quorum(num_accepted) @@ -487,7 +496,7 @@ mod tests { let quorum = Quorum::Majority(2); let max_pid = 8; let leader_state = - LeaderState::::with(Ballot::with(1, 1, 1, max_pid), max_pid as usize, quorum); + LeaderState::::with(Ballot::with(1, 1, 1, max_pid), &nodes, quorum); let prep_peers = leader_state.get_preparable_peers(&nodes); assert_eq!(prep_peers, nodes); @@ -495,7 +504,7 @@ mod tests { let quorum = Quorum::Majority(3); let max_pid = 100; let leader_state = - LeaderState::::with(Ballot::with(1, 1, 1, max_pid), max_pid as usize, quorum); + LeaderState::::with(Ballot::with(1, 1, 1, max_pid), &nodes, quorum); let prep_peers = leader_state.get_preparable_peers(&nodes); assert_eq!(prep_peers, nodes); } diff --git a/omnipaxos/src/utils/ui.rs b/omnipaxos/src/utils/ui.rs index 5cdaaa5d..fcf4fc89 100644 --- a/omnipaxos/src/utils/ui.rs +++ b/omnipaxos/src/utils/ui.rs @@ -2,14 +2,14 @@ use crate::{ ballot_leader_election::Ballot, messages::ballot_leader_election::HeartbeatReply, storage::Entry, - util::{LeaderState, NodeId}, + util::{LeaderState, NodeId, NodeMap}, }; /// The states of all the nodes in the cluster. #[derive(Debug, Clone, Default)] pub struct ClusterState { /// The accepted indexes of all the nodes in the cluster. The index of the vector is the node id. - pub accepted_indexes: Vec, + pub accepted_indexes: NodeMap, /// All the received heartbeats from the previous heartbeat round, including the current node. /// Represents nodes that are currently alive from the view of the current node. pub heartbeats: Vec, diff --git a/omnipaxos_macros/src/lib.rs b/omnipaxos_macros/src/lib.rs index 421b10c7..d14ad45d 100644 --- a/omnipaxos_macros/src/lib.rs +++ b/omnipaxos_macros/src/lib.rs @@ -1,3 +1,5 @@ +#![allow(clippy::uninlined_format_args)] + use proc_macro::TokenStream; use quote::quote; use syn::{parse_macro_input, DeriveInput, Ident}; diff --git a/omnipaxos_storage/Cargo.toml b/omnipaxos_storage/Cargo.toml index 824a6b78..08df7a6a 100644 --- a/omnipaxos_storage/Cargo.toml +++ b/omnipaxos_storage/Cargo.toml @@ -18,7 +18,7 @@ omnipaxos = { version = "0.2.2", path = "../omnipaxos", features = ["serde"] } serde = { version = "1.0", features = ["derive"], optional= true } bincode = { version = "1.3.3", optional = true } zerocopy = { version = "0.6.1", optional = true } -rocksdb = { version = "0.21.0", optional = true } +rocksdb = { version = "0.23.0", optional = true } [profile.release] lto = true diff --git a/omnipaxos_storage/src/lib.rs b/omnipaxos_storage/src/lib.rs index 25727b97..0c88dd81 100644 --- a/omnipaxos_storage/src/lib.rs +++ b/omnipaxos_storage/src/lib.rs @@ -2,6 +2,7 @@ #![cfg_attr(docsrs, feature(doc_auto_cfg))] #![deny(missing_docs)] +#![allow(clippy::uninlined_format_args)] /// an in-memory storage implementation with fast read and writes pub mod memory_storage; diff --git a/omnipaxos_storage/src/persistent_storage.rs b/omnipaxos_storage/src/persistent_storage.rs index a117f46b..d49a8f6e 100644 --- a/omnipaxos_storage/src/persistent_storage.rs +++ b/omnipaxos_storage/src/persistent_storage.rs @@ -2,7 +2,11 @@ use omnipaxos::{ ballot_leader_election::Ballot, storage::{Entry, StopSign, Storage, StorageOp, StorageResult}, }; -use rocksdb::{ColumnFamilyDescriptor, ColumnFamilyRef, Options, WriteBatchWithTransaction, DB}; +pub use rocksdb::{ + statistics::{Histogram, HistogramData}, + Options, +}; +use rocksdb::{ColumnFamilyDescriptor, ColumnFamilyRef, WriteBatchWithTransaction, DB}; use serde::{Deserialize, Serialize}; use std::marker::PhantomData; use zerocopy::{AsBytes, FromBytes}; diff --git a/omnipaxos_ui/src/lib.rs b/omnipaxos_ui/src/lib.rs index 4e5d576e..d9c760cc 100644 --- a/omnipaxos_ui/src/lib.rs +++ b/omnipaxos_ui/src/lib.rs @@ -1,6 +1,7 @@ //! A library for visualizing [OmniPaxos](https://crates.io/crates/omnipaxos) node in a terminal dashboard. #![deny(missing_docs)] +#![allow(clippy::uninlined_format_args)] use crate::app::{App, Role}; use crossterm::{ event::{DisableMouseCapture, EnableMouseCapture, Event, KeyCode}, @@ -108,16 +109,20 @@ impl OmniPaxosUI { // Current node is the leader self.app.current_role = Role::Leader; // Update the progress of all the followers - let leader_acc_idx = op_states.cluster_state.accepted_indexes[leader_id as usize]; - for (idx, &accepted_idx) in - op_states.cluster_state.accepted_indexes.iter().enumerate() - { - self.app.followers_progress[idx] = if leader_acc_idx == 0 { + let leader_acc_idx = op_states + .cluster_state + .accepted_indexes + .get(&leader_id) + .copied() + .unwrap_or(0); + + for (&idx, &accepted_idx) in op_states.cluster_state.accepted_indexes.iter() { + self.app.followers_progress[idx as usize] = if leader_acc_idx == 0 { 0.0 // To avoid division by zero } else { accepted_idx as f64 / leader_acc_idx as f64 }; - self.app.followers_accepted_idx[idx] = accepted_idx; + self.app.followers_accepted_idx[idx as usize] = accepted_idx; } } else { // Current node is a follower