1 package org.opendaylight.controller.cluster.example;
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import com.google.common.base.Optional;
6 import com.google.common.collect.Lists;
7 import com.typesafe.config.ConfigFactory;
8 import java.io.BufferedReader;
9 import java.io.InputStreamReader;
10 import java.util.HashMap;
11 import java.util.List;
13 import java.util.concurrent.ConcurrentHashMap;
14 import org.opendaylight.controller.cluster.example.messages.PrintRole;
15 import org.opendaylight.controller.cluster.example.messages.PrintState;
16 import org.opendaylight.controller.cluster.raft.ConfigParams;
 * This is a test driver for testing the akka-raft implementation.
 * It uses ExampleActors and threads to push content (key-vals) to these actors.
 * Each ExampleActor can have one or more ClientActors. Each ClientActor spawns
 * a thread and starts pushing logs to the actor it is assigned to.
24 public class TestDriver {
27 private static Map<String, String> allPeers = new HashMap<>();
28 private static Map<String, ActorRef> clientActorRefs = new HashMap<String, ActorRef>();
29 private static Map<String, ActorRef> actorRefs = new HashMap<String, ActorRef>();
30 private static LogGenerator logGenerator = new LogGenerator();
31 private int nameCounter = 0;
32 private static ConfigParams configParams = new ExampleConfigParamsImpl();
34 private static ActorSystem actorSystem;
35 private static ActorSystem listenerActorSystem;
38 * Create nodes, add clients and start logging.
43 * reinstateNode:{nodeName}
 * addClientsToNode:{nodeName}:{num}
 * startLoggingForClient:{clientName}
 * stopLoggingForClient:{clientName}
 * Note: when run in an IDE at debug log level, the debug logs in
 * AbstractUntypedActor and AbstractUntypedPersistentActor would need to be commented out.
55 * Also RaftActor handleCommand(), debug log which prints for every command other than AE/AER
60 public static void main(String[] args) throws Exception {
62 actorSystem = ActorSystem.create("raft-test", ConfigFactory
63 .load().getConfig("raft-test"));
65 listenerActorSystem = ActorSystem.create("raft-test-listener", ConfigFactory
66 .load().getConfig("raft-test-listener"));
68 TestDriver td = new TestDriver();
70 System.out.println("Enter command (type bye to exit):");
73 BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
75 String command = br.readLine();
76 if (command.startsWith("bye")) {
79 } else if (command.startsWith("createNodes")) {
80 String[] arr = command.split(":");
81 int n = Integer.parseInt(arr[1]);
} else if (command.startsWith("addClients") && !command.startsWith("addClientsToNode")) {
    // "addClientsToNode:..." shares the "addClients" prefix; it is excluded here so the
    // dedicated addClientsToNode branch further down the chain stays reachable.
85 String[] arr = command.split(":");
86 int n = Integer.parseInt(arr[1]);
89 } else if (command.startsWith("addClientsToNode")) {
90 String[] arr = command.split(":");
91 String nodeName = arr[1];
92 int n = Integer.parseInt(arr[1]);
93 td.addClientsToNode(nodeName, n);
95 } else if (command.startsWith("stopNode")) {
96 String[] arr = command.split(":");
99 } else if (command.startsWith("reinstateNode")) {
100 String[] arr = command.split(":");
101 td.reinstateNode(arr[1]);
} else if (command.startsWith("startLogging") && !command.startsWith("startLoggingForClient")) {
    // "startLoggingForClient:..." shares the "startLogging" prefix; it is excluded here
    // so the per-client branch further down the chain stays reachable.
104 td.startAllLogging();
106 } else if (command.startsWith("startLoggingForClient")) {
107 String[] arr = command.split(":");
108 td.startLoggingForClient(clientActorRefs.get(arr[1]));
} else if (command.startsWith("stopLogging") && !command.startsWith("stopLoggingForClient")) {
    // "stopLoggingForClient:..." shares the "stopLogging" prefix; it is excluded here
    // so the per-client branch further down the chain stays reachable.
113 } else if (command.startsWith("stopLoggingForClient")) {
114 String[] arr = command.split(":");
115 td.stopLoggingForClient(clientActorRefs.get(arr[1]));
117 } else if (command.startsWith("printState")) {
119 } else if (command.startsWith("printNodes")) {
122 System.out.println("Invalid command:" + command);
128 // create the listener using a separate actor system for each example actor
129 private void createClusterRoleChangeListener(List<String> memberIds) {
130 System.out.println("memberIds="+memberIds);
131 for (String memberId : memberIds) {
132 ActorRef listenerActor = listenerActorSystem.actorOf(
133 ExampleRoleChangeListener.getProps(memberId), memberId + "-role-change-listener");
134 System.out.println("Role Change Listener created:" + listenerActor.path().toString());
138 public static ActorRef createExampleActor(String name) {
139 return actorSystem.actorOf(ExampleActor.props(name, withoutPeer(name),
140 Optional.of(configParams)), name);
143 public void createNodes(int num) {
144 for (int i=0; i < num; i++) {
145 nameCounter = nameCounter + 1;
146 allPeers.put("example-"+nameCounter, "akka://raft-test/user/example-"+nameCounter);
149 for (String s : allPeers.keySet()) {
150 ActorRef exampleActor = createExampleActor(s);
151 actorRefs.put(s, exampleActor);
152 System.out.println("Created node:"+s);
156 createClusterRoleChangeListener(Lists.newArrayList(allPeers.keySet()));
159 // add num clients to all nodes in the system
160 public void addClients(int num) {
161 for(Map.Entry<String,ActorRef> actorRefEntry : actorRefs.entrySet()) {
162 for (int i=0; i < num; i++) {
163 String clientName = "client-" + i + "-" + actorRefEntry.getKey();
164 ActorRef clientActor = actorSystem.actorOf(
165 ClientActor.props(actorRefEntry.getValue()), clientName);
166 clientActorRefs.put(clientName, clientActor);
167 System.out.println("Created client-node:" + clientName);
172 // add num clients to a node
173 public void addClientsToNode(String actorName, int num) {
174 ActorRef actorRef = actorRefs.get(actorName);
175 for (int i=0; i < num; i++) {
176 String clientName = "client-" + i + "-" + actorName;
177 clientActorRefs.put(clientName,
178 actorSystem.actorOf(ClientActor.props(actorRef), clientName));
179 System.out.println("Added client-node:" + clientName);
183 public void stopNode(String actorName) {
184 ActorRef actorRef = actorRefs.get(actorName);
186 for (Map.Entry<String,ActorRef> entry : clientActorRefs.entrySet()) {
187 if (entry.getKey().endsWith(actorName)) {
188 actorSystem.stop(entry.getValue());
192 actorSystem.stop(actorRef);
193 actorRefs.remove(actorName);
194 allPeers.remove(actorName);
197 public void reinstateNode(String actorName) {
198 String address = "akka://default/user/"+actorName;
199 allPeers.put(actorName, address);
201 ActorRef exampleActor = createExampleActor(actorName);
202 actorRefs.put(actorName, exampleActor);
204 addClientsToNode(actorName, 1);
207 public void startAllLogging() {
208 if(!clientActorRefs.isEmpty()) {
209 for(Map.Entry<String,ActorRef> client : clientActorRefs.entrySet()) {
210 logGenerator.startLoggingForClient(client.getValue());
211 System.out.println("Started logging for client:"+client.getKey());
214 System.out.println("There are no clients for any nodes. First create clients using commands- addClients:<num> or addClientsToNode:<nodename>:<num>");
219 public void startLoggingForClient(ActorRef client) {
220 logGenerator.startLoggingForClient(client);
223 public void stopAllLogging() {
224 for(Map.Entry<String,ActorRef> client : clientActorRefs.entrySet()) {
225 logGenerator.stopLoggingForClient(client.getValue());
229 public void stopLoggingForClient(ActorRef client) {
230 logGenerator.stopLoggingForClient(client);
233 public void printState() {
234 for (ActorRef ref : actorRefs.values()) {
235 ref.tell(new PrintState(), null);
239 public void printNodes() {
240 for (ActorRef ref : actorRefs.values()) {
241 ref.tell(new PrintRole(), null);
245 public ActorRef getLeader() {
250 private static Map<String, String> withoutPeer(String peerId) {
251 Map<String, String> without = new ConcurrentHashMap<>(allPeers);
252 without.remove(peerId);