Skip to content

Commit

Permalink
fix: watcher gets receipts from all witnesses
Browse files Browse the repository at this point in the history
  • Loading branch information
edytapawlak committed Mar 25, 2024
1 parent 6963057 commit 425bb92
Showing 1 changed file with 60 additions and 43 deletions.
103 changes: 60 additions & 43 deletions components/watcher/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,7 @@ impl WatcherData {
} => {
let local_state = self.get_state_for_prefix(&args.i)?;
match (local_state, args.s) {
(Some(state), Some(sn)) if sn <= state.sn => {
}
(Some(state), Some(sn)) if sn <= state.sn => {}
_ => {
// query watcher and return info, that it's not ready
let _ = self.update_local_kel(&qry.query.get_prefix()).await;
Expand All @@ -262,7 +261,7 @@ impl WatcherData {
return Err(ActorError::NoIdentState { prefix: id })
}
Err(e) => {
return Err(ActorError::GeneralError(e.to_string()));
return Err(ActorError::GeneralError(e.to_string()));
}
};

Expand Down Expand Up @@ -315,49 +314,52 @@ impl WatcherData {

/// Forward query to random registered witness and save its response to mailbox.
async fn forward_query(&self, id: &IdentifierPrefix) -> Result<(), ActorError> {
let random_witness = IdentifierPrefix::Basic(self.get_witness_for_prefix(id.clone())?);
let route = QueryRoute::Logs {
reply_route: "".to_string(),
args: LogsQueryArgs {
i: id.clone(),
s: None,
src: Some(random_witness.clone()),
},
};

let qry = QueryEvent::new_query(
route,
SerializationFormats::JSON,
HashFunctionCode::Blake3_256,
)?;
// Create a new signed message
let sigs = SelfSigningPrefix::Ed25519Sha512(self.signer.sign(qry.encode()?)?);
let signed_qry = SignedKelQuery::new_nontrans(qry.clone(), self.prefix.clone(), sigs);
let witnesses = self.get_witnesses_for_prefix(&id)?;
for witness in witnesses {
let witness_id = IdentifierPrefix::Basic(witness);
let route = QueryRoute::Logs {
reply_route: "".to_string(),
args: LogsQueryArgs {
i: id.clone(),
s: None,
src: Some(witness_id.clone()),
},
};

let resp = self
.send_query_to(
random_witness.clone(),
keri_core::oobi::Scheme::Http,
signed_qry,
)
.await?;
let qry = QueryEvent::new_query(
route,
SerializationFormats::JSON,
HashFunctionCode::Blake3_256,
)?;
// Create a new signed message
let sigs = SelfSigningPrefix::Ed25519Sha512(self.signer.sign(qry.encode()?)?);
let signed_qry = SignedKelQuery::new_nontrans(qry.clone(), self.prefix.clone(), sigs);

let resp = self
.send_query_to(
witness_id.clone(),
keri_core::oobi::Scheme::Http,
signed_qry,
)
.await?;

match resp {
PossibleResponse::Ksn(rpy) => {
self.process_reply(rpy)?;
}
PossibleResponse::Kel(msgs) => {
for msg in msgs {
if let Message::Notice(notice) = msg {
self.process_notice(notice.clone())?;
if let Notice::Event(evt) = notice {
self.event_storage.add_mailbox_reply(evt)?;
match resp {
PossibleResponse::Ksn(rpy) => {
self.process_reply(rpy)?;
}
PossibleResponse::Kel(msgs) => {
for msg in msgs {
if let Message::Notice(notice) = msg {
self.process_notice(notice.clone())?;
if let Notice::Event(evt) = notice {
self.event_storage.add_mailbox_reply(evt)?;
}
}
}
}
}
PossibleResponse::Mbx(_mbx) => {
panic!("Unexpected response type MBX");
PossibleResponse::Mbx(_mbx) => {
panic!("Unexpected response type MBX");
}
}
}

Expand Down Expand Up @@ -392,7 +394,7 @@ impl WatcherData {

let query = SignedKelQuery::new_nontrans(qry, self.prefix.clone(), signature);

let wit_id = self.get_witness_for_prefix(prefix.clone())?;
let wit_id = self.get_random_witness_for_prefix(prefix.clone())?;

let resp = self
.send_query_to(IdentifierPrefix::Basic(wit_id.clone()), Scheme::Http, query)
Expand All @@ -409,7 +411,10 @@ impl WatcherData {
}

/// Get witnesses for prefix and choose one randomly
fn get_witness_for_prefix(&self, id: IdentifierPrefix) -> Result<BasicPrefix, ActorError> {
fn get_random_witness_for_prefix(
&self,
id: IdentifierPrefix,
) -> Result<BasicPrefix, ActorError> {
let wit_id = self
.get_state_for_prefix(&id)?
.and_then(|state| {
Expand All @@ -423,6 +428,18 @@ impl WatcherData {
Ok(wit_id)
}

/// Get witnesses for prefix and choose one randomly
fn get_witnesses_for_prefix(
&self,
id: &IdentifierPrefix,
) -> Result<Vec<BasicPrefix>, ActorError> {
let wit_id = self
.get_state_for_prefix(&id)?
.map(|state| state.witness_config.witnesses)
.ok_or(ActorError::NoIdentState { prefix: id.clone() })?;
Ok(wit_id)
}

/// Query roles in oobi manager to check if controller with given ID is allowed to communicate with us.
fn check_role(&self, cid: &IdentifierPrefix) -> Result<bool, DbError> {
Ok(self
Expand Down

0 comments on commit 425bb92

Please # to comment.