import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import com.google.common.base.Optional;
-import org.opendaylight.controller.cluster.example.messages.PrintRole;
-import org.opendaylight.controller.cluster.example.messages.PrintState;
-import org.opendaylight.controller.cluster.raft.ConfigParams;
-import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
-import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
-
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigFactory;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.opendaylight.controller.cluster.example.messages.PrintRole;
+import org.opendaylight.controller.cluster.example.messages.PrintState;
+import org.opendaylight.controller.cluster.raft.ConfigParams;
/**
* This is a test driver for testing akka-raft implementation
*/
public class TestDriver {
- private static final ActorSystem actorSystem = ActorSystem.create();
+
private static Map<String, String> allPeers = new HashMap<>();
private static Map<String, ActorRef> clientActorRefs = new HashMap<String, ActorRef>();
private static Map<String, ActorRef> actorRefs = new HashMap<String, ActorRef>();
private int nameCounter = 0;
private static ConfigParams configParams = new ExampleConfigParamsImpl();
+ private static ActorSystem actorSystem;
+ private static ActorSystem listenerActorSystem;
+
/**
* Create nodes, add clients and start logging.
* Commands
* bye
* createNodes:{num}
- * addNodes:{num}
* stopNode:{nodeName}
* reinstateNode:{nodeName}
* addClients:{num}
* stopLoggingForClient:{nodeName}
* printNodes
* printState
+ *
+ * Note: when run on IDE and on debug log level, the debug logs in
+ * AbstractUptypedActor and AbstractUptypedPersistentActor would need to be commented out.
+ * Also RaftActor handleCommand(), debug log which prints for every command other than AE/AER
+ *
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
+
+ actorSystem = ActorSystem.create("raft-test", ConfigFactory
+ .load().getConfig("raft-test"));
+
+ listenerActorSystem = ActorSystem.create("raft-test-listener", ConfigFactory
+ .load().getConfig("raft-test-listener"));
+
TestDriver td = new TestDriver();
System.out.println("Enter command (type bye to exit):");
int n = Integer.parseInt(arr[1]);
td.createNodes(n);
- } else if (command.startsWith("addNodes")) {
- String[] arr = command.split(":");
- int n = Integer.parseInt(arr[1]);
- td.addNodes(n);
-
} else if (command.startsWith("addClients")) {
String[] arr = command.split(":");
int n = Integer.parseInt(arr[1]);
}
}
+ // create the listener using a separate actor system for each example actor
+ private void createClusterRoleChangeListener(List<String> memberIds) {
+ System.out.println("memberIds="+memberIds);
+ for (String memberId : memberIds) {
+ ActorRef listenerActor = listenerActorSystem.actorOf(
+ ExampleRoleChangeListener.getProps(memberId), memberId + "-role-change-listener");
+ System.out.println("Role Change Listener created:" + listenerActor.path().toString());
+ }
+ }
+
public static ActorRef createExampleActor(String name) {
return actorSystem.actorOf(ExampleActor.props(name, withoutPeer(name),
Optional.of(configParams)), name);
public void createNodes(int num) {
for (int i=0; i < num; i++) {
nameCounter = nameCounter + 1;
- allPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
+ allPeers.put("example-"+nameCounter, "akka://raft-test/user/example-"+nameCounter);
}
for (String s : allPeers.keySet()) {
System.out.println("Created node:"+s);
}
- }
-
- // add new nodes , pass in the count
- public void addNodes(int num) {
- Map<String, String> newPeers = new HashMap<>();
- for (int i=0; i < num; i++) {
- nameCounter = nameCounter + 1;
- newPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
- allPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
- }
- Map<String, ActorRef> newActorRefs = new HashMap<String, ActorRef>(num);
- for (Map.Entry<String, String> entry : newPeers.entrySet()) {
- ActorRef exampleActor = createExampleActor(entry.getKey());
- newActorRefs.put(entry.getKey(), exampleActor);
-
- //now also add these new nodes as peers from the previous nodes
- for (ActorRef actor : actorRefs.values()) {
- actor.tell(new AddRaftPeer(entry.getKey(), entry.getValue()), null);
- }
-
- System.out.println("Added node:" + entry);
- }
-
- actorRefs.putAll(newActorRefs);
+ createClusterRoleChangeListener(Lists.newArrayList(allPeers.keySet()));
}
-
// add num clients to all nodes in the system
public void addClients(int num) {
for(Map.Entry<String,ActorRef> actorRefEntry : actorRefs.entrySet()) {
actorSystem.stop(actorRef);
actorRefs.remove(actorName);
-
- for (ActorRef actor : actorRefs.values()) {
- actor.tell(new RemoveRaftPeer(actorName), null);
- }
-
allPeers.remove(actorName);
}
allPeers.put(actorName, address);
ActorRef exampleActor = createExampleActor(actorName);
-
- for (ActorRef actor : actorRefs.values()) {
- actor.tell(new AddRaftPeer(actorName, address), null);
- }
-
actorRefs.put(actorName, exampleActor);
addClientsToNode(actorName, 1);