From b725909c696c8d40006a6297dc54a467fddcf6b3 Mon Sep 17 00:00:00 2001 From: Kamal Rameshan Date: Thu, 17 Jul 2014 16:43:40 -0700 Subject: [PATCH] Test driver and changes related to it Change-Id: I6f638dd4f4ce56be2ad31f056394568adeba165c Signed-off-by: Kamal Rameshan Signed-off-by: Moiz Raja --- .../cluster/example/ExampleActor.java | 8 + .../cluster/example/LogGenerator.java | 67 +++++ .../cluster/example/TestDriver.java | 246 ++++++++++++++++++ .../cluster/example/messages/PrintRole.java | 7 + .../cluster/example/messages/PrintState.java | 7 + .../controller/cluster/raft/RaftActor.java | 25 +- .../cluster/raft/RaftActorContext.java | 13 + .../cluster/raft/RaftActorContextImpl.java | 10 + .../cluster/raft/behaviors/Candidate.java | 1 + .../cluster/raft/behaviors/Leader.java | 3 +- .../raft/client/messages/AddRaftPeer.java | 23 ++ .../raft/client/messages/RemoveRaftPeer.java | 16 ++ .../cluster/raft/MockRaftActorContext.java | 8 + 13 files changed, 430 insertions(+), 4 deletions(-) create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/LogGenerator.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/PrintRole.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/PrintState.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/AddRaftPeer.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/RemoveRaftPeer.java diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 90cc247709..914af5d9eb 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -13,6 +13,8 @@ import akka.actor.Props; import akka.japi.Creator; import org.opendaylight.controller.cluster.example.messages.KeyValue; import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; +import org.opendaylight.controller.cluster.example.messages.PrintRole; +import org.opendaylight.controller.cluster.example.messages.PrintState; import org.opendaylight.controller.cluster.raft.RaftActor; import java.util.HashMap; @@ -50,6 +52,12 @@ public class ExampleActor extends RaftActor { } else { getLeader().forward(message, getContext()); } + + } else if (message instanceof PrintState) { + LOG.debug("State of the node:"+getId() + " is="+state.size()); + + } else if (message instanceof PrintRole) { + LOG.debug(getId() + " = " + getRaftState()); } super.onReceiveCommand(message); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/LogGenerator.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/LogGenerator.java new file mode 100644 index 0000000000..fbe1447c72 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/LogGenerator.java @@ -0,0 +1,67 @@ +package org.opendaylight.controller.cluster.example; + +import akka.actor.ActorRef; +import org.opendaylight.controller.cluster.example.messages.KeyValue; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +/** + * Created by kramesha on 7/16/14. + */ +public class LogGenerator { + private Map clientToLoggingThread = new HashMap(); + + public void startLoggingForClient(ActorRef client) { + LoggingThread lt = new LoggingThread(client); + clientToLoggingThread.put(client, lt); + Thread t = new Thread(lt); + t.start(); + } + + public void stopLoggingForClient(ActorRef client) { + clientToLoggingThread.get(client).stopLogging(); + clientToLoggingThread.remove(client); + } + + public class LoggingThread implements Runnable { + + private ActorRef clientActor; + private volatile boolean stopLogging = false; + + public LoggingThread(ActorRef clientActor) { + this.clientActor = clientActor; + } + + public void run() { + Random r = new Random(); + while (true) { + if (stopLogging) { + System.out.println("Logging stopped for client:" + clientActor.path()); + break; + } + String key = clientActor.path().name(); + int random = r.nextInt(100); + clientActor.tell(new KeyValue(key+"-key-" + random, "value-" + random), null); + try { + Thread.sleep((random%10) * 1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + public void stopLogging() { + stopLogging = true; + } + + public void startLogging() { + stopLogging = false; + } + + + } + + +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java new file mode 100644 index 0000000000..c8a7835334 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java @@ -0,0 +1,246 @@ +package org.opendaylight.controller.cluster.example; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import org.opendaylight.controller.cluster.example.messages.PrintRole; +import org.opendaylight.controller.cluster.example.messages.PrintState; +import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer; +import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; + + +public class TestDriver { + + private static final ActorSystem actorSystem = ActorSystem.create(); + private static Map allPeers = new HashMap<>(); + private static Map clientActorRefs = new HashMap(); + private static Map actorRefs = new HashMap(); + private static LogGenerator logGenerator = new LogGenerator();; + + /** + * Create nodes, add clients and start logging. + * Commands + * bye + * createNodes:{num} + * addNodes:{num} + * stopNode:{nodeName} + * addClients:{num} + * addClientsToNode:{nodeName, num} + * startLogging + * stopLogging + * startLoggingForClient:{nodeName} + * stopLoggingForClient:{nodeName} + * printNodes + * printState + * @param args + * @throws Exception + */ + public static void main(String[] args) throws Exception { + TestDriver td = new TestDriver(); + + System.out.println("Enter command (type bye to exit):"); + + + BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); + while(true) { + String command = br.readLine(); + if (command.startsWith("bye")) { + System.exit(0); + + } else if (command.startsWith("createNodes")) { + String[] arr = command.split(":"); + int n = Integer.parseInt(arr[1]); + td.createNodes(n); + + } else if (command.startsWith("addNodes")) { + String[] arr = command.split(":"); + int n = Integer.parseInt(arr[1]); + td.addNodes(n); + + } else if (command.startsWith("addClients")) { + String[] arr = command.split(":"); + int n = Integer.parseInt(arr[1]); + td.addClients(n); + + } else if (command.startsWith("addClientsToNode")) { + String[] arr = command.split(":"); + String nodeName = arr[1]; + int n = Integer.parseInt(arr[1]); + td.addClientsToNode(nodeName, n); + + } else if (command.startsWith("stopNode")) { + String[] arr = command.split(":"); + td.stopNode(arr[1]); + + } else if (command.startsWith("startLogging")) { + td.startAllLogging(); + + } else if (command.startsWith("startLoggingForClient")) { + String[] arr = command.split(":"); + td.startLoggingForClient(clientActorRefs.get(arr[1])); + + } else if (command.startsWith("stopLogging")) { + td.stopAllLogging(); + + } else if (command.startsWith("stopLoggingForClient")) { + String[] arr = command.split(":"); + td.stopLoggingForClient(clientActorRefs.get(arr[1])); + + } else if (command.startsWith("printState")) { + td.printState(); + } else if (command.startsWith("printNodes")) { + td.printNodes(); + } + + } + } + + public void createNodes(int num) { + for (int i=0; i < num; i++) { + int rand = getUnusedRandom(num); + allPeers.put("example-"+rand, "akka://default/user/example-"+rand); + } + + for (String s : allPeers.keySet()) { + ActorRef exampleActor = actorSystem.actorOf( + ExampleActor.props(s, withoutPeer(s)), s); + actorRefs.put(s, exampleActor); + System.out.println("Created node:"+s); + + } + } + + // add new nodes , pass in the count + public void addNodes(int num) { + Map newPeers = new HashMap<>(); + for (int i=0; i < num; i++) { + int rand = getUnusedRandom(num); + newPeers.put("example-"+rand, "akka://default/user/example-"+rand); + allPeers.put("example-"+rand, "akka://default/user/example-"+rand); + + } + Map newActorRefs = new HashMap(num); + for (Map.Entry entry : newPeers.entrySet()) { + ActorRef exampleActor = actorSystem.actorOf( + ExampleActor.props(entry.getKey(), withoutPeer(entry.getKey())), entry.getKey()); + newActorRefs.put(entry.getKey(), exampleActor); + + //now also add these new nodes as peers from the previous nodes + for (ActorRef actor : actorRefs.values()) { + actor.tell(new AddRaftPeer(entry.getKey(), entry.getValue()), null); + } + + System.out.println("Added node:" + entry); + } + + actorRefs.putAll(newActorRefs); + } + + + // add num clients to all nodes in the system + public void addClients(int num) { + for(Map.Entry actorRefEntry : actorRefs.entrySet()) { + for (int i=0; i < num; i++) { + String clientName = "client-" + i + "-" + actorRefEntry.getKey(); + ActorRef clientActor = actorSystem.actorOf( + ClientActor.props(actorRefEntry.getValue()), clientName); + clientActorRefs.put(clientName, clientActor); + System.out.println("Created client-node:" + clientName); + } + } + } + + // add num clients to a node + public void addClientsToNode(String actorName, int num) { + ActorRef actorRef = actorRefs.get(actorName); + for (int i=0; i < num; i++) { + String clientName = "client-" + i + "-" + actorRef; + clientActorRefs.put(clientName, + actorSystem.actorOf(ClientActor.props(actorRef), clientName)); + System.out.println("Added client-node:" + clientName); + } + } + + public void stopNode(String actorName) { + ActorRef actorRef = actorRefs.get(actorName); + String clientName = "client-"+actorName; + if(clientActorRefs.containsKey(clientName)) { + actorSystem.stop(clientActorRefs.get(clientName)); + clientActorRefs.remove(clientName); + } + actorSystem.stop(actorRef); + actorRefs.remove(actorName); + + for (ActorRef actor : actorRefs.values()) { + actor.tell(new RemoveRaftPeer(actorName), null); + } + + allPeers.remove(actorName); + + } + + public void startAllLogging() { + if(!clientActorRefs.isEmpty()) { + for(Map.Entry client : clientActorRefs.entrySet()) { + logGenerator.startLoggingForClient(client.getValue()); + System.out.println("Started logging for client:"+client.getKey()); + } + } else { + System.out.println("There are no clients for any nodes. First create clients using commands- addClients: or addClientsToNode::"); + } + + } + + public void startLoggingForClient(ActorRef client) { + logGenerator.startLoggingForClient(client); + } + + public void stopAllLogging() { + for(Map.Entry client : clientActorRefs.entrySet()) { + logGenerator.stopLoggingForClient(client.getValue()); + } + } + + public void stopLoggingForClient(ActorRef client) { + logGenerator.stopLoggingForClient(client); + } + + public void printState() { + for (ActorRef ref : actorRefs.values()) { + ref.tell(new PrintState(), null); + } + } + + public void printNodes() { + for (ActorRef ref : actorRefs.values()) { + ref.tell(new PrintRole(), null); + } + } + + public ActorRef getLeader() { + return null; + } + + private int getUnusedRandom(int num) { + int rand = -1; + do { + rand = (new Random()).nextInt(num * num); + } while (allPeers.keySet().contains("example-"+rand)); + + return rand; + } + + private static Map withoutPeer(String peerId) { + Map without = new ConcurrentHashMap<>(allPeers); + without.remove(peerId); + + return without; + } +} + diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/PrintRole.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/PrintRole.java new file mode 100644 index 0000000000..c9d4bfa72c --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/PrintRole.java @@ -0,0 +1,7 @@ +package org.opendaylight.controller.cluster.example.messages; + +/** + * Created by kramesha on 7/17/14. + */ +public class PrintRole { +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/PrintState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/PrintState.java new file mode 100644 index 0000000000..dbf863df9a --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/PrintState.java @@ -0,0 +1,7 @@ +package org.opendaylight.controller.cluster.example.messages; + +/** + * Created by kramesha on 7/17/14. + */ +public class PrintState { +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index f38ef18973..dd9572c9a7 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -23,9 +23,11 @@ import org.opendaylight.controller.cluster.raft.behaviors.Candidate; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot; +import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer; import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState; import org.opendaylight.controller.cluster.raft.internal.messages.Replicate; @@ -126,8 +128,7 @@ public abstract class RaftActor extends UntypedPersistentActor { } @Override public void onReceiveCommand(Object message) { - if (message instanceof ApplyState) { - + if (message instanceof ApplyState){ ApplyState applyState = (ApplyState) message; LOG.debug("Applying state for log index {}", @@ -151,6 +152,18 @@ public abstract class RaftActor extends UntypedPersistentActor { } else if (message instanceof SaveSnapshotFailure) { // TODO: Handle failure in saving the snapshot + } else if (message instanceof FindLeader){ + getSender().tell(new FindLeaderReply( + context.getPeerAddress(currentBehavior.getLeaderId())), + getSelf()); + + } else if (message instanceof AddRaftPeer){ + AddRaftPeer arp = (AddRaftPeer)message; + context.addToPeers(arp.getName(), arp.getAddress()); + + } else if (message instanceof RemoveRaftPeer){ + RemoveRaftPeer rrp = (RemoveRaftPeer)message; + context.removePeer(rrp.getName()); } else { RaftState state = currentBehavior.handleMessage(getSender(), message); @@ -200,7 +213,7 @@ public abstract class RaftActor extends UntypedPersistentActor { * * @return A reference to the leader if known, null otherwise */ - protected ActorSelection getLeader() { + protected ActorSelection getLeader(){ String leaderId = currentBehavior.getLeaderId(); if (leaderId == null) { return null; @@ -211,6 +224,12 @@ public abstract class RaftActor extends UntypedPersistentActor { return context.actorSelection(peerAddress); } + protected RaftState getRaftState() { + return currentBehavior.state(); + } + + + /** * The applyState method will be called by the RaftActor when some data * needs to be applied to the actor's state diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java index cd5865b02e..7150ec0e6e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java @@ -113,4 +113,17 @@ public interface RaftActorContext { * @return */ String getPeerAddress(String peerId); + + /** + * Add to actor peers + * @param name + * @param address + */ + void addToPeers(String name, String address); + + /** + * + * @param name + */ + public void removePeer(String name); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index 03534d61a0..a0f13280c2 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -108,4 +108,14 @@ public class RaftActorContextImpl implements RaftActorContext{ @Override public String getPeerAddress(String peerId) { return peerAddresses.get(peerId); } + + @Override public void addToPeers(String name, String address) { + LOG.debug("Kamal--> addToPeer for:"+name); + peerAddresses.put(name, address); + } + + @Override public void removePeer(String name) { + LOG.debug("Kamal--> removePeer for:"+name); + peerAddresses.remove(name); + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java index 0d035dbce7..8d84590426 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java @@ -58,6 +58,7 @@ public class Candidate extends AbstractRaftActorBehavior { context.actorSelection(peerPath)); } + context.getLogger().debug("Election:Candidate has following peers:"+peerToActor.keySet()); if(peerPaths.size() > 0) { // Votes are required from a majority of the peers including self. // The votesRequired field therefore stores a calculated value diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index 90edf7da9a..857c87f0ac 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -93,9 +93,10 @@ public class Leader extends AbstractRaftActorBehavior { context.actorSelection(context.getPeerAddress(followerId))); followerToLog.put(followerId, followerLogInformation); - } + context.getLogger().debug("Election:Leader has following peers:"+followerToActor.keySet()); + if (followerToActor.size() > 0) { minReplicationCount = (followerToActor.size() + 1) / 2 + 1; } else { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/AddRaftPeer.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/AddRaftPeer.java new file mode 100644 index 0000000000..d1f4c43c86 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/AddRaftPeer.java @@ -0,0 +1,23 @@ +package org.opendaylight.controller.cluster.raft.client.messages; + +/** + * Created by kramesha on 7/17/14. + */ +public class AddRaftPeer { + + private String name; + private String address; + + public AddRaftPeer(String name, String address) { + this.name = name; + this.address = address; + } + + public String getName() { + return name; + } + + public String getAddress() { + return address; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/RemoveRaftPeer.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/RemoveRaftPeer.java new file mode 100644 index 0000000000..4b766e04b7 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/RemoveRaftPeer.java @@ -0,0 +1,16 @@ +package org.opendaylight.controller.cluster.raft.client.messages; + +/** + * Created by kramesha on 7/17/14. + */ +public class RemoveRaftPeer { + private String name; + + public RemoveRaftPeer(String name) { + this.name = name; + } + + public String getName() { + return name; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index b9c1278b8e..e02994bb06 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -118,6 +118,14 @@ public class MockRaftActorContext implements RaftActorContext { return peerAddresses.get(peerId); } + @Override public void addToPeers(String name, String address) { + peerAddresses.put(name, address); + } + + @Override public void removePeer(String name) { + peerAddresses.remove(name); + } + public void setPeerAddresses(Map peerAddresses) { this.peerAddresses = peerAddresses; } -- 2.36.6