Skip to content

Commit

Permalink
allow to choose scheduling strategy in the command-line
Browse files Browse the repository at this point in the history
* introduce parameter -s/--scheduler
* introduce factory pattern to transport the choice to the master actor
* rename LoadAwareSchedulingStrategy to ReactiveSchedulingStrategy
  • Loading branch information
Sebastian Kruse committed Jul 12, 2017
1 parent 60b781b commit fd56bfa
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 25 deletions.
24 changes: 22 additions & 2 deletions akka-tutorial/src/main/java/de/hpi/akka_tutorial/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import de.hpi.akka_tutorial.remote.Calculator;
import de.hpi.akka_tutorial.remote.actors.scheduling.ReactiveSchedulingStrategy;
import de.hpi.akka_tutorial.remote.actors.scheduling.RoundRobinSchedulingStrategy;
import de.hpi.akka_tutorial.remote.actors.scheduling.SchedulingStrategy;

import java.net.InetAddress;
import java.net.UnknownHostException;
Expand Down Expand Up @@ -58,8 +61,19 @@ public static void main(String[] args) {
*
* @param masterCommand defines the parameters of the master
*/
private static void startMaster(MasterCommand masterCommand) {
Calculator.runMaster(masterCommand.host, masterCommand.port, masterCommand.numLocalWorkers);
private static void startMaster(MasterCommand masterCommand) throws ParameterException {
SchedulingStrategy.Factory schedulingStrategyFactory;
switch (masterCommand.schedulingStrategy) {
case "round-robin":
schedulingStrategyFactory = new RoundRobinSchedulingStrategy.Factory();
break;
case "reactive":
schedulingStrategyFactory = new ReactiveSchedulingStrategy.Factory();
break;
default:
throw new ParameterException(String.format("Unknown scheduling strategy: %s", masterCommand.schedulingStrategy));
}
Calculator.runMaster(masterCommand.host, masterCommand.port, schedulingStrategyFactory, masterCommand.numLocalWorkers);
}

/**
Expand Down Expand Up @@ -89,6 +103,12 @@ int getDefaultPort() {
*/
@Parameter(names = {"-w", "--workers"}, description = "number of workers to start locally")
int numLocalWorkers = 0;

/**
* Defines the scheduling strategy to be used in the master.
*/
@Parameter(names = {"-s", "--scheduler"}, description = "a scheduling strategy (round-robin or reactive)")
String schedulingStrategy = "round-robin";
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import akka.actor.PoisonPill;
import com.typesafe.config.Config;
import de.hpi.akka_tutorial.remote.actors.*;
import de.hpi.akka_tutorial.remote.actors.scheduling.SchedulingStrategy;
import de.hpi.akka_tutorial.util.AkkaUtils;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
Expand All @@ -18,7 +19,7 @@ public class Calculator {
private static final String DEFAULT_MASTER_SYSTEM_NAME = "MasterActorSystem";
private static final String DEFAULT_SLAVE_SYSTEM_NAME = "SlaveActorSystem";

public static void runMaster(String host, int port, int numLocalWorkers) {
public static void runMaster(String host, int port, SchedulingStrategy.Factory schedulingStrategyFactory, int numLocalWorkers) {

// Create the ActorSystem
final Config config = AkkaUtils.createRemoteAkkaConfig(host, port);
Expand All @@ -31,7 +32,7 @@ public static void runMaster(String host, int port, int numLocalWorkers) {
final ActorRef listener = actorSystem.actorOf(Listener.props(), Listener.DEFAULT_NAME);

// Create the Master
final ActorRef master = actorSystem.actorOf(Master.props(listener, numLocalWorkers), Master.DEFAULT_NAME);
final ActorRef master = actorSystem.actorOf(Master.props(listener, schedulingStrategyFactory, numLocalWorkers), Master.DEFAULT_NAME);

// Create the Shepherd
final ActorRef shepherd = actorSystem.actorOf(Shepherd.props(master), Shepherd.DEFAULT_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import akka.actor.Terminated;
import akka.japi.pf.DeciderBuilder;
import akka.remote.RemoteScope;
import de.hpi.akka_tutorial.remote.actors.scheduling.LoadAwareSchedulingStrategy;
import de.hpi.akka_tutorial.remote.actors.scheduling.RoundRobinSchedulingStrategy;
import de.hpi.akka_tutorial.remote.actors.scheduling.SchedulingStrategy;
import scala.concurrent.duration.Duration;

Expand All @@ -35,8 +33,8 @@ public class Master extends AbstractLoggingActor {
*
* @return the {@link Props}
*/
public static Props props(final ActorRef listener, final int numLocalWorkers) {
return Props.create(Master.class, () -> new Master(listener, numLocalWorkers));
public static Props props(final ActorRef listener, SchedulingStrategy.Factory schedulingStrategyFactory, final int numLocalWorkers) {
return Props.create(Master.class, () -> new Master(listener, schedulingStrategyFactory, numLocalWorkers));
}

/**
Expand Down Expand Up @@ -126,8 +124,7 @@ public RemoteSystemMessage(Address remoteAddress) {
private final ActorRef listener;

// The scheduling strategy that splits range messages into smaller tasks and distributes these to the workers
private final SchedulingStrategy schedulingStrategy = new LoadAwareSchedulingStrategy(this.getSelf());
// private final SchedulingStrategy schedulingStrategy = new RoundRobinSchedulingStrategy(this.getSelf());
private final SchedulingStrategy schedulingStrategy;

// A helper variable to assign unique IDs to each range query
private int nextQueryId = 0;
Expand All @@ -137,14 +134,17 @@ public RemoteSystemMessage(Address remoteAddress) {

/**
* Construct a new {@link Master} object.
*
* @param listener a reference to an {@link Listener} actor to send results to
* @param schedulingStrategyFactory defines which {@link SchedulingStrategy} to use
* @param numLocalWorkers number of workers that this master should start locally
*/
public Master(final ActorRef listener, int numLocalWorkers) {
public Master(final ActorRef listener, SchedulingStrategy.Factory schedulingStrategyFactory, int numLocalWorkers) {

// Save the reference to the Listener actor
this.listener = listener;

// Create a scheduling strategy.
this.schedulingStrategy = schedulingStrategyFactory.create(this.getSelf());

// Start the specified number of local workers
for (int i = 0; i < numLocalWorkers; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,19 @@
import java.util.*;
import java.util.stream.Collectors;

public class LoadAwareSchedulingStrategy implements SchedulingStrategy {
public class ReactiveSchedulingStrategy implements SchedulingStrategy {

/**
* {@link SchedulingStrategy.Factory} implementation for the {@link ReactiveSchedulingStrategy}.
*/
public static class Factory implements SchedulingStrategy.Factory {

@Override
public ReactiveSchedulingStrategy create(ActorRef master) {
return new ReactiveSchedulingStrategy(master);
}

}

/**
* This class supervises the state of a range query for primes.
Expand Down Expand Up @@ -105,7 +117,7 @@ boolean isComplete() {
// A reference to the actor in whose name we send messages
private final ActorRef master;

public LoadAwareSchedulingStrategy(ActorRef master) {
public ReactiveSchedulingStrategy(ActorRef master) {
this.master = master;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@

public class RoundRobinSchedulingStrategy implements SchedulingStrategy {

/**
* {@link SchedulingStrategy.Factory} implementation for the {@link RoundRobinSchedulingStrategy}.
*/
public static class Factory implements SchedulingStrategy.Factory {

@Override
public SchedulingStrategy create(ActorRef master) {
return new RoundRobinSchedulingStrategy(master);
}

}

// A round robin router for our workers
private Router workerRouter = new Router(new RoundRobinRoutingLogic());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,58 @@
import de.hpi.akka_tutorial.remote.actors.Worker;

public interface SchedulingStrategy {


/**
* A factory for a {@link SchedulingStrategy}.
*/
interface Factory {

/**
* Create a new {@link SchedulingStrategy}.
*
* @param master that will employ the new instance
* @return the new {@link SchedulingStrategy}
*/
SchedulingStrategy create(ActorRef master);

}

/**
* Schedule a new prime checking task in the given range.
*
* @param taskId the id of the task that is to be split and scheduled
* @param startNumber first number of the range
* @param endNumber last number of the range
*/
public void schedule(final int taskId, final long startNumber, final long endNumber);
void schedule(final int taskId, final long startNumber, final long endNumber);

/**
* Notify the completion of a worker's task.
*
* @param taskId the id of the task this worker was working on
* @param worker the reference to the worker who finished the task
*/
public void finished(final int taskId, final ActorRef worker);
void finished(final int taskId, final ActorRef worker);

/**
* Check if there are still any pending tasks.
*
* @return {@code true} if tasks are still pending
*/
public boolean hasTasksInProgress();
boolean hasTasksInProgress();

/**
* Add a new {@link Worker} actor.
*
* @param worker the worker actor to add
*/
public void addWorker(final ActorRef worker);
void addWorker(final ActorRef worker);

/**
* Remove a {@link Worker} actor.
*
* @param worker the worker actor to remove
*/
public void removeWorker(final ActorRef worker);
void removeWorker(final ActorRef worker);

}

0 comments on commit fd56bfa

Please # to comment.