From: Kamal Rameshan Date: Tue, 18 Nov 2014 23:26:51 +0000 (-0800) Subject: Bug-2397:Provide a mechanism for stakeholders to get notifications on Raft state... X-Git-Tag: release/lithium~820^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=a469dbcec569cc972df0cd57cf725a2173d2604a Bug-2397:Provide a mechanism for stakeholders to get notifications on Raft state change A notifier actor is spawned from the RaftActor inherited implementation, in our case Shard or ExampleActor . Its injected into the RaftActor.RaftActor notifies the notifier with the role changes. The Notification message is local to Shard or Example. The idea is for any implementation to create a notifier, inject it and issue a notification from that notifier on a role change. A sample example notifier, listener and notification is provided to show how a listener can register with the notifier and get notifications form the notifier. Notifier and Notifications are assembled in commons, to be shared along with other apps who might need similar logic. It can be override by specific implementations. Has been tested with the TestDriver, with separate actorsystems for listner and notifier Tests have been added. Change-Id: I23f16d4e76bb7dae640c544df282293274d9a1cb Signed-off-by: Kamal Rameshan --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 6dfa4afd6b..6c65021d86 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -25,6 +25,7 @@ import org.opendaylight.controller.cluster.example.messages.KeyValue; import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; import org.opendaylight.controller.cluster.example.messages.PrintRole; import org.opendaylight.controller.cluster.example.messages.PrintState; +import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.ConfigParams; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.RaftState; @@ -37,22 +38,23 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payloa */ public class ExampleActor extends RaftActor { - private final Map state = new HashMap<>(); + private final Map state = new HashMap(); private final DataPersistenceProvider dataPersistenceProvider; private long persistIdentifier = 1; + private Optional roleChangeNotifier; - public ExampleActor(final String id, final Map peerAddresses, - final Optional configParams) { + public ExampleActor(String id, Map peerAddresses, + Optional configParams) { super(id, peerAddresses, configParams); this.dataPersistenceProvider = new PersistentDataProvider(); + roleChangeNotifier = createRoleChangeNotifier(id); } public static Props props(final String id, final Map peerAddresses, final Optional configParams){ return Props.create(new Creator(){ - private static final long serialVersionUID = 1L; @Override public ExampleActor create() throws Exception { return new ExampleActor(id, peerAddresses, configParams); @@ -60,7 +62,7 @@ public class ExampleActor extends RaftActor { }); } - @Override public void onReceiveCommand(final Object message) throws Exception{ + @Override public void onReceiveCommand(Object message) throws Exception{ if(message instanceof KeyValue){ if(isLeader()) { String persistId = Long.toString(persistIdentifier++); @@ -82,9 +84,11 @@ public class ExampleActor extends RaftActor { String followers = ""; if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) { followers = ((Leader)this.getCurrentBehavior()).printFollowerStates(); - LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), getPeers(), followers); + LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), + getRaftActorContext().getPeerAddresses().keySet(), followers); } else { - LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers()); + LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), + getRaftActorContext().getPeerAddresses().keySet()); } @@ -95,6 +99,23 @@ public class ExampleActor extends RaftActor { } } + protected String getReplicatedLogState() { + return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex() + + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm() + + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size(); + } + + public Optional createRoleChangeNotifier(String actorId) { + ActorRef exampleRoleChangeNotifier = this.getContext().actorOf( + RoleChangeNotifier.getProps(actorId), actorId + "-notifier"); + return Optional.of(exampleRoleChangeNotifier); + } + + @Override + protected Optional getRoleChangeNotifier() { + return roleChangeNotifier; + } + @Override protected void applyState(final ActorRef clientActor, final String identifier, final Object data) { if(data instanceof KeyValue){ @@ -116,19 +137,19 @@ public class ExampleActor extends RaftActor { getSelf().tell(new CaptureSnapshotReply(bs), null); } - @Override protected void applySnapshot(final ByteString snapshot) { + @Override protected void applySnapshot(ByteString snapshot) { state.clear(); try { - state.putAll((Map) toObject(snapshot)); + state.putAll((HashMap) toObject(snapshot)); } catch (Exception e) { LOG.error(e, "Exception in applying snapshot"); } if(LOG.isDebugEnabled()) { - LOG.debug("Snapshot applied to state : {}", ((Map) state).size()); + LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size()); } } - private ByteString fromObject(final Object snapshot) throws Exception { + private ByteString fromObject(Object snapshot) throws Exception { ByteArrayOutputStream b = null; ObjectOutputStream o = null; try { @@ -148,7 +169,7 @@ public class ExampleActor extends RaftActor { } } - private Object toObject(final ByteString bs) throws ClassNotFoundException, IOException { + private Object toObject(ByteString bs) throws ClassNotFoundException, IOException { Object obj = null; ByteArrayInputStream bis = null; ObjectInputStream ois = null; @@ -176,7 +197,7 @@ public class ExampleActor extends RaftActor { return dataPersistenceProvider; } - @Override public void onReceiveRecover(final Object message)throws Exception { + @Override public void onReceiveRecover(Object message)throws Exception { super.onReceiveRecover(message); } @@ -185,11 +206,11 @@ public class ExampleActor extends RaftActor { } @Override - protected void startLogRecoveryBatch(final int maxBatchSize) { + protected void startLogRecoveryBatch(int maxBatchSize) { } @Override - protected void appendRecoveredLogEntry(final Payload data) { + protected void appendRecoveredLogEntry(Payload data) { } @Override @@ -201,6 +222,6 @@ public class ExampleActor extends RaftActor { } @Override - protected void applyRecoverySnapshot(final ByteString snapshot) { + protected void applyRecoverySnapshot(ByteString snapshot) { } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleRoleChangeListener.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleRoleChangeListener.java new file mode 100644 index 0000000000..c0ee095367 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleRoleChangeListener.java @@ -0,0 +1,146 @@ +package org.opendaylight.controller.cluster.example; + +import akka.actor.Actor; +import akka.actor.ActorRef; +import akka.actor.Cancellable; +import akka.actor.Props; +import akka.japi.Creator; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; +import org.opendaylight.controller.cluster.example.messages.RegisterListener; +import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; +import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply; +import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; +import scala.concurrent.Await; +import scala.concurrent.duration.FiniteDuration; + +/** + * This is a sample implementation of a Role Change Listener which is an actor, which registers itself to the ClusterRoleChangeNotifier + *

+ * The Role Change listener receives a SetNotifiers message with the notifiers to register itself with. + *

+ * It kicks of a scheduler which sents registration messages to the notifiers, till it gets a RegisterRoleChangeListenerReply + *

+ * If all the notifiers have been regsitered with, then it cancels the scheduler. + * It starts the scheduler again when it receives a new registration + * + */ +public class ExampleRoleChangeListener extends AbstractUntypedActor implements AutoCloseable{ + // the akka url should be set to the notifiers actor-system and domain. + private static final String NOTIFIER_AKKA_URL = "akka.tcp://raft-test@127.0.0.1:2550/user/"; + + private Map notifierRegistrationStatus = new HashMap<>(); + private Cancellable registrationSchedule = null; + private static final FiniteDuration duration = new FiniteDuration(100, TimeUnit.MILLISECONDS); + private static final FiniteDuration schedulerDuration = new FiniteDuration(1, TimeUnit.SECONDS); + private final String memberName; + private static final String[] shardsToMonitor = new String[] {"example"}; + + public ExampleRoleChangeListener(String memberName) { + super(); + scheduleRegistrationListener(schedulerDuration); + this.memberName = memberName; + populateRegistry(memberName); + } + + public static Props getProps(final String memberName) { + return Props.create(new Creator() { + @Override + public Actor create() throws Exception { + return new ExampleRoleChangeListener(memberName); + } + }); + } + + @Override + protected void handleReceive(Object message) throws Exception { + if (message instanceof RegisterListener) { + // called by the scheduler at intervals to register any unregistered notifiers + sendRegistrationRequests(); + + } else if (message instanceof RegisterRoleChangeListenerReply) { + // called by the Notifier + handleRegisterRoleChangeListenerReply(getSender().path().toString()); + + } else if (message instanceof RoleChangeNotification) { + // called by the Notifier + RoleChangeNotification notification = (RoleChangeNotification) message; + + LOG.info("Role Change Notification received for member:{}, old role:{}, new role:{}", + notification.getMemberId(), notification.getOldRole(), notification.getNewRole()); + + // the apps dependent on such notifications can be called here + //TODO: add implementation here + + } + } + + private void scheduleRegistrationListener(FiniteDuration interval) { + LOG.debug("--->scheduleRegistrationListener called."); + registrationSchedule = getContext().system().scheduler().schedule( + interval, interval, getSelf(), new RegisterListener(), + getContext().system().dispatcher(), getSelf()); + + } + + private void populateRegistry(String memberName) { + + for (String shard: shardsToMonitor) { + String notifier =(new StringBuilder()).append(NOTIFIER_AKKA_URL).append(memberName) + .append("/").append(memberName).append("-notifier").toString(); + + if (!notifierRegistrationStatus.containsKey(notifier)) { + notifierRegistrationStatus.put(notifier, false); + } + } + + if (!registrationSchedule.isCancelled()) { + scheduleRegistrationListener(schedulerDuration); + } + } + + private void sendRegistrationRequests() { + for (Map.Entry entry : notifierRegistrationStatus.entrySet()) { + if (!entry.getValue()) { + try { + LOG.debug("{} registering with {}", getSelf().path().toString(), entry.getKey()); + ActorRef notifier = Await.result( + getContext().actorSelection(entry.getKey()).resolveOne(duration), duration); + + notifier.tell(new RegisterRoleChangeListener(), getSelf()); + + } catch (Exception e) { + LOG.error("ERROR!! Unable to send registration request to notifier {}", entry.getKey()); + } + } + } + } + + private void handleRegisterRoleChangeListenerReply(String senderId) { + if (notifierRegistrationStatus.containsKey(senderId)) { + notifierRegistrationStatus.put(senderId, true); + + //cancel the schedule when listener is registered with all notifiers + if (!registrationSchedule.isCancelled()) { + boolean cancelScheduler = true; + for (Boolean value : notifierRegistrationStatus.values()) { + cancelScheduler = cancelScheduler & value; + } + if (cancelScheduler) { + registrationSchedule.cancel(); + } + } + } else { + LOG.info("Unexpected, RegisterRoleChangeListenerReply received from notifier which is not known to Listener:{}", + senderId); + } + } + + + @Override + public void close() throws Exception { + registrationSchedule.cancel(); + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java index de6169791e..cd2e4a506c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java @@ -3,15 +3,17 @@ package org.opendaylight.controller.cluster.example; import akka.actor.ActorRef; import akka.actor.ActorSystem; import com.google.common.base.Optional; -import org.opendaylight.controller.cluster.example.messages.PrintRole; -import org.opendaylight.controller.cluster.example.messages.PrintState; -import org.opendaylight.controller.cluster.raft.ConfigParams; - +import com.google.common.collect.Lists; +import com.typesafe.config.ConfigFactory; import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.opendaylight.controller.cluster.example.messages.PrintRole; +import org.opendaylight.controller.cluster.example.messages.PrintState; +import org.opendaylight.controller.cluster.raft.ConfigParams; /** * This is a test driver for testing akka-raft implementation @@ -21,7 +23,7 @@ import java.util.concurrent.ConcurrentHashMap; */ public class TestDriver { - private static final ActorSystem actorSystem = ActorSystem.create(); + private static Map allPeers = new HashMap<>(); private static Map clientActorRefs = new HashMap(); private static Map actorRefs = new HashMap(); @@ -29,6 +31,9 @@ public class TestDriver { private int nameCounter = 0; private static ConfigParams configParams = new ExampleConfigParamsImpl(); + private static ActorSystem actorSystem; + private static ActorSystem listenerActorSystem; + /** * Create nodes, add clients and start logging. * Commands @@ -53,6 +58,13 @@ public class TestDriver { * @throws Exception */ public static void main(String[] args) throws Exception { + + actorSystem = ActorSystem.create("raft-test", ConfigFactory + .load().getConfig("raft-test")); + + listenerActorSystem = ActorSystem.create("raft-test-listener", ConfigFactory + .load().getConfig("raft-test-listener")); + TestDriver td = new TestDriver(); System.out.println("Enter command (type bye to exit):"); @@ -113,6 +125,16 @@ public class TestDriver { } } + // create the listener using a separate actor system for each example actor + private void createClusterRoleChangeListener(List memberIds) { + System.out.println("memberIds="+memberIds); + for (String memberId : memberIds) { + ActorRef listenerActor = listenerActorSystem.actorOf( + ExampleRoleChangeListener.getProps(memberId), memberId + "-role-change-listener"); + System.out.println("Role Change Listener created:" + listenerActor.path().toString()); + } + } + public static ActorRef createExampleActor(String name) { return actorSystem.actorOf(ExampleActor.props(name, withoutPeer(name), Optional.of(configParams)), name); @@ -121,7 +143,7 @@ public class TestDriver { public void createNodes(int num) { for (int i=0; i < num; i++) { nameCounter = nameCounter + 1; - allPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter); + allPeers.put("example-"+nameCounter, "akka://raft-test/user/example-"+nameCounter); } for (String s : allPeers.keySet()) { @@ -130,6 +152,8 @@ public class TestDriver { System.out.println("Created node:"+s); } + + createClusterRoleChangeListener(Lists.newArrayList(allPeers.keySet())); } // add num clients to all nodes in the system diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/RegisterListener.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/RegisterListener.java new file mode 100644 index 0000000000..4507f43b8f --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/RegisterListener.java @@ -0,0 +1,9 @@ +package org.opendaylight.controller.cluster.example.messages; + +/** + * Message sent by the Example Role Change Listener to itself for registering itself with the notifiers + * + * This message is sent by the scheduler + */ +public class RegisterListener { +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/SetNotifiers.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/SetNotifiers.java new file mode 100644 index 0000000000..8adc0dac13 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/SetNotifiers.java @@ -0,0 +1,18 @@ +package org.opendaylight.controller.cluster.example.messages; + +import java.util.List; + +/** + * Created by kramesha on 11/18/14. + */ +public class SetNotifiers { + private List notifierList; + + public SetNotifiers(List notifierList) { + this.notifierList = notifierList; + } + + public List getNotifierList() { + return notifierList; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 042b9fb569..d647475e4d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -22,8 +22,11 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Stopwatch; import com.google.protobuf.ByteString; +import java.io.Serializable; +import java.util.Map; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; +import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; @@ -41,9 +44,6 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; -import java.io.Serializable; -import java.util.Map; - /** * RaftActor encapsulates a state machine that needs to be kept synchronized * in a cluster. It implements the RAFT algorithm as described in the paper @@ -169,8 +169,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue())); onRecoveryComplete(); + + RaftActorBehavior oldBehavior = currentBehavior; currentBehavior = new Follower(context); - onStateChanged(); + handleBehaviorChange(oldBehavior, currentBehavior); } } } @@ -269,8 +271,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { replicatedLog.lastIndex(), replicatedLog.snapshotIndex, replicatedLog.snapshotTerm, replicatedLog.size()); + RaftActorBehavior oldBehavior = currentBehavior; currentBehavior = new Follower(context); - onStateChanged(); + handleBehaviorChange(oldBehavior, currentBehavior); } @Override public void handleCommand(Object message) { @@ -366,26 +369,26 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { RaftActorBehavior oldBehavior = currentBehavior; currentBehavior = currentBehavior.handleMessage(getSender(), message); - if(oldBehavior != currentBehavior){ - onStateChanged(); - } - - onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId()); + handleBehaviorChange(oldBehavior, currentBehavior); } } - public java.util.Set getPeers() { - - return context.getPeerAddresses().keySet(); - } + private void handleBehaviorChange(RaftActorBehavior oldBehavior, RaftActorBehavior currentBehavior) { + if (oldBehavior != currentBehavior){ + onStateChanged(); + } + if (oldBehavior != null) { + // it can happen that the state has not changed but the leader has changed. + onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId()); - protected String getReplicatedLogState() { - return "snapshotIndex=" + context.getReplicatedLog().getSnapshotIndex() - + ", snapshotTerm=" + context.getReplicatedLog().getSnapshotTerm() - + ", im-mem journal size=" + context.getReplicatedLog().size(); + if (getRoleChangeNotifier().isPresent() && oldBehavior.state() != currentBehavior.state()) { + // we do not want to notify when the behavior/role is set for the first time (i.e follower) + getRoleChangeNotifier().get().tell(new RoleChanged(getId(), oldBehavior.state().name(), + currentBehavior.state().name()), getSelf()); + } + } } - /** * When a derived RaftActor needs to persist something it must call * persistData. @@ -578,6 +581,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { protected abstract DataPersistenceProvider persistence(); + /** + * Notifier Actor for this RaftActor to notify when a role change happens + * @return ActorRef - ActorRef of the notifier or Optional.absent if none. + */ + protected abstract Optional getRoleChangeNotifier(); + protected void onLeaderChanged(String oldLeader, String newLeader){}; private void trimPersistentData(long sequenceNumber) { @@ -843,4 +852,5 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { protected RaftActorBehavior getCurrentBehavior() { return currentBehavior; } + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/resources/application.conf b/opendaylight/md-sal/sal-akka-raft/src/main/resources/application.conf index 9e42a13c6a..b2132b88b0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/resources/application.conf +++ b/opendaylight/md-sal/sal-akka-raft/src/main/resources/application.conf @@ -13,8 +13,67 @@ akka { serialization-bindings { "org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry" = java + "org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener" = java "com.google.protobuf.Message" = proto "com.google.protobuf.GeneratedMessage" = proto } } } + +raft-test { + akka { + + loglevel = "DEBUG" + + actor { + # enable to test serialization only. + # serialize-messages = on + + provider = "akka.remote.RemoteActorRefProvider" + + serializers { + java = "akka.serialization.JavaSerializer" + proto = "akka.remote.serialization.ProtobufSerializer" + } + + serialization-bindings { + "org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry" = java + "org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener" = java + "com.google.protobuf.Message" = proto + "com.google.protobuf.GeneratedMessage" = proto + } + } + + remote { + log-remote-lifecycle-events = off + netty.tcp { + hostname = "127.0.0.1" + port = 2550 + } + } + } +} + +raft-test-listener { + + akka { + loglevel = "DEBUG" + + actor { + provider = "akka.remote.RemoteActorRefProvider" + } + + remote { + log-remote-lifecycle-events = off + netty.tcp { + hostname = "127.0.0.1" + port = 2554 + } + } + + member-id = "member-1" + } +} + + + diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 9eb2fb757b..c833a86e9b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -21,11 +21,25 @@ import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.junit.After; import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor; +import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; @@ -36,27 +50,13 @@ import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal; import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; @@ -83,6 +83,9 @@ public class RaftActorTest extends AbstractActorTest { private final DataPersistenceProvider dataPersistenceProvider; private final RaftActor delegate; + private final CountDownLatch recoveryComplete = new CountDownLatch(1); + private final List state; + private ActorRef roleChangeNotifier; public static final class MockRaftActorCreator implements Creator { private static final long serialVersionUID = 1L; @@ -90,25 +93,27 @@ public class RaftActorTest extends AbstractActorTest { private final String id; private final Optional config; private final DataPersistenceProvider dataPersistenceProvider; + private final ActorRef roleChangeNotifier; private MockRaftActorCreator(Map peerAddresses, String id, - Optional config, DataPersistenceProvider dataPersistenceProvider) { + Optional config, DataPersistenceProvider dataPersistenceProvider, + ActorRef roleChangeNotifier) { this.peerAddresses = peerAddresses; this.id = id; this.config = config; this.dataPersistenceProvider = dataPersistenceProvider; + this.roleChangeNotifier = roleChangeNotifier; } @Override public MockRaftActor create() throws Exception { - return new MockRaftActor(id, peerAddresses, config, dataPersistenceProvider); + MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config, + dataPersistenceProvider); + mockRaftActor.roleChangeNotifier = this.roleChangeNotifier; + return mockRaftActor; } } - private final CountDownLatch recoveryComplete = new CountDownLatch(1); - - private final List state; - public MockRaftActor(String id, Map peerAddresses, Optional config, DataPersistenceProvider dataPersistenceProvider) { super(id, peerAddresses, config); state = new ArrayList<>(); @@ -134,23 +139,24 @@ public class RaftActorTest extends AbstractActorTest { public static Props props(final String id, final Map peerAddresses, Optional config){ - return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null)); + return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null)); } public static Props props(final String id, final Map peerAddresses, Optional config, DataPersistenceProvider dataPersistenceProvider){ - return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider)); + return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null)); } + public static Props props(final String id, final Map peerAddresses, + Optional config, ActorRef roleChangeNotifier){ + return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier)); + } @Override protected void applyState(ActorRef clientActor, String identifier, Object data) { delegate.applyState(clientActor, identifier, data); LOG.info("applyState called"); } - - - @Override protected void startLogRecoveryBatch(int maxBatchSize) { } @@ -201,6 +207,11 @@ public class RaftActorTest extends AbstractActorTest { return this.dataPersistenceProvider; } + @Override + protected Optional getRoleChangeNotifier() { + return Optional.fromNullable(roleChangeNotifier); + } + @Override public String persistenceId() { return this.getId(); } @@ -862,6 +873,40 @@ public class RaftActorTest extends AbstractActorTest { }; } + @Test + public void testRaftRoleChangeNotifier() throws Exception { + new JavaTestKit(getSystem()) {{ + ActorRef notifierActor = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + String id = "testRaftRoleChangeNotifier"; + + TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(id, + Collections.emptyMap(), Optional.of(config), notifierActor), id); + + MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + mockRaftActor.setCurrentBehavior(new Follower(mockRaftActor.getRaftActorContext())); + + // sleeping for a minimum of 2 seconds, if it spans more its fine. + Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); + + List matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class); + assertNotNull(matches); + assertEquals(2, matches.size()); + + // check if the notifier got a role change from Follower to Candidate + RoleChanged raftRoleChanged = (RoleChanged) matches.get(0); + assertEquals(id, raftRoleChanged.getMemberId()); + assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole()); + assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole()); + + // check if the notifier got a role change from Candidate to Leader + raftRoleChanged = (RoleChanged) matches.get(1); + assertEquals(id, raftRoleChanged.getMemberId()); + assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole()); + assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole()); + }}; + } + private ByteString fromObject(Object snapshot) throws Exception { ByteArrayOutputStream b = null; ObjectOutputStream o = null; diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListener.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListener.java new file mode 100644 index 0000000000..1faa341d45 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListener.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.notifications; + +import java.io.Serializable; + +/** + * Message sent from the listener of Role Change messages to register itself to the Role Change Notifier + * + * The Listener could be in a separate ActorSystem and hence this message needs to be Serializable + */ +public class RegisterRoleChangeListener implements Serializable { +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListenerReply.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListenerReply.java new file mode 100644 index 0000000000..f1d13e344f --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListenerReply.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.notifications; + +import java.io.Serializable; + +/** + * Reply message sent from a RoleChangeNotifier to the Role Change Listener + * + * Can be sent to a separate actor system and hence should be made serializable. + */ +public class RegisterRoleChangeListenerReply implements Serializable { +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotification.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotification.java new file mode 100644 index 0000000000..de2733fc86 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotification.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.notifications; + +import java.io.Serializable; + +/** + * Notification message representing a Role change of a cluster member + * + * Roles generally are Leader, Follower and Candidate. But can be based on the consensus strategy/implementation + * + * The Listener could be in a separate ActorSystem and hence this message needs to be Serializable + */ +public class RoleChangeNotification implements Serializable { + private String memberId; + private String oldRole; + private String newRole; + + public RoleChangeNotification(String memberId, String oldRole, String newRole) { + this.memberId = memberId; + this.oldRole = oldRole; + this.newRole = newRole; + } + + public String getMemberId() { + return memberId; + } + + public String getOldRole() { + return oldRole; + } + + public String getNewRole() { + return newRole; + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java new file mode 100644 index 0000000000..a9aa56174d --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.notifications; + +import akka.actor.Actor; +import akka.actor.ActorPath; +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.japi.Creator; +import akka.serialization.Serialization; +import com.google.common.collect.Maps; +import java.util.Map; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; + +/** + * The RoleChangeNotifier is responsible for receiving Raft role change messages and notifying + * the listeners (within the same node), which are registered with it. + *

+ * The RoleChangeNotifier is instantiated by the Shard and injected into the RaftActor. + */ +public class RoleChangeNotifier extends AbstractUntypedActor implements AutoCloseable { + + private String memberId; + private Map registeredListeners = Maps.newHashMap(); + private RoleChangeNotification latestRoleChangeNotification = null; + + public RoleChangeNotifier(String memberId) { + this.memberId = memberId; + } + + public static Props getProps(final String memberId) { + return Props.create(new Creator() { + @Override + public Actor create() throws Exception { + return new RoleChangeNotifier(memberId); + } + }); + } + + @Override + public void preStart() throws Exception { + super.preStart(); + LOG.info("RoleChangeNotifier:{} created and ready for shard:{}", + Serialization.serializedActorPath(getSelf()), memberId); + } + + @Override + protected void handleReceive(Object message) throws Exception { + if (message instanceof RegisterRoleChangeListener) { + // register listeners for this shard + + ActorRef curRef = registeredListeners.get(getSender().path()); + if (curRef != null) { + // ActorPaths would pass equal even if the unique id of the actors are different + // if a listener actor is re-registering after reincarnation, then removing the existing + // entry so the actor path with correct unique id is registered. + registeredListeners.remove(getSender().path()); + } + registeredListeners.put(getSender().path(), getSender()); + + LOG.info("RoleChangeNotifier for {} , registered listener {}", memberId, + getSender().path().toString()); + + getSender().tell(new RegisterRoleChangeListenerReply(), getSelf()); + + if (latestRoleChangeNotification != null) { + getSender().tell(latestRoleChangeNotification, getSelf()); + } + + + } else if (message instanceof RoleChanged) { + // this message is sent by RaftActor. Notify registered listeners when this message is received. + RoleChanged roleChanged = (RoleChanged) message; + + LOG.info("RoleChangeNotifier for {} , received role change from {} to {}", memberId, + roleChanged.getOldRole(), roleChanged.getNewRole()); + + latestRoleChangeNotification = + new RoleChangeNotification(roleChanged.getMemberId(), + roleChanged.getOldRole(), roleChanged.getNewRole()); + + for (ActorRef listener: registeredListeners.values()) { + listener.tell(latestRoleChangeNotification, getSelf()); + } + } + } + + @Override + public void close() throws Exception { + registeredListeners.clear(); + } +} + diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChanged.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChanged.java new file mode 100644 index 0000000000..f315bfdf7a --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChanged.java @@ -0,0 +1,31 @@ +package org.opendaylight.controller.cluster.notifications; + +/** + * Role Change message initiated internally from the Raft Actor when a the behavior/role changes. + * + * Since its internal , need not be serialized + * + */ +public class RoleChanged { + private String memberId; + private String oldRole; + private String newRole; + + public RoleChanged(String memberId, String oldRole, String newRole) { + this.memberId = memberId; + this.oldRole = oldRole; + this.newRole = newRole; + } + + public String getMemberId() { + return memberId; + } + + public String getOldRole() { + return oldRole; + } + + public String getNewRole() { + return newRole; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index d53cb48e50..7073ea758b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -65,6 +65,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; +import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; @@ -131,6 +132,8 @@ public class Shard extends RaftActor { private Cancellable txCommitTimeoutCheckSchedule; + private Optional roleChangeNotifier; + /** * Coordinates persistence recovery on startup. */ @@ -171,6 +174,9 @@ public class Shard extends RaftActor { transactionCommitTimeout = TimeUnit.MILLISECONDS.convert( datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS); + + // create a notifier actor for each cluster member + roleChangeNotifier = createRoleChangeNotifier(name.toString()); } private static Map mapPeerAddresses( @@ -196,6 +202,12 @@ public class Shard extends RaftActor { return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext)); } + private Optional createRoleChangeNotifier(String shardId) { + ActorRef shardRoleChangeNotifier = this.getContext().actorOf( + RoleChangeNotifier.getProps(shardId), shardId + "-notifier"); + return Optional.of(shardRoleChangeNotifier); + } + @Override public void postStop() { super.postStop(); @@ -259,6 +271,11 @@ public class Shard extends RaftActor { } } + @Override + protected Optional getRoleChangeNotifier() { + return roleChangeNotifier; + } + private void handleTransactionCommitTimeoutCheck() { CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry(); if(cohortEntry != null) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java new file mode 100644 index 0000000000..4e61260550 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java @@ -0,0 +1,79 @@ +package org.opendaylight.controller.cluster.datastore; + + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; +import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; +import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply; +import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; +import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; +import org.opendaylight.controller.cluster.notifications.RoleChanged; +import org.opendaylight.controller.cluster.raft.RaftState; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +public class RoleChangeNotifierTest extends AbstractActorTest { + + @Test + public void testHandleRegisterRoleChangeListener() throws Exception { + new JavaTestKit(getSystem()) {{ + String memberId = "testHandleRegisterRoleChangeListener"; + ActorRef listenerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + + TestActorRef notifierTestActorRef = TestActorRef.create( + getSystem(), RoleChangeNotifier.getProps(memberId), memberId); + + notifierTestActorRef.tell(new RegisterRoleChangeListener(), listenerActor); + + RegisterRoleChangeListenerReply reply = (RegisterRoleChangeListenerReply) + MessageCollectorActor.getFirstMatching(listenerActor, RegisterRoleChangeListenerReply.class); + assertNotNull(reply); + + RoleChangeNotification notification = (RoleChangeNotification) + MessageCollectorActor.getFirstMatching(listenerActor, RoleChangeNotification.class); + assertNull(notification); + }}; + + } + + @Test + public void testHandleRaftRoleChanged() throws Exception { + new JavaTestKit(getSystem()) {{ + String memberId = "testHandleRegisterRoleChangeListenerWithNotificationSet"; + ActorRef listenerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + ActorRef shardActor = getTestActor(); + + TestActorRef notifierTestActorRef = TestActorRef.create( + getSystem(), RoleChangeNotifier.getProps(memberId), memberId); + + RoleChangeNotifier roleChangeNotifier = notifierTestActorRef.underlyingActor(); + + notifierTestActorRef.tell(new RoleChanged(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), shardActor); + + // no notification should be sent as listener has not yet registered + assertNull(MessageCollectorActor.getFirstMatching(listenerActor, RoleChangeNotification.class)); + + // listener registers after role has been changed, ensure we sent the latest role change after a reply + notifierTestActorRef.tell(new RegisterRoleChangeListener(), listenerActor); + + RegisterRoleChangeListenerReply reply = (RegisterRoleChangeListenerReply) + MessageCollectorActor.getFirstMatching(listenerActor, RegisterRoleChangeListenerReply.class); + assertNotNull(reply); + + RoleChangeNotification notification = (RoleChangeNotification) + MessageCollectorActor.getFirstMatching(listenerActor, RoleChangeNotification.class); + assertNotNull(notification); + assertEquals(RaftState.Candidate.name(), notification.getOldRole()); + assertEquals(RaftState.Leader.name(), notification.getNewRole()); + + }}; + + } +} + + diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java index f75aa5445b..4bd0ad818f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java @@ -8,10 +8,19 @@ package org.opendaylight.controller.cluster.datastore.utils; +import akka.actor.ActorRef; import akka.actor.UntypedActor; +import akka.pattern.Patterns; +import akka.util.Timeout; +import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; /** * MessageCollectorActor collects messages as it receives them. It can send @@ -27,10 +36,55 @@ public class MessageCollectorActor extends UntypedActor { @Override public void onReceive(Object message) throws Exception { if(message instanceof String){ if("messages".equals(message)){ - getSender().tell(messages, getSelf()); + getSender().tell(new ArrayList(messages), getSelf()); } } else { messages.add(message); } } + + public static List getAllMessages(ActorRef actor) throws Exception { + FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS); + Timeout operationTimeout = new Timeout(operationDuration); + Future future = Patterns.ask(actor, "messages", operationTimeout); + + try { + return (List) Await.result(future, operationDuration); + } catch (Exception e) { + throw e; + } + } + + /** + * Get the first message that matches the specified class + * @param actor + * @param clazz + * @return + */ + public static Object getFirstMatching(ActorRef actor, Class clazz) throws Exception { + List allMessages = getAllMessages(actor); + + for(Object message : allMessages){ + if(message.getClass().equals(clazz)){ + return message; + } + } + + return null; + } + + public static List getAllMatching(ActorRef actor, Class clazz) throws Exception { + List allMessages = getAllMessages(actor); + + List output = Lists.newArrayList(); + + for(Object message : allMessages){ + if(message.getClass().equals(clazz)){ + output.add(message); + } + } + + return output; + } + }