From: Moiz Raja Date: Mon, 1 Dec 2014 21:59:11 +0000 (+0000) Subject: Merge "bug 2266 : added more types of schema nodes to increase code coverage" X-Git-Tag: release/lithium~818 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=d564bfe7b9b24474cc0426a859cfae8dbad8b571;hp=4d84dd001d2f30ef46eac16ec73d1951b4dd3044 Merge "bug 2266 : added more types of schema nodes to increase code coverage" --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 6dfa4afd6b..6c65021d86 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -25,6 +25,7 @@ import org.opendaylight.controller.cluster.example.messages.KeyValue; import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; import org.opendaylight.controller.cluster.example.messages.PrintRole; import org.opendaylight.controller.cluster.example.messages.PrintState; +import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.ConfigParams; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.RaftState; @@ -37,22 +38,23 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payloa */ public class ExampleActor extends RaftActor { - private final Map state = new HashMap<>(); + private final Map state = new HashMap(); private final DataPersistenceProvider dataPersistenceProvider; private long persistIdentifier = 1; + private Optional roleChangeNotifier; - public ExampleActor(final String id, final Map peerAddresses, - final Optional configParams) { + public ExampleActor(String id, Map peerAddresses, + Optional configParams) { super(id, peerAddresses, configParams); this.dataPersistenceProvider = new PersistentDataProvider(); + roleChangeNotifier = createRoleChangeNotifier(id); } public static Props props(final String id, final Map peerAddresses, final Optional configParams){ return Props.create(new Creator(){ - private static final long serialVersionUID = 1L; @Override public ExampleActor create() throws Exception { return new ExampleActor(id, peerAddresses, configParams); @@ -60,7 +62,7 @@ public class ExampleActor extends RaftActor { }); } - @Override public void onReceiveCommand(final Object message) throws Exception{ + @Override public void onReceiveCommand(Object message) throws Exception{ if(message instanceof KeyValue){ if(isLeader()) { String persistId = Long.toString(persistIdentifier++); @@ -82,9 +84,11 @@ public class ExampleActor extends RaftActor { String followers = ""; if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) { followers = ((Leader)this.getCurrentBehavior()).printFollowerStates(); - LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), getPeers(), followers); + LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), + getRaftActorContext().getPeerAddresses().keySet(), followers); } else { - LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers()); + LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), + getRaftActorContext().getPeerAddresses().keySet()); } @@ -95,6 +99,23 @@ public class ExampleActor extends RaftActor { } } + protected String getReplicatedLogState() { + return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex() + + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm() + + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size(); + } + + public Optional createRoleChangeNotifier(String actorId) { + ActorRef exampleRoleChangeNotifier = this.getContext().actorOf( + RoleChangeNotifier.getProps(actorId), actorId + "-notifier"); + return Optional.of(exampleRoleChangeNotifier); + } + + @Override + protected Optional getRoleChangeNotifier() { + return roleChangeNotifier; + } + @Override protected void applyState(final ActorRef clientActor, final String identifier, final Object data) { if(data instanceof KeyValue){ @@ -116,19 +137,19 @@ public class ExampleActor extends RaftActor { getSelf().tell(new CaptureSnapshotReply(bs), null); } - @Override protected void applySnapshot(final ByteString snapshot) { + @Override protected void applySnapshot(ByteString snapshot) { state.clear(); try { - state.putAll((Map) toObject(snapshot)); + state.putAll((HashMap) toObject(snapshot)); } catch (Exception e) { LOG.error(e, "Exception in applying snapshot"); } if(LOG.isDebugEnabled()) { - LOG.debug("Snapshot applied to state : {}", ((Map) state).size()); + LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size()); } } - private ByteString fromObject(final Object snapshot) throws Exception { + private ByteString fromObject(Object snapshot) throws Exception { ByteArrayOutputStream b = null; ObjectOutputStream o = null; try { @@ -148,7 +169,7 @@ public class ExampleActor extends RaftActor { } } - private Object toObject(final ByteString bs) throws ClassNotFoundException, IOException { + private Object toObject(ByteString bs) throws ClassNotFoundException, IOException { Object obj = null; ByteArrayInputStream bis = null; ObjectInputStream ois = null; @@ -176,7 +197,7 @@ public class ExampleActor extends RaftActor { return dataPersistenceProvider; } - @Override public void onReceiveRecover(final Object message)throws Exception { + @Override public void onReceiveRecover(Object message)throws Exception { super.onReceiveRecover(message); } @@ -185,11 +206,11 @@ public class ExampleActor extends RaftActor { } @Override - protected void startLogRecoveryBatch(final int maxBatchSize) { + protected void startLogRecoveryBatch(int maxBatchSize) { } @Override - protected void appendRecoveredLogEntry(final Payload data) { + protected void appendRecoveredLogEntry(Payload data) { } @Override @@ -201,6 +222,6 @@ public class ExampleActor extends RaftActor { } @Override - protected void applyRecoverySnapshot(final ByteString snapshot) { + protected void applyRecoverySnapshot(ByteString snapshot) { } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleRoleChangeListener.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleRoleChangeListener.java new file mode 100644 index 0000000000..c0ee095367 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleRoleChangeListener.java @@ -0,0 +1,146 @@ +package org.opendaylight.controller.cluster.example; + +import akka.actor.Actor; +import akka.actor.ActorRef; +import akka.actor.Cancellable; +import akka.actor.Props; +import akka.japi.Creator; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; +import org.opendaylight.controller.cluster.example.messages.RegisterListener; +import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; +import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply; +import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; +import scala.concurrent.Await; +import scala.concurrent.duration.FiniteDuration; + +/** + * This is a sample implementation of a Role Change Listener which is an actor, which registers itself to the ClusterRoleChangeNotifier + *

+ * The Role Change listener receives a SetNotifiers message with the notifiers to register itself with. + *

+ * It kicks of a scheduler which sents registration messages to the notifiers, till it gets a RegisterRoleChangeListenerReply + *

+ * If all the notifiers have been regsitered with, then it cancels the scheduler. + * It starts the scheduler again when it receives a new registration + * + */ +public class ExampleRoleChangeListener extends AbstractUntypedActor implements AutoCloseable{ + // the akka url should be set to the notifiers actor-system and domain. + private static final String NOTIFIER_AKKA_URL = "akka.tcp://raft-test@127.0.0.1:2550/user/"; + + private Map notifierRegistrationStatus = new HashMap<>(); + private Cancellable registrationSchedule = null; + private static final FiniteDuration duration = new FiniteDuration(100, TimeUnit.MILLISECONDS); + private static final FiniteDuration schedulerDuration = new FiniteDuration(1, TimeUnit.SECONDS); + private final String memberName; + private static final String[] shardsToMonitor = new String[] {"example"}; + + public ExampleRoleChangeListener(String memberName) { + super(); + scheduleRegistrationListener(schedulerDuration); + this.memberName = memberName; + populateRegistry(memberName); + } + + public static Props getProps(final String memberName) { + return Props.create(new Creator() { + @Override + public Actor create() throws Exception { + return new ExampleRoleChangeListener(memberName); + } + }); + } + + @Override + protected void handleReceive(Object message) throws Exception { + if (message instanceof RegisterListener) { + // called by the scheduler at intervals to register any unregistered notifiers + sendRegistrationRequests(); + + } else if (message instanceof RegisterRoleChangeListenerReply) { + // called by the Notifier + handleRegisterRoleChangeListenerReply(getSender().path().toString()); + + } else if (message instanceof RoleChangeNotification) { + // called by the Notifier + RoleChangeNotification notification = (RoleChangeNotification) message; + + LOG.info("Role Change Notification received for member:{}, old role:{}, new role:{}", + notification.getMemberId(), notification.getOldRole(), notification.getNewRole()); + + // the apps dependent on such notifications can be called here + //TODO: add implementation here + + } + } + + private void scheduleRegistrationListener(FiniteDuration interval) { + LOG.debug("--->scheduleRegistrationListener called."); + registrationSchedule = getContext().system().scheduler().schedule( + interval, interval, getSelf(), new RegisterListener(), + getContext().system().dispatcher(), getSelf()); + + } + + private void populateRegistry(String memberName) { + + for (String shard: shardsToMonitor) { + String notifier =(new StringBuilder()).append(NOTIFIER_AKKA_URL).append(memberName) + .append("/").append(memberName).append("-notifier").toString(); + + if (!notifierRegistrationStatus.containsKey(notifier)) { + notifierRegistrationStatus.put(notifier, false); + } + } + + if (!registrationSchedule.isCancelled()) { + scheduleRegistrationListener(schedulerDuration); + } + } + + private void sendRegistrationRequests() { + for (Map.Entry entry : notifierRegistrationStatus.entrySet()) { + if (!entry.getValue()) { + try { + LOG.debug("{} registering with {}", getSelf().path().toString(), entry.getKey()); + ActorRef notifier = Await.result( + getContext().actorSelection(entry.getKey()).resolveOne(duration), duration); + + notifier.tell(new RegisterRoleChangeListener(), getSelf()); + + } catch (Exception e) { + LOG.error("ERROR!! Unable to send registration request to notifier {}", entry.getKey()); + } + } + } + } + + private void handleRegisterRoleChangeListenerReply(String senderId) { + if (notifierRegistrationStatus.containsKey(senderId)) { + notifierRegistrationStatus.put(senderId, true); + + //cancel the schedule when listener is registered with all notifiers + if (!registrationSchedule.isCancelled()) { + boolean cancelScheduler = true; + for (Boolean value : notifierRegistrationStatus.values()) { + cancelScheduler = cancelScheduler & value; + } + if (cancelScheduler) { + registrationSchedule.cancel(); + } + } + } else { + LOG.info("Unexpected, RegisterRoleChangeListenerReply received from notifier which is not known to Listener:{}", + senderId); + } + } + + + @Override + public void close() throws Exception { + registrationSchedule.cancel(); + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java index de6169791e..cd2e4a506c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java @@ -3,15 +3,17 @@ package org.opendaylight.controller.cluster.example; import akka.actor.ActorRef; import akka.actor.ActorSystem; import com.google.common.base.Optional; -import org.opendaylight.controller.cluster.example.messages.PrintRole; -import org.opendaylight.controller.cluster.example.messages.PrintState; -import org.opendaylight.controller.cluster.raft.ConfigParams; - +import com.google.common.collect.Lists; +import com.typesafe.config.ConfigFactory; import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.opendaylight.controller.cluster.example.messages.PrintRole; +import org.opendaylight.controller.cluster.example.messages.PrintState; +import org.opendaylight.controller.cluster.raft.ConfigParams; /** * This is a test driver for testing akka-raft implementation @@ -21,7 +23,7 @@ import java.util.concurrent.ConcurrentHashMap; */ public class TestDriver { - private static final ActorSystem actorSystem = ActorSystem.create(); + private static Map allPeers = new HashMap<>(); private static Map clientActorRefs = new HashMap(); private static Map actorRefs = new HashMap(); @@ -29,6 +31,9 @@ public class TestDriver { private int nameCounter = 0; private static ConfigParams configParams = new ExampleConfigParamsImpl(); + private static ActorSystem actorSystem; + private static ActorSystem listenerActorSystem; + /** * Create nodes, add clients and start logging. * Commands @@ -53,6 +58,13 @@ public class TestDriver { * @throws Exception */ public static void main(String[] args) throws Exception { + + actorSystem = ActorSystem.create("raft-test", ConfigFactory + .load().getConfig("raft-test")); + + listenerActorSystem = ActorSystem.create("raft-test-listener", ConfigFactory + .load().getConfig("raft-test-listener")); + TestDriver td = new TestDriver(); System.out.println("Enter command (type bye to exit):"); @@ -113,6 +125,16 @@ public class TestDriver { } } + // create the listener using a separate actor system for each example actor + private void createClusterRoleChangeListener(List memberIds) { + System.out.println("memberIds="+memberIds); + for (String memberId : memberIds) { + ActorRef listenerActor = listenerActorSystem.actorOf( + ExampleRoleChangeListener.getProps(memberId), memberId + "-role-change-listener"); + System.out.println("Role Change Listener created:" + listenerActor.path().toString()); + } + } + public static ActorRef createExampleActor(String name) { return actorSystem.actorOf(ExampleActor.props(name, withoutPeer(name), Optional.of(configParams)), name); @@ -121,7 +143,7 @@ public class TestDriver { public void createNodes(int num) { for (int i=0; i < num; i++) { nameCounter = nameCounter + 1; - allPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter); + allPeers.put("example-"+nameCounter, "akka://raft-test/user/example-"+nameCounter); } for (String s : allPeers.keySet()) { @@ -130,6 +152,8 @@ public class TestDriver { System.out.println("Created node:"+s); } + + createClusterRoleChangeListener(Lists.newArrayList(allPeers.keySet())); } // add num clients to all nodes in the system diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/RegisterListener.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/RegisterListener.java new file mode 100644 index 0000000000..4507f43b8f --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/RegisterListener.java @@ -0,0 +1,9 @@ +package org.opendaylight.controller.cluster.example.messages; + +/** + * Message sent by the Example Role Change Listener to itself for registering itself with the notifiers + * + * This message is sent by the scheduler + */ +public class RegisterListener { +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/SetNotifiers.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/SetNotifiers.java new file mode 100644 index 0000000000..8adc0dac13 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/SetNotifiers.java @@ -0,0 +1,18 @@ +package org.opendaylight.controller.cluster.example.messages; + +import java.util.List; + +/** + * Created by kramesha on 11/18/14. + */ +public class SetNotifiers { + private List notifierList; + + public SetNotifiers(List notifierList) { + this.notifierList = notifierList; + } + + public List getNotifierList() { + return notifierList; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 042b9fb569..d647475e4d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -22,8 +22,11 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Stopwatch; import com.google.protobuf.ByteString; +import java.io.Serializable; +import java.util.Map; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; +import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; @@ -41,9 +44,6 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; -import java.io.Serializable; -import java.util.Map; - /** * RaftActor encapsulates a state machine that needs to be kept synchronized * in a cluster. It implements the RAFT algorithm as described in the paper @@ -169,8 +169,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue())); onRecoveryComplete(); + + RaftActorBehavior oldBehavior = currentBehavior; currentBehavior = new Follower(context); - onStateChanged(); + handleBehaviorChange(oldBehavior, currentBehavior); } } } @@ -269,8 +271,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { replicatedLog.lastIndex(), replicatedLog.snapshotIndex, replicatedLog.snapshotTerm, replicatedLog.size()); + RaftActorBehavior oldBehavior = currentBehavior; currentBehavior = new Follower(context); - onStateChanged(); + handleBehaviorChange(oldBehavior, currentBehavior); } @Override public void handleCommand(Object message) { @@ -366,26 +369,26 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { RaftActorBehavior oldBehavior = currentBehavior; currentBehavior = currentBehavior.handleMessage(getSender(), message); - if(oldBehavior != currentBehavior){ - onStateChanged(); - } - - onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId()); + handleBehaviorChange(oldBehavior, currentBehavior); } } - public java.util.Set getPeers() { - - return context.getPeerAddresses().keySet(); - } + private void handleBehaviorChange(RaftActorBehavior oldBehavior, RaftActorBehavior currentBehavior) { + if (oldBehavior != currentBehavior){ + onStateChanged(); + } + if (oldBehavior != null) { + // it can happen that the state has not changed but the leader has changed. + onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId()); - protected String getReplicatedLogState() { - return "snapshotIndex=" + context.getReplicatedLog().getSnapshotIndex() - + ", snapshotTerm=" + context.getReplicatedLog().getSnapshotTerm() - + ", im-mem journal size=" + context.getReplicatedLog().size(); + if (getRoleChangeNotifier().isPresent() && oldBehavior.state() != currentBehavior.state()) { + // we do not want to notify when the behavior/role is set for the first time (i.e follower) + getRoleChangeNotifier().get().tell(new RoleChanged(getId(), oldBehavior.state().name(), + currentBehavior.state().name()), getSelf()); + } + } } - /** * When a derived RaftActor needs to persist something it must call * persistData. @@ -578,6 +581,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { protected abstract DataPersistenceProvider persistence(); + /** + * Notifier Actor for this RaftActor to notify when a role change happens + * @return ActorRef - ActorRef of the notifier or Optional.absent if none. + */ + protected abstract Optional getRoleChangeNotifier(); + protected void onLeaderChanged(String oldLeader, String newLeader){}; private void trimPersistentData(long sequenceNumber) { @@ -843,4 +852,5 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { protected RaftActorBehavior getCurrentBehavior() { return currentBehavior; } + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index d85ac8ef67..e5c5dc752d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -67,6 +67,16 @@ import scala.concurrent.duration.FiniteDuration; * set commitIndex = N (§5.3, §5.4). */ public abstract class AbstractLeader extends AbstractRaftActorBehavior { + + // The index of the first chunk that is sent when installing a snapshot + public static final int FIRST_CHUNK_INDEX = 1; + + // The index that the follower should respond with if it needs the install snapshot to be reset + public static final int INVALID_CHUNK_INDEX = -1; + + // This would be passed as the hash code of the last chunk when sending the first chunk + public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1; + protected final Map followerToLog = new HashMap<>(); protected final Map mapFollowerToSnapshot = new HashMap<>(); @@ -332,6 +342,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { "sending snapshot chunk failed, Will retry, Chunk:{}", reply.getChunkIndex() ); + followerToSnapshot.markSendStatus(false); } @@ -341,6 +352,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { " or Chunk Index in InstallSnapshotReply not matching {} != {}", followerToSnapshot.getChunkIndex(), reply.getChunkIndex() ); + + if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){ + // Since the Follower did not find this index to be valid we should reset the follower snapshot + // so that Installing the snapshot can resume from the beginning + followerToSnapshot.reset(); + } } } @@ -539,7 +556,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { context.getReplicatedLog().getSnapshotTerm(), getNextSnapshotChunk(followerId,snapshot.get()), mapFollowerToSnapshot.get(followerId).incrementChunkIndex(), - mapFollowerToSnapshot.get(followerId).getTotalChunks() + mapFollowerToSnapshot.get(followerId).getTotalChunks(), + Optional.of(mapFollowerToSnapshot.get(followerId).getLastChunkHashCode()) ).toSerializable(), actor() ); @@ -636,11 +654,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private boolean replyStatus = false; private int chunkIndex; private int totalChunks; + private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; + private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; public FollowerToSnapshot(ByteString snapshotBytes) { this.snapshotBytes = snapshotBytes; - replyReceivedForOffset = -1; - chunkIndex = 1; int size = snapshotBytes.size(); totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) + ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0); @@ -648,6 +666,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { LOG.debug("Snapshot {} bytes, total chunks to send:{}", size, totalChunks); } + replyReceivedForOffset = -1; + chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX; } public ByteString getSnapshotBytes() { @@ -692,6 +712,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // if the chunk sent was successful replyReceivedForOffset = offset; replyStatus = true; + lastChunkHashCode = nextChunkHashCode; } else { // if the chunk sent was failure replyReceivedForOffset = offset; @@ -715,8 +736,24 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { LOG.debug("length={}, offset={},size={}", snapshotLength, start, size); } - return getSnapshotBytes().substring(start, start + size); + ByteString substring = getSnapshotBytes().substring(start, start + size); + nextChunkHashCode = substring.hashCode(); + return substring; + } + + /** + * reset should be called when the Follower needs to be sent the snapshot from the beginning + */ + public void reset(){ + offset = 0; + replyStatus = false; + replyReceivedForOffset = offset; + chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX; + lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; + } + public int getLastChunkHashCode() { + return lastChunkHashCode; } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 7ada8b31c5..b1c73f6f41 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -9,7 +9,9 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; +import java.util.ArrayList; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; @@ -23,8 +25,6 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; -import java.util.ArrayList; - /** * The behavior of a RaftActor in the Follower state *

@@ -36,7 +36,8 @@ import java.util.ArrayList; * */ public class Follower extends AbstractRaftActorBehavior { - private ByteString snapshotChunksCollected = ByteString.EMPTY; + + private SnapshotTracker snapshotTracker = null; public Follower(RaftActorContext context) { super(context); @@ -280,48 +281,54 @@ public class Follower extends AbstractRaftActorBehavior { ); } - try { - if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) { - // this is the last chunk, create a snapshot object and apply - - snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData()); - if(LOG.isDebugEnabled()) { - LOG.debug("Last chunk received: snapshotChunksCollected.size:{}", - snapshotChunksCollected.size()); - } + if(snapshotTracker == null){ + snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks()); + } - Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(), - new ArrayList(), - installSnapshot.getLastIncludedIndex(), - installSnapshot.getLastIncludedTerm(), - installSnapshot.getLastIncludedIndex(), - installSnapshot.getLastIncludedTerm()); + try { + if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(), + installSnapshot.getLastChunkHashCode())){ + Snapshot snapshot = Snapshot.create(snapshotTracker.getSnapshot(), + new ArrayList(), + installSnapshot.getLastIncludedIndex(), + installSnapshot.getLastIncludedTerm(), + installSnapshot.getLastIncludedIndex(), + installSnapshot.getLastIncludedTerm()); actor().tell(new ApplySnapshot(snapshot), actor()); - } else { - // we have more to go - snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData()); + snapshotTracker = null; - if(LOG.isDebugEnabled()) { - LOG.debug("Chunk={},snapshotChunksCollected.size:{}", - installSnapshot.getChunkIndex(), snapshotChunksCollected.size()); - } } sender.tell(new InstallSnapshotReply( - currentTerm(), context.getId(), installSnapshot.getChunkIndex(), - true), actor()); + currentTerm(), context.getId(), installSnapshot.getChunkIndex(), + true), actor()); + + } catch (SnapshotTracker.InvalidChunkException e) { + + sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(), + -1, false), actor()); + snapshotTracker = null; + + } catch (Exception e){ - } catch (Exception e) { LOG.error(e, "Exception in InstallSnapshot of follower:"); //send reply with success as false. The chunk will be sent again on failure sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(), - installSnapshot.getChunkIndex(), false), actor()); + installSnapshot.getChunkIndex(), false), actor()); + } } @Override public void close() throws Exception { stopElection(); } + + @VisibleForTesting + ByteString getSnapshotChunksCollected(){ + return snapshotTracker != null ? snapshotTracker.getCollectedChunks() : ByteString.EMPTY; + } + + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java new file mode 100644 index 0000000000..26fbde0711 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.behaviors; + +import akka.event.LoggingAdapter; +import com.google.common.base.Optional; +import com.google.protobuf.ByteString; + +/** + * SnapshotTracker does house keeping for a snapshot that is being installed in chunks on the Follower + */ +public class SnapshotTracker { + private final LoggingAdapter LOG; + private final int totalChunks; + private ByteString collectedChunks = ByteString.EMPTY; + private int lastChunkIndex = AbstractLeader.FIRST_CHUNK_INDEX - 1; + private boolean sealed = false; + private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; + + SnapshotTracker(LoggingAdapter LOG, int totalChunks){ + this.LOG = LOG; + this.totalChunks = totalChunks; + } + + /** + * Adds a chunk to the tracker + * + * @param chunkIndex + * @param chunk + * @return true when the lastChunk is received + * @throws InvalidChunkException + */ + boolean addChunk(int chunkIndex, ByteString chunk, Optional lastChunkHashCode) throws InvalidChunkException{ + if(sealed){ + throw new InvalidChunkException("Invalid chunk received with chunkIndex " + chunkIndex + " all chunks already received"); + } + + if(lastChunkIndex + 1 != chunkIndex){ + throw new InvalidChunkException("Expected chunkIndex " + (lastChunkIndex + 1) + " got " + chunkIndex); + } + + if(lastChunkHashCode.isPresent()){ + if(lastChunkHashCode.get() != this.lastChunkHashCode){ + throw new InvalidChunkException("The hash code of the recorded last chunk does not match " + + "the senders hash code expected " + lastChunkHashCode + " was " + lastChunkHashCode.get()); + } + } + + if(LOG.isDebugEnabled()) { + LOG.debug("Chunk={},collectedChunks.size:{}", + chunkIndex, collectedChunks.size()); + } + + sealed = (chunkIndex == totalChunks); + lastChunkIndex = chunkIndex; + collectedChunks = collectedChunks.concat(chunk); + this.lastChunkHashCode = chunk.hashCode(); + return sealed; + } + + byte[] getSnapshot(){ + if(!sealed) { + throw new IllegalStateException("lastChunk not received yet"); + } + + return collectedChunks.toByteArray(); + } + + ByteString getCollectedChunks(){ + return collectedChunks; + } + + public static class InvalidChunkException extends Exception { + InvalidChunkException(String message){ + super(message); + } + } + +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java index 3c4e8117c7..6337f8f6dc 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.raft.messages; +import com.google.common.base.Optional; import com.google.protobuf.ByteString; import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages; @@ -22,9 +23,10 @@ public class InstallSnapshot extends AbstractRaftRPC { private final ByteString data; private final int chunkIndex; private final int totalChunks; + private final Optional lastChunkHashCode; public InstallSnapshot(long term, String leaderId, long lastIncludedIndex, - long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks) { + long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks, Optional lastChunkHashCode) { super(term); this.leaderId = leaderId; this.lastIncludedIndex = lastIncludedIndex; @@ -32,8 +34,15 @@ public class InstallSnapshot extends AbstractRaftRPC { this.data = data; this.chunkIndex = chunkIndex; this.totalChunks = totalChunks; + this.lastChunkHashCode = lastChunkHashCode; } + public InstallSnapshot(long term, String leaderId, long lastIncludedIndex, + long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks) { + this(term, leaderId, lastIncludedIndex, lastIncludedTerm, data, chunkIndex, totalChunks, Optional.absent()); + } + + public String getLeaderId() { return leaderId; } @@ -58,25 +67,38 @@ public class InstallSnapshot extends AbstractRaftRPC { return totalChunks; } - public Object toSerializable(){ - return InstallSnapshotMessages.InstallSnapshot.newBuilder() - .setLeaderId(this.getLeaderId()) - .setChunkIndex(this.getChunkIndex()) - .setData(this.getData()) - .setLastIncludedIndex(this.getLastIncludedIndex()) - .setLastIncludedTerm(this.getLastIncludedTerm()) - .setTotalChunks(this.getTotalChunks()).build(); + public Optional getLastChunkHashCode() { + return lastChunkHashCode; + } + public Object toSerializable(){ + InstallSnapshotMessages.InstallSnapshot.Builder builder = InstallSnapshotMessages.InstallSnapshot.newBuilder() + .setLeaderId(this.getLeaderId()) + .setChunkIndex(this.getChunkIndex()) + .setData(this.getData()) + .setLastIncludedIndex(this.getLastIncludedIndex()) + .setLastIncludedTerm(this.getLastIncludedTerm()) + .setTotalChunks(this.getTotalChunks()); + + if(lastChunkHashCode.isPresent()){ + builder.setLastChunkHashCode(lastChunkHashCode.get()); + } + return builder.build(); } public static InstallSnapshot fromSerializable (Object o) { InstallSnapshotMessages.InstallSnapshot from = (InstallSnapshotMessages.InstallSnapshot) o; + Optional lastChunkHashCode = Optional.absent(); + if(from.hasLastChunkHashCode()){ + lastChunkHashCode = Optional.of(from.getLastChunkHashCode()); + } + InstallSnapshot installSnapshot = new InstallSnapshot(from.getTerm(), from.getLeaderId(), from.getLastIncludedIndex(), from.getLastIncludedTerm(), from.getData(), - from.getChunkIndex(), from.getTotalChunks()); + from.getChunkIndex(), from.getTotalChunks(), lastChunkHashCode); return installSnapshot; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/resources/application.conf b/opendaylight/md-sal/sal-akka-raft/src/main/resources/application.conf index 9e42a13c6a..b2132b88b0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/resources/application.conf +++ b/opendaylight/md-sal/sal-akka-raft/src/main/resources/application.conf @@ -13,8 +13,67 @@ akka { serialization-bindings { "org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry" = java + "org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener" = java "com.google.protobuf.Message" = proto "com.google.protobuf.GeneratedMessage" = proto } } } + +raft-test { + akka { + + loglevel = "DEBUG" + + actor { + # enable to test serialization only. + # serialize-messages = on + + provider = "akka.remote.RemoteActorRefProvider" + + serializers { + java = "akka.serialization.JavaSerializer" + proto = "akka.remote.serialization.ProtobufSerializer" + } + + serialization-bindings { + "org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry" = java + "org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener" = java + "com.google.protobuf.Message" = proto + "com.google.protobuf.GeneratedMessage" = proto + } + } + + remote { + log-remote-lifecycle-events = off + netty.tcp { + hostname = "127.0.0.1" + port = 2550 + } + } + } +} + +raft-test-listener { + + akka { + loglevel = "DEBUG" + + actor { + provider = "akka.remote.RemoteActorRefProvider" + } + + remote { + log-remote-lifecycle-events = off + netty.tcp { + hostname = "127.0.0.1" + port = 2554 + } + } + + member-id = "member-1" + } +} + + + diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 9eb2fb757b..c833a86e9b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -21,11 +21,25 @@ import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.junit.After; import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor; +import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; @@ -36,27 +50,13 @@ import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal; import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; @@ -83,6 +83,9 @@ public class RaftActorTest extends AbstractActorTest { private final DataPersistenceProvider dataPersistenceProvider; private final RaftActor delegate; + private final CountDownLatch recoveryComplete = new CountDownLatch(1); + private final List state; + private ActorRef roleChangeNotifier; public static final class MockRaftActorCreator implements Creator { private static final long serialVersionUID = 1L; @@ -90,25 +93,27 @@ public class RaftActorTest extends AbstractActorTest { private final String id; private final Optional config; private final DataPersistenceProvider dataPersistenceProvider; + private final ActorRef roleChangeNotifier; private MockRaftActorCreator(Map peerAddresses, String id, - Optional config, DataPersistenceProvider dataPersistenceProvider) { + Optional config, DataPersistenceProvider dataPersistenceProvider, + ActorRef roleChangeNotifier) { this.peerAddresses = peerAddresses; this.id = id; this.config = config; this.dataPersistenceProvider = dataPersistenceProvider; + this.roleChangeNotifier = roleChangeNotifier; } @Override public MockRaftActor create() throws Exception { - return new MockRaftActor(id, peerAddresses, config, dataPersistenceProvider); + MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config, + dataPersistenceProvider); + mockRaftActor.roleChangeNotifier = this.roleChangeNotifier; + return mockRaftActor; } } - private final CountDownLatch recoveryComplete = new CountDownLatch(1); - - private final List state; - public MockRaftActor(String id, Map peerAddresses, Optional config, DataPersistenceProvider dataPersistenceProvider) { super(id, peerAddresses, config); state = new ArrayList<>(); @@ -134,23 +139,24 @@ public class RaftActorTest extends AbstractActorTest { public static Props props(final String id, final Map peerAddresses, Optional config){ - return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null)); + return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null)); } public static Props props(final String id, final Map peerAddresses, Optional config, DataPersistenceProvider dataPersistenceProvider){ - return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider)); + return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null)); } + public static Props props(final String id, final Map peerAddresses, + Optional config, ActorRef roleChangeNotifier){ + return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier)); + } @Override protected void applyState(ActorRef clientActor, String identifier, Object data) { delegate.applyState(clientActor, identifier, data); LOG.info("applyState called"); } - - - @Override protected void startLogRecoveryBatch(int maxBatchSize) { } @@ -201,6 +207,11 @@ public class RaftActorTest extends AbstractActorTest { return this.dataPersistenceProvider; } + @Override + protected Optional getRoleChangeNotifier() { + return Optional.fromNullable(roleChangeNotifier); + } + @Override public String persistenceId() { return this.getId(); } @@ -862,6 +873,40 @@ public class RaftActorTest extends AbstractActorTest { }; } + @Test + public void testRaftRoleChangeNotifier() throws Exception { + new JavaTestKit(getSystem()) {{ + ActorRef notifierActor = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + String id = "testRaftRoleChangeNotifier"; + + TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(id, + Collections.emptyMap(), Optional.of(config), notifierActor), id); + + MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + mockRaftActor.setCurrentBehavior(new Follower(mockRaftActor.getRaftActorContext())); + + // sleeping for a minimum of 2 seconds, if it spans more its fine. + Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); + + List matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class); + assertNotNull(matches); + assertEquals(2, matches.size()); + + // check if the notifier got a role change from Follower to Candidate + RoleChanged raftRoleChanged = (RoleChanged) matches.get(0); + assertEquals(id, raftRoleChanged.getMemberId()); + assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole()); + assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole()); + + // check if the notifier got a role change from Candidate to Leader + raftRoleChanged = (RoleChanged) matches.get(1); + assertEquals(id, raftRoleChanged.getMemberId()); + assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole()); + assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole()); + }}; + } + private ByteString fromObject(Object snapshot) throws Exception { ByteArrayOutputStream b = null; ObjectOutputStream o = null; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index 83b9ad3ec7..0ee9693d32 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -1,9 +1,20 @@ package org.opendaylight.controller.cluster.raft.behaviors; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import akka.actor.ActorRef; import akka.actor.Props; import akka.testkit.JavaTestKit; import com.google.protobuf.ByteString; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.junit.Test; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; @@ -20,19 +31,6 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - public class FollowerTest extends AbstractRaftActorBehaviorTest { private final ActorRef followerActor = getSystem().actorOf(Props.create( @@ -452,18 +450,20 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { int offset = 0; int snapshotLength = bsSnapshot.size(); int i = 1; + int chunkIndex = 1; do { chunkData = getNextChunk(bsSnapshot, offset); final InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader-1", i, 1, - chunkData, i, 3); + chunkData, chunkIndex, 3); follower.handleMessage(leaderActor, installSnapshot); offset = offset + 50; i++; + chunkIndex++; } while ((offset+50) < snapshotLength); - final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, 3, 3); + final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, chunkIndex, 3); follower.handleMessage(leaderActor, installSnapshot3); String[] matches = new ReceiveWhile(String.class, duration("2 seconds")) { @@ -490,6 +490,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } }.get(); + // Verify that after a snapshot is successfully applied the collected snapshot chunks is reset to empty + assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected()); + String applySnapshotMatch = ""; for (String reply: matches) { if (reply.startsWith("applySnapshot")) { @@ -517,6 +520,52 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { }}; } + @Test + public void testHandleOutOfSequenceInstallSnapshot() throws Exception { + JavaTestKit javaTestKit = new JavaTestKit(getSystem()) { + { + + ActorRef leaderActor = getSystem().actorOf(Props.create( + MessageCollectorActor.class)); + + MockRaftActorContext context = (MockRaftActorContext) + createActorContext(getRef()); + + Follower follower = (Follower) createBehavior(context); + + HashMap followerSnapshot = new HashMap<>(); + followerSnapshot.put("1", "A"); + followerSnapshot.put("2", "B"); + followerSnapshot.put("3", "C"); + + ByteString bsSnapshot = toByteString(followerSnapshot); + + final InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader-1", 3, 1, getNextChunk(bsSnapshot, 10), 3, 3); + follower.handleMessage(leaderActor, installSnapshot); + + Object messages = executeLocalOperation(leaderActor, "get-all-messages"); + + assertNotNull(messages); + assertTrue(messages instanceof List); + List listMessages = (List) messages; + + int installSnapshotReplyReceivedCount = 0; + for (Object message: listMessages) { + if (message instanceof InstallSnapshotReply) { + ++installSnapshotReplyReceivedCount; + } + } + + assertEquals(1, installSnapshotReplyReceivedCount); + InstallSnapshotReply reply = (InstallSnapshotReply) listMessages.get(0); + assertEquals(false, reply.isSuccess()); + assertEquals(-1, reply.getChunkIndex()); + assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected()); + + + }}; + } + public Object executeLocalOperation(ActorRef actor, Object message) throws Exception { return MessageCollectorActor.getAllMessages(actor); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 0fc0b4ccfd..895fe35bff 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -1,11 +1,16 @@ package org.opendaylight.controller.cluster.raft.behaviors; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Terminated; import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; import com.google.common.base.Optional; +import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -41,9 +46,6 @@ import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages; import scala.concurrent.duration.FiniteDuration; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; public class LeaderTest extends AbstractRaftActorBehaviorTest { @@ -125,7 +127,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { Map peerAddresses = new HashMap<>(); peerAddresses.put(followerActor.path().toString(), - followerActor.path().toString()); + followerActor.path().toString()); actorContext.setPeerAddresses(peerAddresses); @@ -570,6 +572,156 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { }}; } + @Test + public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception { + new JavaTestKit(getSystem()) {{ + + TestActorRef followerActor = + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower"); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put(followerActor.path().toString(), + followerActor.path().toString()); + + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int snapshotTerm = 1; + final int currentTerm = 2; + + MockRaftActorContext actorContext = + (MockRaftActorContext) createActorContext(); + + actorContext.setConfigParams(new DefaultConfigParamsImpl(){ + @Override + public int getSnapshotChunkSize() { + return 50; + } + }); + actorContext.setPeerAddresses(peerAddresses); + actorContext.setCommitIndex(followersLastIndex); + + MockLeader leader = new MockLeader(actorContext); + + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); + + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); + + ByteString bs = toByteString(leadersSnapshot); + leader.setSnapshot(Optional.of(bs)); + + leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); + + Object o = MessageCollectorActor.getAllMessages(followerActor).get(0); + + assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot); + + InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o; + + assertEquals(1, installSnapshot.getChunkIndex()); + assertEquals(3, installSnapshot.getTotalChunks()); + + + leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false)); + + leader.handleMessage(leaderActor, new SendHeartBeat()); + + o = MessageCollectorActor.getAllMessages(followerActor).get(1); + + assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot); + + installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o; + + assertEquals(1, installSnapshot.getChunkIndex()); + assertEquals(3, installSnapshot.getTotalChunks()); + + followerActor.tell(PoisonPill.getInstance(), getRef()); + }}; + } + + @Test + public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception { + new JavaTestKit(getSystem()) { + { + + TestActorRef followerActor = + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower"); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put(followerActor.path().toString(), + followerActor.path().toString()); + + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int snapshotTerm = 1; + final int currentTerm = 2; + + MockRaftActorContext actorContext = + (MockRaftActorContext) createActorContext(); + + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public int getSnapshotChunkSize() { + return 50; + } + }); + actorContext.setPeerAddresses(peerAddresses); + actorContext.setCommitIndex(followersLastIndex); + + MockLeader leader = new MockLeader(actorContext); + + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); + + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); + + ByteString bs = toByteString(leadersSnapshot); + leader.setSnapshot(Optional.of(bs)); + + leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); + + Object o = MessageCollectorActor.getAllMessages(followerActor).get(0); + + assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot); + + InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o; + + assertEquals(1, installSnapshot.getChunkIndex()); + assertEquals(3, installSnapshot.getTotalChunks()); + assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode()); + + int hashCode = installSnapshot.getData().hashCode(); + + leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true )); + + leader.handleMessage(leaderActor, new SendHeartBeat()); + + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + + o = MessageCollectorActor.getAllMessages(followerActor).get(1); + + assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot); + + installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o; + + assertEquals(2, installSnapshot.getChunkIndex()); + assertEquals(3, installSnapshot.getTotalChunks()); + assertEquals(hashCode, installSnapshot.getLastChunkHashCode()); + + followerActor.tell(PoisonPill.getInstance(), getRef()); + }}; + } + @Test public void testFollowerToSnapshotLogic() { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java new file mode 100644 index 0000000000..1b3a8f5fb5 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java @@ -0,0 +1,181 @@ +package org.opendaylight.controller.cluster.raft.behaviors; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import akka.event.LoggingAdapter; +import com.google.common.base.Optional; +import com.google.protobuf.ByteString; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.HashMap; +import java.util.Map; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class SnapshotTrackerTest { + + Map data; + ByteString byteString; + ByteString chunk1; + ByteString chunk2; + ByteString chunk3; + + @Before + public void setup(){ + data = new HashMap<>(); + data.put("key1", "value1"); + data.put("key2", "value2"); + data.put("key3", "value3"); + + byteString = toByteString(data); + chunk1 = getNextChunk(byteString, 0, 10); + chunk2 = getNextChunk(byteString, 10, 10); + chunk3 = getNextChunk(byteString, 20, byteString.size()); + } + + @Test + public void testAddChunk() throws SnapshotTracker.InvalidChunkException { + SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5); + + tracker1.addChunk(1, chunk1, Optional.absent()); + tracker1.addChunk(2, chunk2, Optional.absent()); + tracker1.addChunk(3, chunk3, Optional.absent()); + + // Verify that an InvalidChunkException is thrown when we try to add a chunk to a sealed tracker + SnapshotTracker tracker2 = new SnapshotTracker(mock(LoggingAdapter.class), 2); + + tracker2.addChunk(1, chunk1, Optional.absent()); + tracker2.addChunk(2, chunk2, Optional.absent()); + + try { + tracker2.addChunk(3, chunk3, Optional.absent()); + Assert.fail(); + } catch(SnapshotTracker.InvalidChunkException e){ + e.getMessage().startsWith("Invalid chunk"); + } + + // The first chunk's index must at least be FIRST_CHUNK_INDEX + SnapshotTracker tracker3 = new SnapshotTracker(mock(LoggingAdapter.class), 2); + + try { + tracker3.addChunk(AbstractLeader.FIRST_CHUNK_INDEX - 1, chunk1, Optional.absent()); + Assert.fail(); + } catch(SnapshotTracker.InvalidChunkException e){ + + } + + // Out of sequence chunk indexes won't work + SnapshotTracker tracker4 = new SnapshotTracker(mock(LoggingAdapter.class), 2); + + tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.absent()); + + try { + tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX+2, chunk2, Optional.absent()); + Assert.fail(); + } catch(SnapshotTracker.InvalidChunkException e){ + + } + + // No exceptions will be thrown when invalid chunk is added with the right sequence + // If the lastChunkHashCode is missing + SnapshotTracker tracker5 = new SnapshotTracker(mock(LoggingAdapter.class), 2); + + tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.absent()); + // Look I can add the same chunk again + tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX + 1, chunk1, Optional.absent()); + + // An exception will be thrown when an invalid chunk is addedd with the right sequence + // when the lastChunkHashCode is present + SnapshotTracker tracker6 = new SnapshotTracker(mock(LoggingAdapter.class), 2); + + tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.of(-1)); + + try { + // Here we add a second chunk and tell addChunk that the previous chunk had a hash code 777 + tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX + 1, chunk2, Optional.of(777)); + Assert.fail(); + }catch(SnapshotTracker.InvalidChunkException e){ + + } + + } + + @Test + public void testGetSnapShot() throws SnapshotTracker.InvalidChunkException { + + // Trying to get a snapshot before all chunks have been received will throw an exception + SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5); + + tracker1.addChunk(1, chunk1, Optional.absent()); + try { + tracker1.getSnapshot(); + Assert.fail(); + } catch(IllegalStateException e){ + + } + + SnapshotTracker tracker2 = new SnapshotTracker(mock(LoggingAdapter.class), 3); + + tracker2.addChunk(1, chunk1, Optional.absent()); + tracker2.addChunk(2, chunk2, Optional.absent()); + tracker2.addChunk(3, chunk3, Optional.absent()); + + byte[] snapshot = tracker2.getSnapshot(); + + assertEquals(byteString, ByteString.copyFrom(snapshot)); + } + + @Test + public void testGetCollectedChunks() throws SnapshotTracker.InvalidChunkException { + SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5); + + ByteString chunks = chunk1.concat(chunk2); + + tracker1.addChunk(1, chunk1, Optional.absent()); + tracker1.addChunk(2, chunk2, Optional.absent()); + + assertEquals(chunks, tracker1.getCollectedChunks()); + } + + public ByteString getNextChunk (ByteString bs, int offset, int size){ + int snapshotLength = bs.size(); + int start = offset; + if (size > snapshotLength) { + size = snapshotLength; + } else { + if ((start + size) > snapshotLength) { + size = snapshotLength - start; + } + } + return bs.substring(start, start + size); + } + + private ByteString toByteString(Map state) { + ByteArrayOutputStream b = null; + ObjectOutputStream o = null; + try { + try { + b = new ByteArrayOutputStream(); + o = new ObjectOutputStream(b); + o.writeObject(state); + byte[] snapshotBytes = b.toByteArray(); + return ByteString.copyFrom(snapshotBytes); + } finally { + if (o != null) { + o.flush(); + o.close(); + } + if (b != null) { + b.close(); + } + } + } catch (IOException e) { + org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e); + } + return null; + } + + +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java index 9fbdd4587f..3469a956c3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java @@ -13,15 +13,14 @@ import akka.actor.UntypedActor; import akka.pattern.Patterns; import akka.util.Timeout; import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; - public class MessageCollectorActor extends UntypedActor { private List messages = new ArrayList<>(); diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListener.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListener.java new file mode 100644 index 0000000000..1faa341d45 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListener.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.notifications; + +import java.io.Serializable; + +/** + * Message sent from the listener of Role Change messages to register itself to the Role Change Notifier + * + * The Listener could be in a separate ActorSystem and hence this message needs to be Serializable + */ +public class RegisterRoleChangeListener implements Serializable { +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListenerReply.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListenerReply.java new file mode 100644 index 0000000000..f1d13e344f --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListenerReply.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.notifications; + +import java.io.Serializable; + +/** + * Reply message sent from a RoleChangeNotifier to the Role Change Listener + * + * Can be sent to a separate actor system and hence should be made serializable. + */ +public class RegisterRoleChangeListenerReply implements Serializable { +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotification.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotification.java new file mode 100644 index 0000000000..de2733fc86 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotification.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.notifications; + +import java.io.Serializable; + +/** + * Notification message representing a Role change of a cluster member + * + * Roles generally are Leader, Follower and Candidate. But can be based on the consensus strategy/implementation + * + * The Listener could be in a separate ActorSystem and hence this message needs to be Serializable + */ +public class RoleChangeNotification implements Serializable { + private String memberId; + private String oldRole; + private String newRole; + + public RoleChangeNotification(String memberId, String oldRole, String newRole) { + this.memberId = memberId; + this.oldRole = oldRole; + this.newRole = newRole; + } + + public String getMemberId() { + return memberId; + } + + public String getOldRole() { + return oldRole; + } + + public String getNewRole() { + return newRole; + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java new file mode 100644 index 0000000000..a9aa56174d --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.notifications; + +import akka.actor.Actor; +import akka.actor.ActorPath; +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.japi.Creator; +import akka.serialization.Serialization; +import com.google.common.collect.Maps; +import java.util.Map; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; + +/** + * The RoleChangeNotifier is responsible for receiving Raft role change messages and notifying + * the listeners (within the same node), which are registered with it. + *

+ * The RoleChangeNotifier is instantiated by the Shard and injected into the RaftActor. + */ +public class RoleChangeNotifier extends AbstractUntypedActor implements AutoCloseable { + + private String memberId; + private Map registeredListeners = Maps.newHashMap(); + private RoleChangeNotification latestRoleChangeNotification = null; + + public RoleChangeNotifier(String memberId) { + this.memberId = memberId; + } + + public static Props getProps(final String memberId) { + return Props.create(new Creator() { + @Override + public Actor create() throws Exception { + return new RoleChangeNotifier(memberId); + } + }); + } + + @Override + public void preStart() throws Exception { + super.preStart(); + LOG.info("RoleChangeNotifier:{} created and ready for shard:{}", + Serialization.serializedActorPath(getSelf()), memberId); + } + + @Override + protected void handleReceive(Object message) throws Exception { + if (message instanceof RegisterRoleChangeListener) { + // register listeners for this shard + + ActorRef curRef = registeredListeners.get(getSender().path()); + if (curRef != null) { + // ActorPaths would pass equal even if the unique id of the actors are different + // if a listener actor is re-registering after reincarnation, then removing the existing + // entry so the actor path with correct unique id is registered. + registeredListeners.remove(getSender().path()); + } + registeredListeners.put(getSender().path(), getSender()); + + LOG.info("RoleChangeNotifier for {} , registered listener {}", memberId, + getSender().path().toString()); + + getSender().tell(new RegisterRoleChangeListenerReply(), getSelf()); + + if (latestRoleChangeNotification != null) { + getSender().tell(latestRoleChangeNotification, getSelf()); + } + + + } else if (message instanceof RoleChanged) { + // this message is sent by RaftActor. Notify registered listeners when this message is received. + RoleChanged roleChanged = (RoleChanged) message; + + LOG.info("RoleChangeNotifier for {} , received role change from {} to {}", memberId, + roleChanged.getOldRole(), roleChanged.getNewRole()); + + latestRoleChangeNotification = + new RoleChangeNotification(roleChanged.getMemberId(), + roleChanged.getOldRole(), roleChanged.getNewRole()); + + for (ActorRef listener: registeredListeners.values()) { + listener.tell(latestRoleChangeNotification, getSelf()); + } + } + } + + @Override + public void close() throws Exception { + registeredListeners.clear(); + } +} + diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChanged.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChanged.java new file mode 100644 index 0000000000..f315bfdf7a --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChanged.java @@ -0,0 +1,31 @@ +package org.opendaylight.controller.cluster.notifications; + +/** + * Role Change message initiated internally from the Raft Actor when a the behavior/role changes. + * + * Since its internal , need not be serialized + * + */ +public class RoleChanged { + private String memberId; + private String oldRole; + private String newRole; + + public RoleChanged(String memberId, String oldRole, String newRole) { + this.memberId = memberId; + this.oldRole = oldRole; + this.newRole = newRole; + } + + public String getMemberId() { + return memberId; + } + + public String getOldRole() { + return oldRole; + } + + public String getNewRole() { + return newRole; + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/cluster/raft/InstallSnapshotMessages.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/cluster/raft/InstallSnapshotMessages.java index b93be3e009..de7f44e213 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/cluster/raft/InstallSnapshotMessages.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/cluster/raft/InstallSnapshotMessages.java @@ -85,6 +85,16 @@ public final class InstallSnapshotMessages { * optional int32 totalChunks = 7; */ int getTotalChunks(); + + // optional int32 lastChunkHashCode = 8; + /** + * optional int32 lastChunkHashCode = 8; + */ + boolean hasLastChunkHashCode(); + /** + * optional int32 lastChunkHashCode = 8; + */ + int getLastChunkHashCode(); } /** * Protobuf type {@code org.opendaylight.controller.cluster.raft.InstallSnapshot} @@ -172,6 +182,11 @@ public final class InstallSnapshotMessages { totalChunks_ = input.readInt32(); break; } + case 64: { + bitField0_ |= 0x00000080; + lastChunkHashCode_ = input.readInt32(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -351,6 +366,22 @@ public final class InstallSnapshotMessages { return totalChunks_; } + // optional int32 lastChunkHashCode = 8; + public static final int LASTCHUNKHASHCODE_FIELD_NUMBER = 8; + private int lastChunkHashCode_; + /** + * optional int32 lastChunkHashCode = 8; + */ + public boolean hasLastChunkHashCode() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * optional int32 lastChunkHashCode = 8; + */ + public int getLastChunkHashCode() { + return lastChunkHashCode_; + } + private void initFields() { term_ = 0L; leaderId_ = ""; @@ -359,6 +390,7 @@ public final class InstallSnapshotMessages { data_ = com.google.protobuf.ByteString.EMPTY; chunkIndex_ = 0; totalChunks_ = 0; + lastChunkHashCode_ = 0; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -393,6 +425,9 @@ public final class InstallSnapshotMessages { if (((bitField0_ & 0x00000040) == 0x00000040)) { output.writeInt32(7, totalChunks_); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + output.writeInt32(8, lastChunkHashCode_); + } getUnknownFields().writeTo(output); } @@ -430,6 +465,10 @@ public final class InstallSnapshotMessages { size += com.google.protobuf.CodedOutputStream .computeInt32Size(7, totalChunks_); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(8, lastChunkHashCode_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -560,6 +599,8 @@ public final class InstallSnapshotMessages { bitField0_ = (bitField0_ & ~0x00000020); totalChunks_ = 0; bitField0_ = (bitField0_ & ~0x00000040); + lastChunkHashCode_ = 0; + bitField0_ = (bitField0_ & ~0x00000080); return this; } @@ -616,6 +657,10 @@ public final class InstallSnapshotMessages { to_bitField0_ |= 0x00000040; } result.totalChunks_ = totalChunks_; + if (((from_bitField0_ & 0x00000080) == 0x00000080)) { + to_bitField0_ |= 0x00000080; + } + result.lastChunkHashCode_ = lastChunkHashCode_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -655,6 +700,9 @@ public final class InstallSnapshotMessages { if (other.hasTotalChunks()) { setTotalChunks(other.getTotalChunks()); } + if (other.hasLastChunkHashCode()) { + setLastChunkHashCode(other.getLastChunkHashCode()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -957,6 +1005,39 @@ public final class InstallSnapshotMessages { return this; } + // optional int32 lastChunkHashCode = 8; + private int lastChunkHashCode_ ; + /** + * optional int32 lastChunkHashCode = 8; + */ + public boolean hasLastChunkHashCode() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * optional int32 lastChunkHashCode = 8; + */ + public int getLastChunkHashCode() { + return lastChunkHashCode_; + } + /** + * optional int32 lastChunkHashCode = 8; + */ + public Builder setLastChunkHashCode(int value) { + bitField0_ |= 0x00000080; + lastChunkHashCode_ = value; + onChanged(); + return this; + } + /** + * optional int32 lastChunkHashCode = 8; + */ + public Builder clearLastChunkHashCode() { + bitField0_ = (bitField0_ & ~0x00000080); + lastChunkHashCode_ = 0; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.cluster.raft.InstallSnapshot) } @@ -983,13 +1064,14 @@ public final class InstallSnapshotMessages { static { java.lang.String[] descriptorData = { "\n\025InstallSnapshot.proto\022(org.opendayligh" + - "t.controller.cluster.raft\"\235\001\n\017InstallSna" + + "t.controller.cluster.raft\"\270\001\n\017InstallSna" + "pshot\022\014\n\004term\030\001 \001(\003\022\020\n\010leaderId\030\002 \001(\t\022\031\n" + "\021lastIncludedIndex\030\003 \001(\003\022\030\n\020lastIncluded" + "Term\030\004 \001(\003\022\014\n\004data\030\005 \001(\014\022\022\n\nchunkIndex\030\006" + - " \001(\005\022\023\n\013totalChunks\030\007 \001(\005BX\n;org.openday" + - "light.controller.protobuff.messages.clus" + - "ter.raftB\027InstallSnapshotMessagesH\001" + " \001(\005\022\023\n\013totalChunks\030\007 \001(\005\022\031\n\021lastChunkHa" + + "shCode\030\010 \001(\005BX\n;org.opendaylight.control" + + "ler.protobuff.messages.cluster.raftB\027Ins" + + "tallSnapshotMessagesH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -1001,7 +1083,7 @@ public final class InstallSnapshotMessages { internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor, - new java.lang.String[] { "Term", "LeaderId", "LastIncludedIndex", "LastIncludedTerm", "Data", "ChunkIndex", "TotalChunks", }); + new java.lang.String[] { "Term", "LeaderId", "LastIncludedIndex", "LastIncludedTerm", "Data", "ChunkIndex", "TotalChunks", "LastChunkHashCode", }); return null; } }; diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/resources/InstallSnapshot.proto b/opendaylight/md-sal/sal-clustering-commons/src/main/resources/InstallSnapshot.proto index 4198644b13..adb58ae4af 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/resources/InstallSnapshot.proto +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/resources/InstallSnapshot.proto @@ -12,4 +12,5 @@ message InstallSnapshot { optional bytes data = 5; optional int32 chunkIndex = 6; optional int32 totalChunks = 7; + optional int32 lastChunkHashCode = 8; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index d53cb48e50..7073ea758b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -65,6 +65,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; +import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; @@ -131,6 +132,8 @@ public class Shard extends RaftActor { private Cancellable txCommitTimeoutCheckSchedule; + private Optional roleChangeNotifier; + /** * Coordinates persistence recovery on startup. */ @@ -171,6 +174,9 @@ public class Shard extends RaftActor { transactionCommitTimeout = TimeUnit.MILLISECONDS.convert( datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS); + + // create a notifier actor for each cluster member + roleChangeNotifier = createRoleChangeNotifier(name.toString()); } private static Map mapPeerAddresses( @@ -196,6 +202,12 @@ public class Shard extends RaftActor { return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext)); } + private Optional createRoleChangeNotifier(String shardId) { + ActorRef shardRoleChangeNotifier = this.getContext().actorOf( + RoleChangeNotifier.getProps(shardId), shardId + "-notifier"); + return Optional.of(shardRoleChangeNotifier); + } + @Override public void postStop() { super.postStop(); @@ -259,6 +271,11 @@ public class Shard extends RaftActor { } } + @Override + protected Optional getRoleChangeNotifier() { + return roleChangeNotifier; + } + private void handleTransactionCommitTimeoutCheck() { CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry(); if(cohortEntry != null) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java new file mode 100644 index 0000000000..4e61260550 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java @@ -0,0 +1,79 @@ +package org.opendaylight.controller.cluster.datastore; + + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; +import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; +import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply; +import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; +import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; +import org.opendaylight.controller.cluster.notifications.RoleChanged; +import org.opendaylight.controller.cluster.raft.RaftState; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +public class RoleChangeNotifierTest extends AbstractActorTest { + + @Test + public void testHandleRegisterRoleChangeListener() throws Exception { + new JavaTestKit(getSystem()) {{ + String memberId = "testHandleRegisterRoleChangeListener"; + ActorRef listenerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + + TestActorRef notifierTestActorRef = TestActorRef.create( + getSystem(), RoleChangeNotifier.getProps(memberId), memberId); + + notifierTestActorRef.tell(new RegisterRoleChangeListener(), listenerActor); + + RegisterRoleChangeListenerReply reply = (RegisterRoleChangeListenerReply) + MessageCollectorActor.getFirstMatching(listenerActor, RegisterRoleChangeListenerReply.class); + assertNotNull(reply); + + RoleChangeNotification notification = (RoleChangeNotification) + MessageCollectorActor.getFirstMatching(listenerActor, RoleChangeNotification.class); + assertNull(notification); + }}; + + } + + @Test + public void testHandleRaftRoleChanged() throws Exception { + new JavaTestKit(getSystem()) {{ + String memberId = "testHandleRegisterRoleChangeListenerWithNotificationSet"; + ActorRef listenerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + ActorRef shardActor = getTestActor(); + + TestActorRef notifierTestActorRef = TestActorRef.create( + getSystem(), RoleChangeNotifier.getProps(memberId), memberId); + + RoleChangeNotifier roleChangeNotifier = notifierTestActorRef.underlyingActor(); + + notifierTestActorRef.tell(new RoleChanged(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), shardActor); + + // no notification should be sent as listener has not yet registered + assertNull(MessageCollectorActor.getFirstMatching(listenerActor, RoleChangeNotification.class)); + + // listener registers after role has been changed, ensure we sent the latest role change after a reply + notifierTestActorRef.tell(new RegisterRoleChangeListener(), listenerActor); + + RegisterRoleChangeListenerReply reply = (RegisterRoleChangeListenerReply) + MessageCollectorActor.getFirstMatching(listenerActor, RegisterRoleChangeListenerReply.class); + assertNotNull(reply); + + RoleChangeNotification notification = (RoleChangeNotification) + MessageCollectorActor.getFirstMatching(listenerActor, RoleChangeNotification.class); + assertNotNull(notification); + assertEquals(RaftState.Candidate.name(), notification.getOldRole()); + assertEquals(RaftState.Leader.name(), notification.getNewRole()); + + }}; + + } +} + + diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java index f75aa5445b..4bd0ad818f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java @@ -8,10 +8,19 @@ package org.opendaylight.controller.cluster.datastore.utils; +import akka.actor.ActorRef; import akka.actor.UntypedActor; +import akka.pattern.Patterns; +import akka.util.Timeout; +import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; /** * MessageCollectorActor collects messages as it receives them. It can send @@ -27,10 +36,55 @@ public class MessageCollectorActor extends UntypedActor { @Override public void onReceive(Object message) throws Exception { if(message instanceof String){ if("messages".equals(message)){ - getSender().tell(messages, getSelf()); + getSender().tell(new ArrayList(messages), getSelf()); } } else { messages.add(message); } } + + public static List getAllMessages(ActorRef actor) throws Exception { + FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS); + Timeout operationTimeout = new Timeout(operationDuration); + Future future = Patterns.ask(actor, "messages", operationTimeout); + + try { + return (List) Await.result(future, operationDuration); + } catch (Exception e) { + throw e; + } + } + + /** + * Get the first message that matches the specified class + * @param actor + * @param clazz + * @return + */ + public static Object getFirstMatching(ActorRef actor, Class clazz) throws Exception { + List allMessages = getAllMessages(actor); + + for(Object message : allMessages){ + if(message.getClass().equals(clazz)){ + return message; + } + } + + return null; + } + + public static List getAllMatching(ActorRef actor, Class clazz) throws Exception { + List allMessages = getAllMessages(actor); + + List output = Lists.newArrayList(); + + for(Object message : allMessages){ + if(message.getClass().equals(clazz)){ + output.add(message); + } + } + + return output; + } + }