Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

use service provider proto string impls #704

Merged
merged 1 commit into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 19 additions & 14 deletions mobile_config/src/client/carrier_service_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,55 +4,60 @@ use file_store::traits::MsgVerify;
use helium_crypto::{Keypair, PublicKey, Sign};
use helium_proto::{
services::{mobile_config, Channel},
Message,
Message, ServiceProvider,
};
use retainer::Cache;
use std::{sync::Arc, time::Duration};

use std::{str::FromStr, sync::Arc, time::Duration};
#[async_trait]
pub trait CarrierServiceVerifier {
type Error;
async fn key_to_rewardable_entity<'a>(&self, entity_id: &'a str)
-> Result<String, Self::Error>;
async fn payer_key_to_service_provider<'a>(
&self,
payer: &str,
) -> Result<ServiceProvider, Self::Error>;
}
#[derive(Clone)]
pub struct CarrierServiceClient {
client: mobile_config::CarrierServiceClient<Channel>,
signing_key: Arc<Keypair>,
config_pubkey: PublicKey,
cache: Arc<Cache<String, String>>,
cache: Arc<Cache<String, ServiceProvider>>,
cache_ttl: Duration,
}

#[async_trait]
impl CarrierServiceVerifier for CarrierServiceClient {
type Error = ClientError;

async fn key_to_rewardable_entity<'a>(&self, pubkey: &'a str) -> Result<String, ClientError> {
if let Some(carrier_found) = self.cache.get(&pubkey.to_string()).await {
return Ok(carrier_found.value().clone());
async fn payer_key_to_service_provider<'a>(
&self,
payer: &str,
) -> Result<ServiceProvider, ClientError> {
if let Some(carrier_found) = self.cache.get(&payer.to_string()).await {
return Ok(*carrier_found.value());
}

let mut request = mobile_config::CarrierKeyToEntityReqV1 {
pubkey: pubkey.to_string(),
pubkey: payer.to_string(),
signer: self.signing_key.public_key().into(),
signature: vec![],
};
request.signature = self.signing_key.sign(&request.encode_to_vec())?;
tracing::debug!(?pubkey, "getting entity key for carrier on-chain");
tracing::debug!(?payer, "getting service provider for payer key");
let response = match call_with_retry!(self.client.clone().key_to_entity(request.clone())) {
Ok(verify_res) => {
let response = verify_res.into_inner();
response.verify(&self.config_pubkey)?;
response.entity_key
ServiceProvider::from_str(&response.entity_key)
.map_err(|_| ClientError::UnknownServiceProvider(payer.to_string()))?
}
Err(status) if status.code() == tonic::Code::NotFound => {
Err(ClientError::UnknownServiceProvider)?
Err(ClientError::UnknownServiceProvider(payer.to_string()))?
}
Err(status) => Err(status)?,
};
self.cache
.insert(pubkey.to_string(), response.clone(), self.cache_ttl)
.insert(payer.to_string(), response, self.cache_ttl)
.await;
Ok(response)
}
Expand Down
4 changes: 2 additions & 2 deletions mobile_config/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ pub enum ClientError {
VerificationError(#[from] file_store::Error),
#[error("error parsing gateway location {0}")]
LocationParseError(#[from] std::num::ParseIntError),
#[error("unknown service provider name")]
UnknownServiceProvider,
#[error("unknown service provider {0}")]
UnknownServiceProvider(String),
}

macro_rules! call_with_retry {
Expand Down
2 changes: 1 addition & 1 deletion mobile_verifier/src/data_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use futures::{
TryFutureExt,
};
use helium_crypto::PublicKeyBinary;
use helium_proto::services::poc_mobile::ServiceProvider;
use helium_proto::ServiceProvider;
use rust_decimal::Decimal;
use sqlx::{PgPool, Postgres, Row, Transaction};
use std::{collections::HashMap, ops::Range, time::Instant};
Expand Down
31 changes: 13 additions & 18 deletions mobile_verifier/src/reward_shares.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,20 @@ use crate::{
speedtests_average::{SpeedtestAverage, SpeedtestAverages},
subscriber_location::SubscriberValidatedLocations,
};
use anyhow::bail;
use chrono::{DateTime, Duration, Utc};
use file_store::traits::TimestampEncode;
use futures::{Stream, StreamExt};
use helium_crypto::PublicKeyBinary;
use helium_proto::services::{
poc_mobile as proto,
poc_mobile::{
mobile_reward_share::Reward as ProtoReward, ServiceProvider, UnallocatedReward,
UnallocatedRewardType,
use helium_proto::{
services::{
poc_mobile as proto,
poc_mobile::{
mobile_reward_share::Reward as ProtoReward, UnallocatedReward, UnallocatedRewardType,
},
},
ServiceProvider,
};

use mobile_config::client::{carrier_service_client::CarrierServiceVerifier, ClientError};
use rust_decimal::prelude::*;
use rust_decimal_macros::dec;
Expand Down Expand Up @@ -330,16 +332,9 @@ impl ServiceProviderShares {
payer: &str,
client: &impl CarrierServiceVerifier<Error = ClientError>,
) -> anyhow::Result<ServiceProvider> {
tracing::info!(payer, "getting entity key for service provider");
let entity_key = client.key_to_rewardable_entity(payer).await?;
Self::entity_key_to_service_provider(&entity_key)
}

fn entity_key_to_service_provider(key: &str) -> anyhow::Result<ServiceProvider> {
match key {
"Helium Mobile" => Ok(ServiceProvider::HeliumMobile),
_ => bail!("unknown service provider name"),
}
tracing::info!(payer, "getting service provider for payer");
let sp = client.payer_key_to_service_provider(payer).await?;
Ok(sp)
}
}

Expand Down Expand Up @@ -606,8 +601,8 @@ mod test {
use chrono::{Duration, Utc};
use file_store::speedtest::CellSpeedtest;
use futures::stream::{self, BoxStream};
use helium_proto::services::{
poc_mobile::mobile_reward_share::Reward as MobileReward, poc_mobile::ServiceProvider,
use helium_proto::{
services::poc_mobile::mobile_reward_share::Reward as MobileReward, ServiceProvider,
};
use prost::Message;
use std::collections::HashMap;
Expand Down
25 changes: 17 additions & 8 deletions mobile_verifier/tests/rewarder_sp_rewards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use std::string::ToString;

use async_trait::async_trait;
use chrono::{DateTime, Duration as ChronoDuration, Utc};
use helium_proto::services::poc_mobile::{
ServiceProviderReward, UnallocatedReward, UnallocatedRewardType,
use helium_proto::{
services::poc_mobile::{ServiceProviderReward, UnallocatedReward, UnallocatedRewardType},
ServiceProvider,
};
use rust_decimal::prelude::*;
use rust_decimal_macros::dec;
Expand Down Expand Up @@ -35,11 +36,14 @@ impl MockCarrierServiceClient {
impl CarrierServiceVerifier for MockCarrierServiceClient {
type Error = ClientError;

async fn key_to_rewardable_entity<'a>(&self, pubkey: &'a str) -> Result<String, ClientError> {
async fn payer_key_to_service_provider<'a>(
&self,
pubkey: &str,
) -> Result<ServiceProvider, ClientError> {
match self.valid_sps.get(pubkey) {
Some(v) => Ok(v.clone()),

None => Err(ClientError::UnknownServiceProvider),
Some(v) => Ok(ServiceProvider::from_str(v)
.map_err(|_| ClientError::UnknownServiceProvider(pubkey.to_string()))?),
None => Err(ClientError::UnknownServiceProvider(pubkey.to_string())),
}
}
}
Expand Down Expand Up @@ -70,7 +74,12 @@ async fn test_service_provider_rewards(pool: PgPool) -> anyhow::Result<()> {
receive_expected_rewards(&mut mobile_rewards)
);
if let Ok((sp_reward, unallocated_reward)) = rewards {
// assert_eq!(SP_1.to_string(), ServiceProvider::from_i32(sp_reward.service_provider_id).unwrap().as_str_name());
assert_eq!(
SP_1.to_string(),
ServiceProvider::from_i32(sp_reward.service_provider_id)
.unwrap()
.to_string()
);
assert_eq!(6000, sp_reward.amount);

assert_eq!(
Expand Down Expand Up @@ -123,7 +132,7 @@ async fn test_service_provider_rewards_invalid_sp(pool: PgPool) -> anyhow::Resul
.await;
assert_eq!(
resp.unwrap_err().to_string(),
"unknown service provider name".to_string()
"unknown service provider ".to_string() + PAYER_2
);

// confirm we get no msgs as rewards halted
Expand Down
14 changes: 3 additions & 11 deletions reward_index/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ use futures::{stream, StreamExt, TryStreamExt};
use helium_crypto::PublicKeyBinary;
use helium_proto::{
services::poc_lora::{iot_reward_share::Reward as IotReward, IotRewardShare},
services::poc_mobile::{
mobile_reward_share::Reward as MobileReward, MobileRewardShare, ServiceProvider,
},
Message,
services::poc_mobile::{mobile_reward_share::Reward as MobileReward, MobileRewardShare},
Message, ServiceProvider,
};
use poc_metrics::record_duration;
use sqlx::{Pool, Postgres, Transaction};
Expand Down Expand Up @@ -163,7 +161,7 @@ impl Indexer {
if let Some(sp) = ServiceProvider::from_i32(r.service_provider_id) {
Ok((
RewardKey {
key: service_provider_to_entity_key(sp)?,
key: sp.to_string(),
reward_type: RewardType::MobileServiceProvider,
},
r.amount,
Expand Down Expand Up @@ -212,9 +210,3 @@ impl Indexer {
}
}
}

fn service_provider_to_entity_key(sp: ServiceProvider) -> anyhow::Result<String> {
match sp {
ServiceProvider::HeliumMobile => Ok("Helium Mobile".to_string()),
}
}