Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

sdk: allow types to abstract over encoding #718

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ tonic-build = "0.11"
opentelemetry = "0.22"
prost = "0.12"
prost-types = "0.12"
serde_json = "1.0"
30 changes: 14 additions & 16 deletions core/src/core_tests/local_activities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use temporal_sdk_core_protos::{
workflow_activation::{workflow_activation_job, WorkflowActivationJob},
workflow_commands::{ActivityCancellationType, ScheduleLocalActivity},
workflow_completion::WorkflowActivationCompletion,
ActivityTaskCompletion, AsJsonPayloadExt,
ActivityTaskCompletion, ToPayload,
},
temporal::api::{
common::v1::RetryPolicy,
Expand Down Expand Up @@ -91,7 +91,7 @@ async fn local_act_two_wfts_before_marker(#[case] replay: bool, #[case] cached:
|ctx: WfContext| async move {
let la = ctx.local_activity(LocalActivityOptions {
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
input: "hi".as_json_payload().expect("serializes fine"),
input: "hi".to_payload().expect("serializes fine"),
..Default::default()
});
ctx.timer(Duration::from_secs(1)).await;
Expand All @@ -117,9 +117,7 @@ pub async fn local_act_fanout_wf(ctx: WfContext) -> WorkflowResult<()> {
.map(|i| {
ctx.local_activity(LocalActivityOptions {
activity_type: "echo".to_string(),
input: format!("Hi {i}")
.as_json_payload()
.expect("serializes fine"),
input: format!("Hi {i}").to_payload().expect("serializes fine"),
..Default::default()
})
})
Expand Down Expand Up @@ -198,7 +196,7 @@ async fn local_act_heartbeat(#[case] shutdown_middle: bool) {
|ctx: WfContext| async move {
ctx.local_activity(LocalActivityOptions {
activity_type: "echo".to_string(),
input: "hi".as_json_payload().expect("serializes fine"),
input: "hi".to_payload().expect("serializes fine"),
..Default::default()
})
.await;
Expand Down Expand Up @@ -254,7 +252,7 @@ async fn local_act_fail_and_retry(#[case] eventually_pass: bool) {
let la_res = ctx
.local_activity(LocalActivityOptions {
activity_type: "echo".to_string(),
input: "hi".as_json_payload().expect("serializes fine"),
input: "hi".to_payload().expect("serializes fine"),
retry_policy: RetryPolicy {
initial_interval: Some(prost_dur!(from_millis(50))),
backoff_coefficient: 1.2,
Expand Down Expand Up @@ -335,7 +333,7 @@ async fn local_act_retry_long_backoff_uses_timer() {
let la_res = ctx
.local_activity(LocalActivityOptions {
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
input: "hi".as_json_payload().expect("serializes fine"),
input: "hi".to_payload().expect("serializes fine"),
retry_policy: RetryPolicy {
initial_interval: Some(prost_dur!(from_millis(65))),
// This will make the second backoff 65 seconds, plenty to use timer
Expand Down Expand Up @@ -389,7 +387,7 @@ async fn local_act_null_result() {
|ctx: WfContext| async move {
ctx.local_activity(LocalActivityOptions {
activity_type: "nullres".to_string(),
input: "hi".as_json_payload().expect("serializes fine"),
input: "hi".to_payload().expect("serializes fine"),
..Default::default()
})
.await;
Expand Down Expand Up @@ -432,7 +430,7 @@ async fn local_act_command_immediately_follows_la_marker() {
|ctx: WfContext| async move {
ctx.local_activity(LocalActivityOptions {
activity_type: "nullres".to_string(),
input: "hi".as_json_payload().expect("serializes fine"),
input: "hi".to_payload().expect("serializes fine"),
..Default::default()
})
.await;
Expand Down Expand Up @@ -736,7 +734,7 @@ async fn test_schedule_to_start_timeout() {
let la_res = ctx
.local_activity(LocalActivityOptions {
activity_type: "echo".to_string(),
input: "hi".as_json_payload().expect("serializes fine"),
input: "hi".to_payload().expect("serializes fine"),
// Impossibly small timeout so we timeout in the queue
schedule_to_start_timeout: prost_dur!(from_nanos(1)),
..Default::default()
Expand Down Expand Up @@ -824,7 +822,7 @@ async fn test_schedule_to_start_timeout_not_based_on_original_time(
let la_res = ctx
.local_activity(LocalActivityOptions {
activity_type: "echo".to_string(),
input: "hi".as_json_payload().expect("serializes fine"),
input: "hi".to_payload().expect("serializes fine"),
retry_policy: RetryPolicy {
initial_interval: Some(prost_dur!(from_millis(50))),
backoff_coefficient: 1.2,
Expand Down Expand Up @@ -897,7 +895,7 @@ async fn start_to_close_timeout_allows_retries(#[values(true, false)] la_complet
let la_res = ctx
.local_activity(LocalActivityOptions {
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
input: "hi".as_json_payload().expect("serializes fine"),
input: "hi".to_payload().expect("serializes fine"),
retry_policy: RetryPolicy {
initial_interval: Some(prost_dur!(from_millis(20))),
backoff_coefficient: 1.0,
Expand Down Expand Up @@ -971,7 +969,7 @@ async fn wft_failure_cancels_running_las() {
|ctx: WfContext| async move {
let la_handle = ctx.local_activity(LocalActivityOptions {
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
input: "hi".as_json_payload().expect("serializes fine"),
input: "hi".to_payload().expect("serializes fine"),
..Default::default()
});
tokio::join!(
Expand Down Expand Up @@ -1038,7 +1036,7 @@ async fn resolved_las_not_recorded_if_wft_fails_many_times() {
WorkflowFunction::new::<_, _, ()>(|ctx: WfContext| async move {
ctx.local_activity(LocalActivityOptions {
activity_type: "echo".to_string(),
input: "hi".as_json_payload().expect("serializes fine"),
input: "hi".to_payload().expect("serializes fine"),
..Default::default()
})
.await;
Expand Down Expand Up @@ -1092,7 +1090,7 @@ async fn local_act_records_nonfirst_attempts_ok() {
|ctx: WfContext| async move {
ctx.local_activity(LocalActivityOptions {
activity_type: "echo".to_string(),
input: "hi".as_json_payload().expect("serializes fine"),
input: "hi".to_payload().expect("serializes fine"),
retry_policy: RetryPolicy {
initial_interval: Some(prost_dur!(from_millis(10))),
backoff_coefficient: 1.0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,7 @@ mod tests {
use temporal_sdk_core_protos::{
coresdk::{
workflow_activation::{workflow_activation_job, WorkflowActivationJob},
AsJsonPayloadExt,
ToPayload,
},
temporal::api::{
common::v1::RetryPolicy, enums::v1::WorkflowTaskFailedCause, failure::v1::Failure,
Expand All @@ -906,7 +906,7 @@ mod tests {
async fn la_wf(ctx: WfContext) -> WorkflowResult<()> {
ctx.local_activity(LocalActivityOptions {
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
input: ().as_json_payload().unwrap(),
input: ().to_payload().unwrap(),
retry_policy: RetryPolicy {
maximum_attempts: 1,
..Default::default()
Expand Down Expand Up @@ -1003,13 +1003,13 @@ mod tests {
async fn two_la_wf(ctx: WfContext) -> WorkflowResult<()> {
ctx.local_activity(LocalActivityOptions {
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
input: ().as_json_payload().unwrap(),
input: ().to_payload().unwrap(),
..Default::default()
})
.await;
ctx.local_activity(LocalActivityOptions {
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
input: ().as_json_payload().unwrap(),
input: ().to_payload().unwrap(),
..Default::default()
})
.await;
Expand All @@ -1020,12 +1020,12 @@ mod tests {
tokio::join!(
ctx.local_activity(LocalActivityOptions {
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
input: ().as_json_payload().unwrap(),
input: ().to_payload().unwrap(),
..Default::default()
}),
ctx.local_activity(LocalActivityOptions {
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
input: ().as_json_payload().unwrap(),
input: ().to_payload().unwrap(),
..Default::default()
})
);
Expand Down Expand Up @@ -1131,14 +1131,14 @@ mod tests {
async fn la_timer_la(ctx: WfContext) -> WorkflowResult<()> {
ctx.local_activity(LocalActivityOptions {
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
input: ().as_json_payload().unwrap(),
input: ().to_payload().unwrap(),
..Default::default()
})
.await;
ctx.timer(Duration::from_secs(5)).await;
ctx.local_activity(LocalActivityOptions {
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
input: ().as_json_payload().unwrap(),
input: ().to_payload().unwrap(),
..Default::default()
})
.await;
Expand Down Expand Up @@ -1426,7 +1426,7 @@ mod tests {
worker.register_wf(DEFAULT_WORKFLOW_TYPE, move |ctx: WfContext| async move {
let la = ctx.local_activity(LocalActivityOptions {
cancel_type,
input: ().as_json_payload().unwrap(),
input: ().to_payload().unwrap(),
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
..Default::default()
});
Expand Down
10 changes: 5 additions & 5 deletions core/src/worker/workflow/machines/patch_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use std::{
};
use temporal_sdk_core_protos::{
constants::PATCH_MARKER_NAME,
coresdk::{common::build_has_change_marker_details, AsJsonPayloadExt},
coresdk::{common::build_has_change_marker_details, ToPayload},
temporal::api::{
command::v1::{
Command, RecordMarkerCommandAttributes, UpsertWorkflowSearchAttributesCommandAttributes,
Expand Down Expand Up @@ -135,7 +135,7 @@ pub(super) fn has_change<'a>(
let mut all_ids = BTreeSet::from_iter(existing_patch_ids);
all_ids.insert(machine.shared_state.patch_id.as_str());
let serialized = all_ids
.as_json_payload()
.to_payload()
.context("Could not serialize search attribute value for patch machine")
.map_err(|e| WFMachinesError::Fatal(e.to_string()))?;

Expand Down Expand Up @@ -296,7 +296,7 @@ mod tests {
coresdk::{
common::decode_change_marker_details,
workflow_activation::{workflow_activation_job, NotifyHasPatch, WorkflowActivationJob},
AsJsonPayloadExt, FromJsonPayloadExt,
FromPayload, ToPayload,
},
temporal::api::{
command::v1::{
Expand Down Expand Up @@ -606,7 +606,7 @@ mod tests {
{ search_attributes: Some(attrs) }
)
if attrs.indexed_fields.get(VERSION_SEARCH_ATTR_KEY).unwrap()
== &[MY_PATCH_ID].as_json_payload().unwrap()
== &[MY_PATCH_ID].to_payload().unwrap()
);
}
// The only time the "old" timer should fire is in v2, replaying, without a marker.
Expand Down Expand Up @@ -790,7 +790,7 @@ mod tests {
);
let expected_patches: HashSet<String, _> =
(1..i).map(|i| format!("patch-{i}")).collect();
let deserialized = HashSet::<String, RandomState>::from_json_payload(
let deserialized = HashSet::<String, RandomState>::from_payload(
attrs.indexed_fields.get(VERSION_SEARCH_ATTR_KEY).unwrap(),
)
.unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ mod tests {
workflow_activation::{workflow_activation_job, WorkflowActivationJob},
workflow_commands::SetPatchMarker,
workflow_completion::WorkflowActivationCompletion,
AsJsonPayloadExt,
ToPayload,
},
temporal::api::{
command::v1::command::Attributes, common::v1::Payload,
Expand Down Expand Up @@ -370,7 +370,7 @@ mod tests {
let mut ver_upsert = HashMap::new();
ver_upsert.insert(
VERSION_SEARCH_ATTR_KEY.to_string(),
"hi".as_json_payload().unwrap(),
"hi".to_payload().unwrap(),
);
let act = core.poll_workflow_activation().await.unwrap();
let mut cmds = if with_patched_cmd {
Expand Down
11 changes: 7 additions & 4 deletions sdk-core-protos/src/history_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
},
external_data::LocalActivityMarkerData,
workflow_commands::ScheduleActivity,
AsJsonPayloadExt, IntoPayloadsExt,
IntoPayloadsExt, Json, ToPayload,
},
temporal::api::{
common::v1::{
Expand All @@ -17,8 +17,7 @@ use crate::{
failure::v1::{failure, CanceledFailureInfo, Failure},
history::v1::{history_event::Attributes, *},
taskqueue::v1::TaskQueue,
update,
update::v1::outcome,
update::{self, v1::outcome},
},
HistoryInfo,
};
Expand Down Expand Up @@ -429,7 +428,7 @@ impl TestHistoryBuilder {
let mut indexed_fields = HashMap::new();
indexed_fields.insert(
"TemporalChangeVersion".to_string(),
attribs.as_json_payload().unwrap(),
attribs.to_payload().unwrap(),
);
let attrs = UpsertWorkflowSearchAttributesEventAttributes {
workflow_task_completed_event_id: self.previous_task_completed_id,
Expand Down Expand Up @@ -605,6 +604,10 @@ impl TestHistoryBuilder {
}
}

impl ToPayload for &[String] {
type Encoder = Json;
}

fn default_attribs(et: EventType) -> Result<Attributes> {
Ok(match et {
EventType::WorkflowExecutionStarted => default_wes_attribs().into(),
Expand Down
Loading
Loading