Skip to content

Commit

Permalink
all: Rename EntityChange to AssignmentChange and clean up its API
Browse files Browse the repository at this point in the history
  • Loading branch information
lutter committed Feb 18, 2025
1 parent 9c69088 commit a119ed6
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 73 deletions.
4 changes: 2 additions & 2 deletions core/graphman/src/commands/deployment/reassign.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use anyhow::anyhow;
use graph::components::store::DeploymentLocator;
use graph::components::store::StoreEvent;
use graph::prelude::EntityChange;
use graph::prelude::AssignmentChange;
use graph::prelude::NodeId;
use graph_store_postgres::command_support::catalog;
use graph_store_postgres::command_support::catalog::Site;
Expand Down Expand Up @@ -74,7 +74,7 @@ pub fn reassign_deployment(
let primary_conn = primary_pool.get().map_err(GraphmanError::from)?;
let mut catalog_conn = catalog::Connection::new(primary_conn);

let changes: Vec<EntityChange> = match catalog_conn
let changes: Vec<AssignmentChange> = match catalog_conn
.assigned_node(&deployment.site)
.map_err(GraphmanError::from)?
{
Expand Down
17 changes: 4 additions & 13 deletions core/src/subgraph/registrar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,17 +157,8 @@ where
.subscribe()
.map_err(|()| anyhow!("Entity change stream failed"))
.map(|event| {
let assignments = event
.changes
.iter()
.map(|change| match change {
EntityChange::Assignment {
deployment,
operation,
} => (deployment.clone(), operation.clone()),
})
.collect::<Vec<_>>();
stream::iter_ok(assignments)
let changes: Vec<_> = event.changes.iter().cloned().map(AssignmentChange::into_parts).collect();
stream::iter_ok(changes)
})
.flatten()
.and_then(
Expand All @@ -178,7 +169,7 @@ where
);

match operation {
EntityChangeOperation::Set => {
AssignmentOperation::Set => {
store
.assignment_status(&deployment)
.map_err(|e| {
Expand Down Expand Up @@ -215,7 +206,7 @@ where
}
})
}
EntityChangeOperation::Removed => {
AssignmentOperation::Removed => {
// Send remove event without checking node ID.
// If node ID does not match, then this is a no-op when handled in
// assignment provider.
Expand Down
44 changes: 27 additions & 17 deletions graph/src/components/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,32 +537,42 @@ impl EntityQuery {
}
}

/// Operation types that lead to entity changes.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
/// Operation types that lead to changes in assignments
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "lowercase")]
pub enum EntityChangeOperation {
/// An entity was added or updated
pub enum AssignmentOperation {
/// An assignment was added or updated
Set,
/// An existing entity was removed.
/// An assignment was removed.
Removed,
}

/// Entity change events emitted by [Store](trait.Store.html) implementations.
/// Assignment change events emitted by [Store](trait.Store.html) implementations.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum EntityChange {
Assignment {
deployment: DeploymentLocator,
operation: EntityChangeOperation,
},
pub struct AssignmentChange {
deployment: DeploymentLocator,
operation: AssignmentOperation,
}

impl EntityChange {
pub fn for_assignment(deployment: DeploymentLocator, operation: EntityChangeOperation) -> Self {
Self::Assignment {
impl AssignmentChange {
fn new(deployment: DeploymentLocator, operation: AssignmentOperation) -> Self {
Self {
deployment,
operation,
}
}

pub fn set(deployment: DeploymentLocator) -> Self {
Self::new(deployment, AssignmentOperation::Set)
}

pub fn removed(deployment: DeploymentLocator) -> Self {
Self::new(deployment, AssignmentOperation::Removed)
}

pub fn into_parts(self) -> (DeploymentLocator, AssignmentOperation) {
(self.deployment, self.operation)
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand All @@ -578,16 +588,16 @@ pub struct StoreEvent {
// The tag is only there to make it easier to track StoreEvents in the
// logs as they flow through the system
pub tag: usize,
pub changes: HashSet<EntityChange>,
pub changes: HashSet<AssignmentChange>,
}

impl StoreEvent {
pub fn new(changes: Vec<EntityChange>) -> StoreEvent {
pub fn new(changes: Vec<AssignmentChange>) -> StoreEvent {
let changes = changes.into_iter().collect();
StoreEvent::from_set(changes)
}

fn from_set(changes: HashSet<EntityChange>) -> StoreEvent {
fn from_set(changes: HashSet<AssignmentChange>) -> StoreEvent {
static NEXT_TAG: AtomicUsize = AtomicUsize::new(0);

let tag = NEXT_TAG.fetch_add(1, Ordering::Relaxed);
Expand Down
4 changes: 2 additions & 2 deletions graph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ pub mod prelude {
};
pub use crate::components::server::subscription::SubscriptionServer;
pub use crate::components::store::{
write::EntityModification, AttributeNames, BlockNumber, CachedEthereumCall, ChainStore,
Child, ChildMultiplicity, EntityCache, EntityChange, EntityChangeOperation,
write::EntityModification, AssignmentChange, AssignmentOperation, AttributeNames,
BlockNumber, CachedEthereumCall, ChainStore, Child, ChildMultiplicity, EntityCache,
EntityCollection, EntityFilter, EntityLink, EntityOperation, EntityOrder,
EntityOrderByChild, EntityOrderByChildInfo, EntityQuery, EntityRange, EntityWindow,
EthereumCallCache, ParentLink, PartialBlockPtr, PoolWaitStats, QueryStore,
Expand Down
46 changes: 21 additions & 25 deletions store/postgres/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ use graph::{
prelude::{
anyhow,
chrono::{DateTime, Utc},
serde_json, DeploymentHash, EntityChange, EntityChangeOperation, NodeId, StoreError,
SubgraphName, SubgraphVersionSwitchingMode,
serde_json, AssignmentChange, DeploymentHash, NodeId, StoreError, SubgraphName,
SubgraphVersionSwitchingMode,
},
};
use graph::{
Expand Down Expand Up @@ -790,7 +790,7 @@ impl<'a> Connection<'a> {

/// Delete all assignments for deployments that are neither the current nor the
/// pending version of a subgraph and return the deployment id's
fn remove_unused_assignments(&mut self) -> Result<Vec<EntityChange>, StoreError> {
fn remove_unused_assignments(&mut self) -> Result<Vec<AssignmentChange>, StoreError> {
use deployment_schemas as ds;
use subgraph as s;
use subgraph_deployment_assignment as a;
Expand Down Expand Up @@ -827,12 +827,7 @@ impl<'a> Connection<'a> {
.into_iter()
.map(|(id, hash)| {
DeploymentHash::new(hash)
.map(|hash| {
EntityChange::for_assignment(
DeploymentLocator::new(id.into(), hash),
EntityChangeOperation::Removed,
)
})
.map(|hash| AssignmentChange::removed(DeploymentLocator::new(id.into(), hash)))
.map_err(|id| {
StoreError::ConstraintViolation(format!(
"invalid id `{}` for deployment assignment",
Expand All @@ -851,7 +846,7 @@ impl<'a> Connection<'a> {
pub fn promote_deployment(
&mut self,
id: &DeploymentHash,
) -> Result<Vec<EntityChange>, StoreError> {
) -> Result<Vec<AssignmentChange>, StoreError> {
use subgraph as s;
use subgraph_version as v;

Expand Down Expand Up @@ -926,7 +921,7 @@ impl<'a> Connection<'a> {
node_id: NodeId,
mode: SubgraphVersionSwitchingMode,
exists_and_synced: F,
) -> Result<Vec<EntityChange>, StoreError>
) -> Result<Vec<AssignmentChange>, StoreError>
where
F: Fn(&DeploymentHash) -> Result<bool, StoreError>,
{
Expand Down Expand Up @@ -1034,13 +1029,16 @@ impl<'a> Connection<'a> {
// Clean up any assignments we might have displaced
let mut changes = self.remove_unused_assignments()?;
if new_assignment {
let change = EntityChange::for_assignment(site.into(), EntityChangeOperation::Set);
let change = AssignmentChange::set(site.into());
changes.push(change);
}
Ok(changes)
}

pub fn remove_subgraph(&mut self, name: SubgraphName) -> Result<Vec<EntityChange>, StoreError> {
pub fn remove_subgraph(
&mut self,
name: SubgraphName,
) -> Result<Vec<AssignmentChange>, StoreError> {
use subgraph as s;
use subgraph_version as v;

Expand All @@ -1062,7 +1060,7 @@ impl<'a> Connection<'a> {
}
}

pub fn pause_subgraph(&mut self, site: &Site) -> Result<Vec<EntityChange>, StoreError> {
pub fn pause_subgraph(&mut self, site: &Site) -> Result<Vec<AssignmentChange>, StoreError> {
use subgraph_deployment_assignment as a;

let conn = self.conn.as_mut();
Expand All @@ -1073,8 +1071,7 @@ impl<'a> Connection<'a> {
match updates {
0 => Err(StoreError::DeploymentNotFound(site.deployment.to_string())),
1 => {
let change =
EntityChange::for_assignment(site.into(), EntityChangeOperation::Removed);
let change = AssignmentChange::removed(site.into());
Ok(vec![change])
}
_ => {
Expand All @@ -1085,7 +1082,7 @@ impl<'a> Connection<'a> {
}
}

pub fn resume_subgraph(&mut self, site: &Site) -> Result<Vec<EntityChange>, StoreError> {
pub fn resume_subgraph(&mut self, site: &Site) -> Result<Vec<AssignmentChange>, StoreError> {
use subgraph_deployment_assignment as a;

let conn = self.conn.as_mut();
Expand All @@ -1096,7 +1093,7 @@ impl<'a> Connection<'a> {
match updates {
0 => Err(StoreError::DeploymentNotFound(site.deployment.to_string())),
1 => {
let change = EntityChange::for_assignment(site.into(), EntityChangeOperation::Set);
let change = AssignmentChange::set(site.into());
Ok(vec![change])
}
_ => {
Expand All @@ -1111,7 +1108,7 @@ impl<'a> Connection<'a> {
&mut self,
site: &Site,
node: &NodeId,
) -> Result<Vec<EntityChange>, StoreError> {
) -> Result<Vec<AssignmentChange>, StoreError> {
use subgraph_deployment_assignment as a;

let conn = self.conn.as_mut();
Expand All @@ -1121,7 +1118,7 @@ impl<'a> Connection<'a> {
match updates {
0 => Err(StoreError::DeploymentNotFound(site.deployment.to_string())),
1 => {
let change = EntityChange::for_assignment(site.into(), EntityChangeOperation::Set);
let change = AssignmentChange::set(site.into());
Ok(vec![change])
}
_ => {
Expand Down Expand Up @@ -1248,19 +1245,19 @@ impl<'a> Connection<'a> {
&mut self,
site: &Site,
node: &NodeId,
) -> Result<Vec<EntityChange>, StoreError> {
) -> Result<Vec<AssignmentChange>, StoreError> {
use subgraph_deployment_assignment as a;

let conn = self.conn.as_mut();
insert_into(a::table)
.values((a::id.eq(site.id), a::node_id.eq(node.as_str())))
.execute(conn)?;

let change = EntityChange::for_assignment(site.into(), EntityChangeOperation::Set);
let change = AssignmentChange::set(site.into());
Ok(vec![change])
}

pub fn unassign_subgraph(&mut self, site: &Site) -> Result<Vec<EntityChange>, StoreError> {
pub fn unassign_subgraph(&mut self, site: &Site) -> Result<Vec<AssignmentChange>, StoreError> {
use subgraph_deployment_assignment as a;

let conn = self.conn.as_mut();
Expand All @@ -1271,8 +1268,7 @@ impl<'a> Connection<'a> {
match delete_count {
0 => Ok(vec![]),
1 => {
let change =
EntityChange::for_assignment(site.into(), EntityChangeOperation::Removed);
let change = AssignmentChange::removed(site.into());
Ok(vec![change])
}
_ => {
Expand Down
21 changes: 7 additions & 14 deletions store/test-store/tests/postgres/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ use graph::{
schema::{DeploymentCreate, SubgraphError},
DeploymentFeatures,
},
prelude::AssignmentChange,
prelude::BlockPtr,
prelude::EntityChange,
prelude::EntityChangeOperation,
prelude::QueryStoreManager,
prelude::StoreEvent,
prelude::SubgraphManifest,
Expand Down Expand Up @@ -59,18 +58,12 @@ const SUBGRAPH_FEATURES_GQL: &str = "
}
";

fn assigned(deployment: &DeploymentLocator) -> EntityChange {
EntityChange::Assignment {
deployment: deployment.clone(),
operation: EntityChangeOperation::Set,
}
fn assigned(deployment: &DeploymentLocator) -> AssignmentChange {
AssignmentChange::set(deployment.clone())
}

fn unassigned(deployment: &DeploymentLocator) -> EntityChange {
EntityChange::Assignment {
deployment: deployment.clone(),
operation: EntityChangeOperation::Removed,
}
fn unassigned(deployment: &DeploymentLocator) -> AssignmentChange {
AssignmentChange::removed(deployment.clone())
}

fn get_version_info(store: &Store, subgraph_name: &str) -> VersionInfo {
Expand Down Expand Up @@ -163,7 +156,7 @@ fn create_subgraph() {
store: &SubgraphStore,
id: &str,
mode: SubgraphVersionSwitchingMode,
) -> (DeploymentLocator, HashSet<EntityChange>) {
) -> (DeploymentLocator, HashSet<AssignmentChange>) {
let name = SubgraphName::new(SUBGRAPH_NAME.to_string()).unwrap();
let id = DeploymentHash::new(id.to_string()).unwrap();
let schema = InputSchema::parse_latest(SUBGRAPH_GQL, id.clone()).unwrap();
Expand Down Expand Up @@ -203,7 +196,7 @@ fn create_subgraph() {
(deployment, events)
}

fn deploy_event(deployment: &DeploymentLocator) -> HashSet<EntityChange> {
fn deploy_event(deployment: &DeploymentLocator) -> HashSet<AssignmentChange> {
let mut changes = HashSet::new();
changes.insert(assigned(deployment));
changes
Expand Down

0 comments on commit a119ed6

Please # to comment.