1 package org.opendaylight.controller.cluster.example;
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import com.google.common.base.Optional;
6 import org.opendaylight.controller.cluster.example.messages.PrintRole;
7 import org.opendaylight.controller.cluster.example.messages.PrintState;
8 import org.opendaylight.controller.cluster.raft.ConfigParams;
9 import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
10 import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
12 import java.io.BufferedReader;
13 import java.io.InputStreamReader;
14 import java.util.HashMap;
16 import java.util.concurrent.ConcurrentHashMap;
19 * This is a test driver for testing akka-raft implementation
20 * Its uses ExampleActors and threads to push content(key-vals) to these actors
21 * Each ExampleActor can have one or more ClientActors. Each ClientActor spawns
22 * a thread and starts push logs to the actor its assigned to.
24 public class TestDriver {
26 private static final ActorSystem actorSystem = ActorSystem.create();
27 private static Map<String, String> allPeers = new HashMap<>();
28 private static Map<String, ActorRef> clientActorRefs = new HashMap<String, ActorRef>();
29 private static Map<String, ActorRef> actorRefs = new HashMap<String, ActorRef>();
30 private static LogGenerator logGenerator = new LogGenerator();
31 private int nameCounter = 0;
32 private static ConfigParams configParams = new ExampleConfigParamsImpl();
35 * Create nodes, add clients and start logging.
41 * reinstateNode:{nodeName}
43 * addClientsToNode:{nodeName, num}
46 * startLoggingForClient:{nodeName}
47 * stopLoggingForClient:{nodeName}
53 public static void main(String[] args) throws Exception {
54 TestDriver td = new TestDriver();
56 System.out.println("Enter command (type bye to exit):");
59 BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
61 String command = br.readLine();
62 if (command.startsWith("bye")) {
65 } else if (command.startsWith("createNodes")) {
66 String[] arr = command.split(":");
67 int n = Integer.parseInt(arr[1]);
70 } else if (command.startsWith("addNodes")) {
71 String[] arr = command.split(":");
72 int n = Integer.parseInt(arr[1]);
75 } else if (command.startsWith("addClients")) {
76 String[] arr = command.split(":");
77 int n = Integer.parseInt(arr[1]);
80 } else if (command.startsWith("addClientsToNode")) {
81 String[] arr = command.split(":");
82 String nodeName = arr[1];
83 int n = Integer.parseInt(arr[1]);
84 td.addClientsToNode(nodeName, n);
86 } else if (command.startsWith("stopNode")) {
87 String[] arr = command.split(":");
90 } else if (command.startsWith("reinstateNode")) {
91 String[] arr = command.split(":");
92 td.reinstateNode(arr[1]);
94 } else if (command.startsWith("startLogging")) {
97 } else if (command.startsWith("startLoggingForClient")) {
98 String[] arr = command.split(":");
99 td.startLoggingForClient(clientActorRefs.get(arr[1]));
101 } else if (command.startsWith("stopLogging")) {
104 } else if (command.startsWith("stopLoggingForClient")) {
105 String[] arr = command.split(":");
106 td.stopLoggingForClient(clientActorRefs.get(arr[1]));
108 } else if (command.startsWith("printState")) {
110 } else if (command.startsWith("printNodes")) {
113 System.out.println("Invalid command:" + command);
119 public static ActorRef createExampleActor(String name) {
120 return actorSystem.actorOf(ExampleActor.props(name, withoutPeer(name),
121 Optional.of(configParams)), name);
124 public void createNodes(int num) {
125 for (int i=0; i < num; i++) {
126 nameCounter = nameCounter + 1;
127 allPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
130 for (String s : allPeers.keySet()) {
131 ActorRef exampleActor = createExampleActor(s);
132 actorRefs.put(s, exampleActor);
133 System.out.println("Created node:"+s);
138 // add new nodes , pass in the count
139 public void addNodes(int num) {
140 Map<String, String> newPeers = new HashMap<>();
141 for (int i=0; i < num; i++) {
142 nameCounter = nameCounter + 1;
143 newPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
144 allPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
147 Map<String, ActorRef> newActorRefs = new HashMap<String, ActorRef>(num);
148 for (Map.Entry<String, String> entry : newPeers.entrySet()) {
149 ActorRef exampleActor = createExampleActor(entry.getKey());
150 newActorRefs.put(entry.getKey(), exampleActor);
152 //now also add these new nodes as peers from the previous nodes
153 for (ActorRef actor : actorRefs.values()) {
154 actor.tell(new AddRaftPeer(entry.getKey(), entry.getValue()), null);
157 System.out.println("Added node:" + entry);
160 actorRefs.putAll(newActorRefs);
164 // add num clients to all nodes in the system
165 public void addClients(int num) {
166 for(Map.Entry<String,ActorRef> actorRefEntry : actorRefs.entrySet()) {
167 for (int i=0; i < num; i++) {
168 String clientName = "client-" + i + "-" + actorRefEntry.getKey();
169 ActorRef clientActor = actorSystem.actorOf(
170 ClientActor.props(actorRefEntry.getValue()), clientName);
171 clientActorRefs.put(clientName, clientActor);
172 System.out.println("Created client-node:" + clientName);
177 // add num clients to a node
178 public void addClientsToNode(String actorName, int num) {
179 ActorRef actorRef = actorRefs.get(actorName);
180 for (int i=0; i < num; i++) {
181 String clientName = "client-" + i + "-" + actorName;
182 clientActorRefs.put(clientName,
183 actorSystem.actorOf(ClientActor.props(actorRef), clientName));
184 System.out.println("Added client-node:" + clientName);
188 public void stopNode(String actorName) {
189 ActorRef actorRef = actorRefs.get(actorName);
191 for (Map.Entry<String,ActorRef> entry : clientActorRefs.entrySet()) {
192 if (entry.getKey().endsWith(actorName)) {
193 actorSystem.stop(entry.getValue());
197 actorSystem.stop(actorRef);
198 actorRefs.remove(actorName);
200 for (ActorRef actor : actorRefs.values()) {
201 actor.tell(new RemoveRaftPeer(actorName), null);
204 allPeers.remove(actorName);
207 public void reinstateNode(String actorName) {
208 String address = "akka://default/user/"+actorName;
209 allPeers.put(actorName, address);
211 ActorRef exampleActor = createExampleActor(actorName);
213 for (ActorRef actor : actorRefs.values()) {
214 actor.tell(new AddRaftPeer(actorName, address), null);
217 actorRefs.put(actorName, exampleActor);
219 addClientsToNode(actorName, 1);
222 public void startAllLogging() {
223 if(!clientActorRefs.isEmpty()) {
224 for(Map.Entry<String,ActorRef> client : clientActorRefs.entrySet()) {
225 logGenerator.startLoggingForClient(client.getValue());
226 System.out.println("Started logging for client:"+client.getKey());
229 System.out.println("There are no clients for any nodes. First create clients using commands- addClients:<num> or addClientsToNode:<nodename>:<num>");
234 public void startLoggingForClient(ActorRef client) {
235 logGenerator.startLoggingForClient(client);
238 public void stopAllLogging() {
239 for(Map.Entry<String,ActorRef> client : clientActorRefs.entrySet()) {
240 logGenerator.stopLoggingForClient(client.getValue());
244 public void stopLoggingForClient(ActorRef client) {
245 logGenerator.stopLoggingForClient(client);
248 public void printState() {
249 for (ActorRef ref : actorRefs.values()) {
250 ref.tell(new PrintState(), null);
254 public void printNodes() {
255 for (ActorRef ref : actorRefs.values()) {
256 ref.tell(new PrintRole(), null);
260 public ActorRef getLeader() {
265 private static Map<String, String> withoutPeer(String peerId) {
266 Map<String, String> without = new ConcurrentHashMap<>(allPeers);
267 without.remove(peerId);