1 package org.opendaylight.controller.cluster.example;
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import org.opendaylight.controller.cluster.example.messages.PrintRole;
6 import org.opendaylight.controller.cluster.example.messages.PrintState;
7 import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
8 import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
10 import java.io.BufferedReader;
11 import java.io.InputStreamReader;
12 import java.util.HashMap;
14 import java.util.concurrent.ConcurrentHashMap;
17 * This is a test driver for testing the akka-raft implementation.
18 * It uses ExampleActors and threads to push content (key-value pairs) to these actors.
19 * Each ExampleActor can have one or more ClientActors. Each ClientActor spawns
20 * a thread and starts pushing logs to the actor it is assigned to.
22 public class TestDriver {
24 private static final ActorSystem actorSystem = ActorSystem.create();
25 private static Map<String, String> allPeers = new HashMap<>();
26 private static Map<String, ActorRef> clientActorRefs = new HashMap<String, ActorRef>();
27 private static Map<String, ActorRef> actorRefs = new HashMap<String, ActorRef>();
28 private static LogGenerator logGenerator = new LogGenerator();
29 private int nameCounter = 0;
32 * Create nodes, add clients and start logging.
38 * reinstateNode:{nodeName}
40 * addClientsToNode:{nodeName}:{num}
43 * startLoggingForClient:{clientName}
44 * stopLoggingForClient:{clientName}
50 public static void main(String[] args) throws Exception {
51 TestDriver td = new TestDriver();
53 System.out.println("Enter command (type bye to exit):");
56 BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
58 String command = br.readLine();
59 if (command.startsWith("bye")) {
62 } else if (command.startsWith("createNodes")) {
63 String[] arr = command.split(":");
64 int n = Integer.parseInt(arr[1]);
67 } else if (command.startsWith("addNodes")) {
68 String[] arr = command.split(":");
69 int n = Integer.parseInt(arr[1]);
72 } else if (command.startsWith("addClients")) {
73 String[] arr = command.split(":");
74 int n = Integer.parseInt(arr[1]);
77 } else if (command.startsWith("addClientsToNode")) {
78 String[] arr = command.split(":");
79 String nodeName = arr[1];
80 int n = Integer.parseInt(arr[1]);
81 td.addClientsToNode(nodeName, n);
83 } else if (command.startsWith("stopNode")) {
84 String[] arr = command.split(":");
87 } else if (command.startsWith("reinstateNode")) {
88 String[] arr = command.split(":");
89 td.reinstateNode(arr[1]);
91 } else if (command.startsWith("startLogging")) {
94 } else if (command.startsWith("startLoggingForClient")) {
95 String[] arr = command.split(":");
96 td.startLoggingForClient(clientActorRefs.get(arr[1]));
98 } else if (command.startsWith("stopLogging")) {
101 } else if (command.startsWith("stopLoggingForClient")) {
102 String[] arr = command.split(":");
103 td.stopLoggingForClient(clientActorRefs.get(arr[1]));
105 } else if (command.startsWith("printState")) {
107 } else if (command.startsWith("printNodes")) {
114 public void createNodes(int num) {
115 for (int i=0; i < num; i++) {
116 nameCounter = nameCounter + 1;
117 allPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
120 for (String s : allPeers.keySet()) {
121 ActorRef exampleActor = actorSystem.actorOf(
122 ExampleActor.props(s, withoutPeer(s)), s);
123 actorRefs.put(s, exampleActor);
124 System.out.println("Created node:"+s);
129 // add new nodes , pass in the count
130 public void addNodes(int num) {
131 Map<String, String> newPeers = new HashMap<>();
132 for (int i=0; i < num; i++) {
133 nameCounter = nameCounter + 1;
134 newPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
135 allPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
138 Map<String, ActorRef> newActorRefs = new HashMap<String, ActorRef>(num);
139 for (Map.Entry<String, String> entry : newPeers.entrySet()) {
140 ActorRef exampleActor = actorSystem.actorOf(
141 ExampleActor.props(entry.getKey(), withoutPeer(entry.getKey())), entry.getKey());
142 newActorRefs.put(entry.getKey(), exampleActor);
144 //now also add these new nodes as peers from the previous nodes
145 for (ActorRef actor : actorRefs.values()) {
146 actor.tell(new AddRaftPeer(entry.getKey(), entry.getValue()), null);
149 System.out.println("Added node:" + entry);
152 actorRefs.putAll(newActorRefs);
156 // add num clients to all nodes in the system
157 public void addClients(int num) {
158 for(Map.Entry<String,ActorRef> actorRefEntry : actorRefs.entrySet()) {
159 for (int i=0; i < num; i++) {
160 String clientName = "client-" + i + "-" + actorRefEntry.getKey();
161 ActorRef clientActor = actorSystem.actorOf(
162 ClientActor.props(actorRefEntry.getValue()), clientName);
163 clientActorRefs.put(clientName, clientActor);
164 System.out.println("Created client-node:" + clientName);
169 // add num clients to a node
170 public void addClientsToNode(String actorName, int num) {
171 ActorRef actorRef = actorRefs.get(actorName);
172 for (int i=0; i < num; i++) {
173 String clientName = "client-" + i + "-" + actorName;
174 clientActorRefs.put(clientName,
175 actorSystem.actorOf(ClientActor.props(actorRef), clientName));
176 System.out.println("Added client-node:" + clientName);
180 public void stopNode(String actorName) {
181 ActorRef actorRef = actorRefs.get(actorName);
183 for (Map.Entry<String,ActorRef> entry : clientActorRefs.entrySet()) {
184 if (entry.getKey().endsWith(actorName)) {
185 actorSystem.stop(entry.getValue());
189 actorSystem.stop(actorRef);
190 actorRefs.remove(actorName);
192 for (ActorRef actor : actorRefs.values()) {
193 actor.tell(new RemoveRaftPeer(actorName), null);
196 allPeers.remove(actorName);
199 public void reinstateNode(String actorName) {
200 String address = "akka://default/user/"+actorName;
201 allPeers.put(actorName, address);
203 ActorRef exampleActor = actorSystem.actorOf(ExampleActor.props(actorName, withoutPeer(actorName)), actorName);
205 for (ActorRef actor : actorRefs.values()) {
206 actor.tell(new AddRaftPeer(actorName, address), null);
209 actorRefs.put(actorName, exampleActor);
211 addClientsToNode(actorName, 1);
214 public void startAllLogging() {
215 if(!clientActorRefs.isEmpty()) {
216 for(Map.Entry<String,ActorRef> client : clientActorRefs.entrySet()) {
217 logGenerator.startLoggingForClient(client.getValue());
218 System.out.println("Started logging for client:"+client.getKey());
221 System.out.println("There are no clients for any nodes. First create clients using commands- addClients:<num> or addClientsToNode:<nodename>:<num>");
226 public void startLoggingForClient(ActorRef client) {
227 logGenerator.startLoggingForClient(client);
230 public void stopAllLogging() {
231 for(Map.Entry<String,ActorRef> client : clientActorRefs.entrySet()) {
232 logGenerator.stopLoggingForClient(client.getValue());
236 public void stopLoggingForClient(ActorRef client) {
237 logGenerator.stopLoggingForClient(client);
240 public void printState() {
241 for (ActorRef ref : actorRefs.values()) {
242 ref.tell(new PrintState(), null);
246 public void printNodes() {
247 for (ActorRef ref : actorRefs.values()) {
248 ref.tell(new PrintRole(), null);
252 public ActorRef getLeader() {
257 private static Map<String, String> withoutPeer(String peerId) {
258 Map<String, String> without = new ConcurrentHashMap<>(allPeers);
259 without.remove(peerId);