1 package org.opendaylight.controller.cluster.example;
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import com.google.common.base.Optional;
6 import org.opendaylight.controller.cluster.example.messages.PrintRole;
7 import org.opendaylight.controller.cluster.example.messages.PrintState;
8 import org.opendaylight.controller.cluster.raft.ConfigParams;
9 import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
11 import java.io.BufferedReader;
12 import java.io.InputStreamReader;
13 import java.util.HashMap;
15 import java.util.concurrent.ConcurrentHashMap;
/**
 * This is a test driver for testing the akka-raft implementation.
 * It uses ExampleActors and threads to push content (key-vals) to these actors.
 * Each ExampleActor can have one or more ClientActors. Each ClientActor spawns
 * a thread and starts pushing logs to the actor it is assigned to.
 */
23 public class TestDriver {
25 private static final ActorSystem actorSystem = ActorSystem.create();
26 private static Map<String, String> allPeers = new HashMap<>();
27 private static Map<String, ActorRef> clientActorRefs = new HashMap<String, ActorRef>();
28 private static Map<String, ActorRef> actorRefs = new HashMap<String, ActorRef>();
29 private static LogGenerator logGenerator = new LogGenerator();
30 private int nameCounter = 0;
31 private static ConfigParams configParams = new ExampleConfigParamsImpl();
34 * Create nodes, add clients and start logging.
40 * reinstateNode:{nodeName}
42 * addClientsToNode:{nodeName, num}
45 * startLoggingForClient:{nodeName}
46 * stopLoggingForClient:{nodeName}
52 public static void main(String[] args) throws Exception {
53 TestDriver td = new TestDriver();
55 System.out.println("Enter command (type bye to exit):");
58 BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
60 String command = br.readLine();
61 if (command.startsWith("bye")) {
64 } else if (command.startsWith("createNodes")) {
65 String[] arr = command.split(":");
66 int n = Integer.parseInt(arr[1]);
69 } else if (command.startsWith("addNodes")) {
70 String[] arr = command.split(":");
71 int n = Integer.parseInt(arr[1]);
74 } else if (command.startsWith("addClients")) {
75 String[] arr = command.split(":");
76 int n = Integer.parseInt(arr[1]);
79 } else if (command.startsWith("addClientsToNode")) {
80 String[] arr = command.split(":");
81 String nodeName = arr[1];
82 int n = Integer.parseInt(arr[1]);
83 td.addClientsToNode(nodeName, n);
85 } else if (command.startsWith("stopNode")) {
86 String[] arr = command.split(":");
89 } else if (command.startsWith("reinstateNode")) {
90 String[] arr = command.split(":");
91 td.reinstateNode(arr[1]);
93 } else if (command.startsWith("startLogging")) {
96 } else if (command.startsWith("startLoggingForClient")) {
97 String[] arr = command.split(":");
98 td.startLoggingForClient(clientActorRefs.get(arr[1]));
100 } else if (command.startsWith("stopLogging")) {
103 } else if (command.startsWith("stopLoggingForClient")) {
104 String[] arr = command.split(":");
105 td.stopLoggingForClient(clientActorRefs.get(arr[1]));
107 } else if (command.startsWith("printState")) {
109 } else if (command.startsWith("printNodes")) {
112 System.out.println("Invalid command:" + command);
118 public static ActorRef createExampleActor(String name) {
119 return actorSystem.actorOf(ExampleActor.props(name, withoutPeer(name),
120 Optional.of(configParams)), name);
123 public void createNodes(int num) {
124 for (int i=0; i < num; i++) {
125 nameCounter = nameCounter + 1;
126 allPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
129 for (String s : allPeers.keySet()) {
130 ActorRef exampleActor = createExampleActor(s);
131 actorRefs.put(s, exampleActor);
132 System.out.println("Created node:"+s);
137 // add new nodes , pass in the count
138 public void addNodes(int num) {
139 Map<String, String> newPeers = new HashMap<>();
140 for (int i=0; i < num; i++) {
141 nameCounter = nameCounter + 1;
142 newPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
143 allPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
146 Map<String, ActorRef> newActorRefs = new HashMap<String, ActorRef>(num);
147 for (Map.Entry<String, String> entry : newPeers.entrySet()) {
148 ActorRef exampleActor = createExampleActor(entry.getKey());
149 newActorRefs.put(entry.getKey(), exampleActor);
151 //now also add these new nodes as peers from the previous nodes
152 for (ActorRef actor : actorRefs.values()) {
153 actor.tell(new AddRaftPeer(entry.getKey(), entry.getValue()), null);
156 System.out.println("Added node:" + entry);
159 actorRefs.putAll(newActorRefs);
163 // add num clients to all nodes in the system
164 public void addClients(int num) {
165 for(Map.Entry<String,ActorRef> actorRefEntry : actorRefs.entrySet()) {
166 for (int i=0; i < num; i++) {
167 String clientName = "client-" + i + "-" + actorRefEntry.getKey();
168 ActorRef clientActor = actorSystem.actorOf(
169 ClientActor.props(actorRefEntry.getValue()), clientName);
170 clientActorRefs.put(clientName, clientActor);
171 System.out.println("Created client-node:" + clientName);
176 // add num clients to a node
177 public void addClientsToNode(String actorName, int num) {
178 ActorRef actorRef = actorRefs.get(actorName);
179 for (int i=0; i < num; i++) {
180 String clientName = "client-" + i + "-" + actorName;
181 clientActorRefs.put(clientName,
182 actorSystem.actorOf(ClientActor.props(actorRef), clientName));
183 System.out.println("Added client-node:" + clientName);
187 public void stopNode(String actorName) {
188 ActorRef actorRef = actorRefs.get(actorName);
190 for (Map.Entry<String,ActorRef> entry : clientActorRefs.entrySet()) {
191 if (entry.getKey().endsWith(actorName)) {
192 actorSystem.stop(entry.getValue());
196 actorSystem.stop(actorRef);
197 actorRefs.remove(actorName);
198 allPeers.remove(actorName);
201 public void reinstateNode(String actorName) {
202 String address = "akka://default/user/"+actorName;
203 allPeers.put(actorName, address);
205 ActorRef exampleActor = createExampleActor(actorName);
206 actorRefs.put(actorName, exampleActor);
208 addClientsToNode(actorName, 1);
211 public void startAllLogging() {
212 if(!clientActorRefs.isEmpty()) {
213 for(Map.Entry<String,ActorRef> client : clientActorRefs.entrySet()) {
214 logGenerator.startLoggingForClient(client.getValue());
215 System.out.println("Started logging for client:"+client.getKey());
218 System.out.println("There are no clients for any nodes. First create clients using commands- addClients:<num> or addClientsToNode:<nodename>:<num>");
223 public void startLoggingForClient(ActorRef client) {
224 logGenerator.startLoggingForClient(client);
227 public void stopAllLogging() {
228 for(Map.Entry<String,ActorRef> client : clientActorRefs.entrySet()) {
229 logGenerator.stopLoggingForClient(client.getValue());
233 public void stopLoggingForClient(ActorRef client) {
234 logGenerator.stopLoggingForClient(client);
237 public void printState() {
238 for (ActorRef ref : actorRefs.values()) {
239 ref.tell(new PrintState(), null);
243 public void printNodes() {
244 for (ActorRef ref : actorRefs.values()) {
245 ref.tell(new PrintRole(), null);
249 public ActorRef getLeader() {
254 private static Map<String, String> withoutPeer(String peerId) {
255 Map<String, String> without = new ConcurrentHashMap<>(allPeers);
256 without.remove(peerId);