import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import com.google.common.base.Optional;
import org.opendaylight.controller.cluster.example.messages.PrintRole;
import org.opendaylight.controller.cluster.example.messages.PrintState;
+import org.opendaylight.controller.cluster.raft.ConfigParams;
import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
-import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
-import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
-
+/**
+ * This is a test driver for testing akka-raft implementation
+ * Its uses ExampleActors and threads to push content(key-vals) to these actors
+ * Each ExampleActor can have one or more ClientActors. Each ClientActor spawns
+ * a thread and starts push logs to the actor its assigned to.
+ */
public class TestDriver {
private static final ActorSystem actorSystem = ActorSystem.create();
private static Map<String, String> allPeers = new HashMap<>();
private static Map<String, ActorRef> clientActorRefs = new HashMap<String, ActorRef>();
private static Map<String, ActorRef> actorRefs = new HashMap<String, ActorRef>();
- private static LogGenerator logGenerator = new LogGenerator();;
+ private static LogGenerator logGenerator = new LogGenerator();
+ private int nameCounter = 0;
+ private static ConfigParams configParams = new ExampleConfigParamsImpl();
/**
* Create nodes, add clients and start logging.
* createNodes:{num}
* addNodes:{num}
* stopNode:{nodeName}
+ * reinstateNode:{nodeName}
* addClients:{num}
* addClientsToNode:{nodeName, num}
* startLogging
String[] arr = command.split(":");
td.stopNode(arr[1]);
+ } else if (command.startsWith("reinstateNode")) {
+ String[] arr = command.split(":");
+ td.reinstateNode(arr[1]);
+
} else if (command.startsWith("startLogging")) {
td.startAllLogging();
td.printState();
} else if (command.startsWith("printNodes")) {
td.printNodes();
+ } else {
+ System.out.println("Invalid command:" + command);
}
}
}
+ public static ActorRef createExampleActor(String name) {
+ return actorSystem.actorOf(ExampleActor.props(name, withoutPeer(name),
+ Optional.of(configParams)), name);
+ }
+
public void createNodes(int num) {
for (int i=0; i < num; i++) {
- int rand = getUnusedRandom(num);
- allPeers.put("example-"+rand, "akka://default/user/example-"+rand);
+ nameCounter = nameCounter + 1;
+ allPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
}
for (String s : allPeers.keySet()) {
- ActorRef exampleActor = actorSystem.actorOf(
- ExampleActor.props(s, withoutPeer(s)), s);
+ ActorRef exampleActor = createExampleActor(s);
actorRefs.put(s, exampleActor);
System.out.println("Created node:"+s);
public void addNodes(int num) {
Map<String, String> newPeers = new HashMap<>();
for (int i=0; i < num; i++) {
- int rand = getUnusedRandom(num);
- newPeers.put("example-"+rand, "akka://default/user/example-"+rand);
- allPeers.put("example-"+rand, "akka://default/user/example-"+rand);
+ nameCounter = nameCounter + 1;
+ newPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
+ allPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
}
Map<String, ActorRef> newActorRefs = new HashMap<String, ActorRef>(num);
for (Map.Entry<String, String> entry : newPeers.entrySet()) {
- ActorRef exampleActor = actorSystem.actorOf(
- ExampleActor.props(entry.getKey(), withoutPeer(entry.getKey())), entry.getKey());
+ ActorRef exampleActor = createExampleActor(entry.getKey());
newActorRefs.put(entry.getKey(), exampleActor);
//now also add these new nodes as peers from the previous nodes
public void addClientsToNode(String actorName, int num) {
ActorRef actorRef = actorRefs.get(actorName);
for (int i=0; i < num; i++) {
- String clientName = "client-" + i + "-" + actorRef;
+ String clientName = "client-" + i + "-" + actorName;
clientActorRefs.put(clientName,
actorSystem.actorOf(ClientActor.props(actorRef), clientName));
System.out.println("Added client-node:" + clientName);
public void stopNode(String actorName) {
ActorRef actorRef = actorRefs.get(actorName);
- String clientName = "client-"+actorName;
- if(clientActorRefs.containsKey(clientName)) {
- actorSystem.stop(clientActorRefs.get(clientName));
- clientActorRefs.remove(clientName);
+
+ for (Map.Entry<String,ActorRef> entry : clientActorRefs.entrySet()) {
+ if (entry.getKey().endsWith(actorName)) {
+ actorSystem.stop(entry.getValue());
+ }
}
+
actorSystem.stop(actorRef);
actorRefs.remove(actorName);
+ allPeers.remove(actorName);
+ }
- for (ActorRef actor : actorRefs.values()) {
- actor.tell(new RemoveRaftPeer(actorName), null);
- }
+ public void reinstateNode(String actorName) {
+ String address = "akka://default/user/"+actorName;
+ allPeers.put(actorName, address);
- allPeers.remove(actorName);
+ ActorRef exampleActor = createExampleActor(actorName);
+ actorRefs.put(actorName, exampleActor);
+ addClientsToNode(actorName, 1);
}
public void startAllLogging() {
return null;
}
- private int getUnusedRandom(int num) {
- int rand = -1;
- do {
- rand = (new Random()).nextInt(num * num);
- } while (allPeers.keySet().contains("example-"+rand));
-
- return rand;
- }
private static Map<String, String> withoutPeer(String peerId) {
Map<String, String> without = new ConcurrentHashMap<>(allPeers);