1 package org.opendaylight.controller.cluster.example;
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import org.opendaylight.controller.cluster.example.messages.PrintRole;
6 import org.opendaylight.controller.cluster.example.messages.PrintState;
7 import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
8 import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
10 import java.io.BufferedReader;
11 import java.io.InputStreamReader;
12 import java.util.HashMap;
14 import java.util.Random;
15 import java.util.concurrent.ConcurrentHashMap;
/**
 * This is a test driver for testing the akka-raft implementation.
 * It uses ExampleActors and threads to push content (key-values) to these actors.
 * Each ExampleActor can have one or more ClientActors. Each ClientActor spawns
 * a thread and starts pushing logs to the actor it is assigned to.
 */
23 public class TestDriver {
25 private static final ActorSystem actorSystem = ActorSystem.create();
26 private static Map<String, String> allPeers = new HashMap<>();
27 private static Map<String, ActorRef> clientActorRefs = new HashMap<String, ActorRef>();
28 private static Map<String, ActorRef> actorRefs = new HashMap<String, ActorRef>();
29 private static LogGenerator logGenerator = new LogGenerator();;
32 * Create nodes, add clients and start logging.
39 * addClientsToNode:{nodeName, num}
42 * startLoggingForClient:{nodeName}
43 * stopLoggingForClient:{nodeName}
49 public static void main(String[] args) throws Exception {
50 TestDriver td = new TestDriver();
52 System.out.println("Enter command (type bye to exit):");
55 BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
57 String command = br.readLine();
58 if (command.startsWith("bye")) {
61 } else if (command.startsWith("createNodes")) {
62 String[] arr = command.split(":");
63 int n = Integer.parseInt(arr[1]);
66 } else if (command.startsWith("addNodes")) {
67 String[] arr = command.split(":");
68 int n = Integer.parseInt(arr[1]);
71 } else if (command.startsWith("addClients")) {
72 String[] arr = command.split(":");
73 int n = Integer.parseInt(arr[1]);
76 } else if (command.startsWith("addClientsToNode")) {
77 String[] arr = command.split(":");
78 String nodeName = arr[1];
79 int n = Integer.parseInt(arr[1]);
80 td.addClientsToNode(nodeName, n);
82 } else if (command.startsWith("stopNode")) {
83 String[] arr = command.split(":");
86 } else if (command.startsWith("startLogging")) {
89 } else if (command.startsWith("startLoggingForClient")) {
90 String[] arr = command.split(":");
91 td.startLoggingForClient(clientActorRefs.get(arr[1]));
93 } else if (command.startsWith("stopLogging")) {
96 } else if (command.startsWith("stopLoggingForClient")) {
97 String[] arr = command.split(":");
98 td.stopLoggingForClient(clientActorRefs.get(arr[1]));
100 } else if (command.startsWith("printState")) {
102 } else if (command.startsWith("printNodes")) {
109 public void createNodes(int num) {
110 for (int i=0; i < num; i++) {
111 int rand = getUnusedRandom(num);
112 allPeers.put("example-"+rand, "akka://default/user/example-"+rand);
115 for (String s : allPeers.keySet()) {
116 ActorRef exampleActor = actorSystem.actorOf(
117 ExampleActor.props(s, withoutPeer(s)), s);
118 actorRefs.put(s, exampleActor);
119 System.out.println("Created node:"+s);
124 // add new nodes , pass in the count
125 public void addNodes(int num) {
126 Map<String, String> newPeers = new HashMap<>();
127 for (int i=0; i < num; i++) {
128 int rand = getUnusedRandom(num);
129 newPeers.put("example-"+rand, "akka://default/user/example-"+rand);
130 allPeers.put("example-"+rand, "akka://default/user/example-"+rand);
133 Map<String, ActorRef> newActorRefs = new HashMap<String, ActorRef>(num);
134 for (Map.Entry<String, String> entry : newPeers.entrySet()) {
135 ActorRef exampleActor = actorSystem.actorOf(
136 ExampleActor.props(entry.getKey(), withoutPeer(entry.getKey())), entry.getKey());
137 newActorRefs.put(entry.getKey(), exampleActor);
139 //now also add these new nodes as peers from the previous nodes
140 for (ActorRef actor : actorRefs.values()) {
141 actor.tell(new AddRaftPeer(entry.getKey(), entry.getValue()), null);
144 System.out.println("Added node:" + entry);
147 actorRefs.putAll(newActorRefs);
151 // add num clients to all nodes in the system
152 public void addClients(int num) {
153 for(Map.Entry<String,ActorRef> actorRefEntry : actorRefs.entrySet()) {
154 for (int i=0; i < num; i++) {
155 String clientName = "client-" + i + "-" + actorRefEntry.getKey();
156 ActorRef clientActor = actorSystem.actorOf(
157 ClientActor.props(actorRefEntry.getValue()), clientName);
158 clientActorRefs.put(clientName, clientActor);
159 System.out.println("Created client-node:" + clientName);
164 // add num clients to a node
165 public void addClientsToNode(String actorName, int num) {
166 ActorRef actorRef = actorRefs.get(actorName);
167 for (int i=0; i < num; i++) {
168 String clientName = "client-" + i + "-" + actorRef;
169 clientActorRefs.put(clientName,
170 actorSystem.actorOf(ClientActor.props(actorRef), clientName));
171 System.out.println("Added client-node:" + clientName);
175 public void stopNode(String actorName) {
176 ActorRef actorRef = actorRefs.get(actorName);
177 String clientName = "client-"+actorName;
178 if(clientActorRefs.containsKey(clientName)) {
179 actorSystem.stop(clientActorRefs.get(clientName));
180 clientActorRefs.remove(clientName);
182 actorSystem.stop(actorRef);
183 actorRefs.remove(actorName);
185 for (ActorRef actor : actorRefs.values()) {
186 actor.tell(new RemoveRaftPeer(actorName), null);
189 allPeers.remove(actorName);
193 public void startAllLogging() {
194 if(!clientActorRefs.isEmpty()) {
195 for(Map.Entry<String,ActorRef> client : clientActorRefs.entrySet()) {
196 logGenerator.startLoggingForClient(client.getValue());
197 System.out.println("Started logging for client:"+client.getKey());
200 System.out.println("There are no clients for any nodes. First create clients using commands- addClients:<num> or addClientsToNode:<nodename>:<num>");
205 public void startLoggingForClient(ActorRef client) {
206 logGenerator.startLoggingForClient(client);
209 public void stopAllLogging() {
210 for(Map.Entry<String,ActorRef> client : clientActorRefs.entrySet()) {
211 logGenerator.stopLoggingForClient(client.getValue());
215 public void stopLoggingForClient(ActorRef client) {
216 logGenerator.stopLoggingForClient(client);
219 public void printState() {
220 for (ActorRef ref : actorRefs.values()) {
221 ref.tell(new PrintState(), null);
225 public void printNodes() {
226 for (ActorRef ref : actorRefs.values()) {
227 ref.tell(new PrintRole(), null);
/**
 * NOTE(review): only the signature of this method is visible here, so its behavior cannot be
 * described from the code. Presumably it returns the reference of the node currently acting
 * as leader - confirm against the method body.
 */
231 public ActorRef getLeader() {
235 private int getUnusedRandom(int num) {
238 rand = (new Random()).nextInt(num * num);
239 } while (allPeers.keySet().contains("example-"+rand));
/**
 * Builds the peer map for one node: copies {@code allPeers} into a new ConcurrentHashMap and
 * removes the entry for {@code peerId} from the copy. {@code allPeers} itself is not modified.
 *
 * NOTE(review): the end of this method is not visible here; presumably the copy is returned -
 * confirm.
 *
 * @param peerId name of the node whose own entry is left out of the copy
 */
244 private static Map<String, String> withoutPeer(String peerId) {
245 Map<String, String> without = new ConcurrentHashMap<>(allPeers);
246 without.remove(peerId);