Skip to content

Commit

Permalink
Merge pull request #424 from primitivefinance/waylon/tests
Browse files Browse the repository at this point in the history
Waylon/tests
  • Loading branch information
Autoparallel authored Aug 16, 2023
2 parents d44783f + 236ddca commit 0c4aecb
Show file tree
Hide file tree
Showing 8 changed files with 430 additions and 264 deletions.
71 changes: 60 additions & 11 deletions arbiter-core/src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ impl Debug for Environment {
/// [`RevmMiddleware`]. Please bring up if you catch errors here by sending a
/// message in the [Telegram group](https://t.me/arbiter_rs) or on
/// [GitHub](https://github.com/primitivefinance/arbiter/).
#[derive(Error, Debug)]
#[derive(Error, Debug, Clone)]
pub enum EnvironmentError {
/// [`EnvironmentError::Execution`] is thrown when the [`EVM`] itself
/// throws an error in execution. To be clear, this is not a contract
Expand Down Expand Up @@ -272,7 +272,7 @@ impl Environment {

// Set up the state and tx counter
self.state
.store(State::Running, std::sync::atomic::Ordering::Relaxed);
.store(State::Running, std::sync::atomic::Ordering::SeqCst);
let state = Arc::clone(&self.state);
let pausevar = Arc::clone(&self.pausevar);
let mut counter: usize = 0;
Expand All @@ -285,7 +285,7 @@ impl Environment {
// Loop over the reception of calls/transactions sent through the socket
loop {
// The outermost check is to find what the `Environment`'s state is in
match state.load(std::sync::atomic::Ordering::Relaxed) {
match state.load(std::sync::atomic::Ordering::SeqCst) {
// Leave the loop upon seeing `State::Stopped`
State::Stopped => break,

Expand All @@ -295,7 +295,31 @@ impl Environment {
let mut guard = lock.lock().map_err(|e| EnvironmentError::Pause {
cause: format!("{:?}", e),
})?;
while state.load(std::sync::atomic::Ordering::Relaxed) == State::Paused {

// this logic here ensures we catch any edge case last transactions and send
// the appropriate error so that we dont hang in
// limbo forever
while let Ok((_, _, sender)) = tx_receiver.try_recv() {
let error_outcome =
TransactionOutcome::Error(EnvironmentError::Pause {
cause: "Environment is paused".into(),
});
let revm_result = RevmResult {
outcome: error_outcome,
block_number: convert_uint_to_u64(evm.env.block.number).map_err(
|e| EnvironmentError::Conversion {
cause: format!("{:?}", e),
},
)?,
};
sender.send(revm_result).map_err(|e| {
EnvironmentError::Communication {
cause: format!("{:?}", e),
}
})?;
}

while state.load(std::sync::atomic::Ordering::SeqCst) == State::Paused {
guard = cvar.wait(guard).map_err(|e| EnvironmentError::Pause {
cause: format!("{:?}", e),
})?;
Expand Down Expand Up @@ -333,7 +357,7 @@ impl Environment {
Err(e) => {
state.store(
State::Paused,
std::sync::atomic::Ordering::Relaxed,
std::sync::atomic::Ordering::SeqCst,
);
error!("Pausing the environment labeled {} due to an execution error: {:#?}", label, e);
return Err(EnvironmentError::Execution { cause: e });
Expand All @@ -346,7 +370,7 @@ impl Environment {
})?;
event_broadcaster.broadcast(execution_result.logs())?;
let revm_result = RevmResult {
result: execution_result,
outcome: TransactionOutcome::Success(execution_result),
block_number: convert_uint_to_u64(evm.env.block.number)
.map_err(|e| EnvironmentError::Conversion {
cause: format!("{:?}", e),
Expand All @@ -368,14 +392,14 @@ impl Environment {
Err(e) => {
state.store(
State::Paused,
std::sync::atomic::Ordering::Relaxed,
std::sync::atomic::Ordering::SeqCst,
);
error!("Pausing the environment labeled {} due to an execution error: {:#?}", label, e);
return Err(EnvironmentError::Execution { cause: e });
}
};
let result_and_block = RevmResult {
result,
outcome: TransactionOutcome::Success(result),
block_number: convert_uint_to_u64(evm.env.block.number)
.map_err(|e| EnvironmentError::Conversion {
cause: format!("{:?}", e),
Expand Down Expand Up @@ -447,13 +471,38 @@ pub(crate) struct Socket {
pub(crate) event_broadcaster: Arc<Mutex<EventBroadcaster>>,
}

/// Represents the possible outcomes of an EVM transaction.
///
/// This enum is used to encapsulate both successful transaction results and
/// potential errors.
/// - `Success`: Indicates that the transaction was executed successfully and
/// contains the result of the execution. The wrapped `ExecutionResult`
/// provides detailed information about the transaction's execution, such as
/// returned values or changes made to the state.
/// - `Error`: Indicates that the transaction failed due to some error
/// condition. The wrapped `EnvironmentError` provides specifics about the
/// error, allowing callers to take appropriate action or relay more
/// informative error messages.
#[derive(Debug, Clone)]
pub(crate) enum TransactionOutcome {
/// Represents a successfully executed transaction.
///
/// Contains the result of the transaction's execution.
Success(ExecutionResult),

/// Represents a failed transaction due to some error.
///
/// Contains information about the error that caused the transaction
/// failure.
Error(EnvironmentError),
}
/// Represents the result of an EVM transaction.
///
/// Contains the outcome of a transaction (e.g., success, revert, halt) and the
/// block number at which the transaction was executed.
#[derive(Debug, Clone)]
pub(crate) struct RevmResult {
pub(crate) result: ExecutionResult,
pub(crate) outcome: TransactionOutcome,
pub(crate) block_number: U64,
}

Expand Down Expand Up @@ -516,15 +565,15 @@ pub(crate) mod tests {
fn new() {
let environment = Environment::new(TEST_ENV_LABEL.to_string(), 1.0, 1);
assert_eq!(environment.label, TEST_ENV_LABEL);
let state = environment.state.load(std::sync::atomic::Ordering::Relaxed);
let state = environment.state.load(std::sync::atomic::Ordering::SeqCst);
assert_eq!(state, State::Initialization);
}

#[test]
fn run() {
let mut environment = Environment::new(TEST_ENV_LABEL.to_string(), 1.0, 1);
environment.run();
let state = environment.state.load(std::sync::atomic::Ordering::Relaxed);
let state = environment.state.load(std::sync::atomic::Ordering::SeqCst);
assert_eq!(state, State::Running);
}

Expand Down
166 changes: 87 additions & 79 deletions arbiter-core/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ pub enum ManagerError {
#[error("joining on the environment thread resulted in a panic")]
ThreadPanic,
}
impl Default for Manager {
fn default() -> Self {
Self::new()
}
}

impl Manager {
/// Creates a new [`Manager`] with an empty set of environments.
Expand Down Expand Up @@ -188,30 +193,31 @@ impl Manager {
environment_label: S,
) -> Result<(), ManagerError> {
match self.environments.get_mut(&environment_label.clone().into()) {
Some(environment) => match environment.state.load(std::sync::atomic::Ordering::Relaxed)
{
State::Initialization => {
environment.run();
info!("Started environment labeled {}", environment_label.into());
Ok(())
}
State::Paused => {
environment
.state
.store(State::Running, std::sync::atomic::Ordering::Relaxed);
let (lock, pausevar) = &*environment.pausevar;
let _guard = lock.lock().unwrap();
pausevar.notify_all();
info!("Restarted environment labeled {}", environment_label.into());
Ok(())
Some(environment) => {
match environment.state.load(std::sync::atomic::Ordering::SeqCst) {
State::Initialization => {
environment.run();
info!("Started environment labeled {}", environment_label.into());
Ok(())
}
State::Paused => {
environment
.state
.store(State::Running, std::sync::atomic::Ordering::SeqCst);
let (lock, pausevar) = &*environment.pausevar;
let _guard = lock.lock().unwrap();
pausevar.notify_all();
info!("Restarted environment labeled {}", environment_label.into());
Ok(())
}
State::Running => Err(ManagerError::EnvironmentAlreadyRunning {
label: environment_label.into(),
}),
State::Stopped => Err(ManagerError::EnvironmentStopped {
label: environment_label.into(),
}),
}
State::Running => Err(ManagerError::EnvironmentAlreadyRunning {
label: environment_label.into(),
}),
State::Stopped => Err(ManagerError::EnvironmentStopped {
label: environment_label.into(),
}),
},
}
None => Err(ManagerError::EnvironmentDoesNotExist {
label: environment_label.into(),
}),
Expand Down Expand Up @@ -259,25 +265,26 @@ impl Manager {
environment_label: S,
) -> Result<(), ManagerError> {
match self.environments.get_mut(&environment_label.clone().into()) {
Some(environment) => match environment.state.load(std::sync::atomic::Ordering::Relaxed)
{
State::Initialization => Err(ManagerError::EnvironmentNotRunning {
label: environment_label.into(),
}),
State::Running => {
environment
.state
.store(State::Paused, std::sync::atomic::Ordering::Relaxed);
info!("Paused environment labeled {}", environment_label.into());
Ok(())
Some(environment) => {
match environment.state.load(std::sync::atomic::Ordering::SeqCst) {
State::Initialization => Err(ManagerError::EnvironmentNotRunning {
label: environment_label.into(),
}),
State::Running => {
environment
.state
.store(State::Paused, std::sync::atomic::Ordering::SeqCst);
info!("Paused environment labeled {}", environment_label.into());
Ok(())
}
State::Paused => Err(ManagerError::EnvironmentAlreadyPaused {
label: environment_label.into(),
}),
State::Stopped => Err(ManagerError::EnvironmentStopped {
label: environment_label.into(),
}),
}
State::Paused => Err(ManagerError::EnvironmentAlreadyPaused {
label: environment_label.into(),
}),
State::Stopped => Err(ManagerError::EnvironmentStopped {
label: environment_label.into(),
}),
},
}
None => Err(ManagerError::EnvironmentDoesNotExist {
label: environment_label.into(),
}),
Expand Down Expand Up @@ -325,51 +332,52 @@ impl Manager {
environment_label: S,
) -> Result<(), ManagerError> {
match self.environments.get_mut(&environment_label.clone().into()) {
Some(environment) => match environment.state.load(std::sync::atomic::Ordering::Relaxed)
{
State::Initialization => Err(ManagerError::EnvironmentNotRunning {
label: environment_label.into(),
}),
State::Running => {
environment
.state
.store(State::Stopped, std::sync::atomic::Ordering::Relaxed);
match environment.handle.take() {
Some(handle) => {
if let Err(_) = handle.join() {
return Err(ManagerError::ThreadPanic);
Some(environment) => {
match environment.state.load(std::sync::atomic::Ordering::SeqCst) {
State::Initialization => Err(ManagerError::EnvironmentNotRunning {
label: environment_label.into(),
}),
State::Running => {
environment
.state
.store(State::Stopped, std::sync::atomic::Ordering::SeqCst);
match environment.handle.take() {
Some(handle) => {
if handle.join().is_err() {
return Err(ManagerError::ThreadPanic);
}
}
None => return Err(ManagerError::NoHandleAvailable),
}
None => return Err(ManagerError::NoHandleAvailable),
warn!(
"Stopped running environment labeled {}",
environment_label.into()
);
Ok(())
}
warn!(
"Stopped running environment labeled {}",
environment_label.into()
);
Ok(())
}
State::Paused => {
environment
.state
.store(State::Stopped, std::sync::atomic::Ordering::Relaxed);
match environment.handle.take() {
Some(handle) => {
if let Err(_) = handle.join() {
return Err(ManagerError::ThreadPanic);
State::Paused => {
environment
.state
.store(State::Stopped, std::sync::atomic::Ordering::SeqCst);
match environment.handle.take() {
Some(handle) => {
if handle.join().is_err() {
return Err(ManagerError::ThreadPanic);
}
}
None => return Err(ManagerError::NoHandleAvailable),
}
None => return Err(ManagerError::NoHandleAvailable),
warn!(
"Stopped paused environment labeled {}",
environment_label.into()
);
Ok(())
}
warn!(
"Stopped paused environment labeled {}",
environment_label.into()
);
Ok(())
State::Stopped => Err(ManagerError::EnvironmentStopped {
label: environment_label.into(),
}),
}
State::Stopped => Err(ManagerError::EnvironmentStopped {
label: environment_label.into(),
}),
},
}
None => Err(ManagerError::EnvironmentDoesNotExist {
label: environment_label.into(),
}),
Expand Down
Loading

0 comments on commit 0c4aecb

Please # to comment.