import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
import org.opendaylight.controller.cluster.example.messages.PrintRole;
import org.opendaylight.controller.cluster.example.messages.PrintState;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.ConfigParams;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.RaftState;
*/
public class ExampleActor extends RaftActor {
- private final Map<String, String> state = new HashMap<>();
+ private final Map<String, String> state = new HashMap();
private final DataPersistenceProvider dataPersistenceProvider;
private long persistIdentifier = 1;
+ private Optional<ActorRef> roleChangeNotifier;
- public ExampleActor(final String id, final Map<String, String> peerAddresses,
- final Optional<ConfigParams> configParams) {
+ public ExampleActor(String id, Map<String, String> peerAddresses,
+ Optional<ConfigParams> configParams) {
super(id, peerAddresses, configParams);
this.dataPersistenceProvider = new PersistentDataProvider();
+ roleChangeNotifier = createRoleChangeNotifier(id);
}
public static Props props(final String id, final Map<String, String> peerAddresses,
final Optional<ConfigParams> configParams){
return Props.create(new Creator<ExampleActor>(){
- private static final long serialVersionUID = 1L;
@Override public ExampleActor create() throws Exception {
return new ExampleActor(id, peerAddresses, configParams);
});
}
- @Override public void onReceiveCommand(final Object message) throws Exception{
+ @Override public void onReceiveCommand(Object message) throws Exception{
if(message instanceof KeyValue){
if(isLeader()) {
String persistId = Long.toString(persistIdentifier++);
String followers = "";
if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
- LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), getPeers(), followers);
+ LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
+ getRaftActorContext().getPeerAddresses().keySet(), followers);
} else {
- LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers());
+ LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
+ getRaftActorContext().getPeerAddresses().keySet());
}
}
}
+ protected String getReplicatedLogState() {
+ return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex()
+ + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm()
+ + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size();
+ }
+
+ public Optional<ActorRef> createRoleChangeNotifier(String actorId) {
+ ActorRef exampleRoleChangeNotifier = this.getContext().actorOf(
+ RoleChangeNotifier.getProps(actorId), actorId + "-notifier");
+ return Optional.<ActorRef>of(exampleRoleChangeNotifier);
+ }
+
+ @Override
+ protected Optional<ActorRef> getRoleChangeNotifier() {
+ return roleChangeNotifier;
+ }
+
@Override protected void applyState(final ActorRef clientActor, final String identifier,
final Object data) {
if(data instanceof KeyValue){
getSelf().tell(new CaptureSnapshotReply(bs), null);
}
- @Override protected void applySnapshot(final ByteString snapshot) {
+ @Override protected void applySnapshot(ByteString snapshot) {
state.clear();
try {
- state.putAll((Map<String, String>) toObject(snapshot));
+ state.putAll((HashMap) toObject(snapshot));
} catch (Exception e) {
LOG.error(e, "Exception in applying snapshot");
}
if(LOG.isDebugEnabled()) {
- LOG.debug("Snapshot applied to state : {}", ((Map<?, ?>) state).size());
+ LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size());
}
}
- private ByteString fromObject(final Object snapshot) throws Exception {
+ private ByteString fromObject(Object snapshot) throws Exception {
ByteArrayOutputStream b = null;
ObjectOutputStream o = null;
try {
}
}
- private Object toObject(final ByteString bs) throws ClassNotFoundException, IOException {
+ private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
Object obj = null;
ByteArrayInputStream bis = null;
ObjectInputStream ois = null;
return dataPersistenceProvider;
}
- @Override public void onReceiveRecover(final Object message)throws Exception {
+ @Override public void onReceiveRecover(Object message)throws Exception {
super.onReceiveRecover(message);
}
}
@Override
- protected void startLogRecoveryBatch(final int maxBatchSize) {
+ protected void startLogRecoveryBatch(int maxBatchSize) {
}
@Override
- protected void appendRecoveredLogEntry(final Payload data) {
+ protected void appendRecoveredLogEntry(Payload data) {
}
@Override
}
@Override
- protected void applyRecoverySnapshot(final ByteString snapshot) {
+ protected void applyRecoverySnapshot(ByteString snapshot) {
}
}
--- /dev/null
+package org.opendaylight.controller.cluster.example;
+
+import akka.actor.Actor;
+import akka.actor.ActorRef;
+import akka.actor.Cancellable;
+import akka.actor.Props;
+import akka.japi.Creator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.example.messages.RegisterListener;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
+import scala.concurrent.Await;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * This is a sample implementation of a Role Change Listener which is an actor, which registers itself to the ClusterRoleChangeNotifier
+ * <p/>
+ * The Role Change listener receives a SetNotifiers message with the notifiers to register itself with.
+ * <p/>
+ * It kicks of a scheduler which sents registration messages to the notifiers, till it gets a RegisterRoleChangeListenerReply
+ * <p/>
+ * If all the notifiers have been regsitered with, then it cancels the scheduler.
+ * It starts the scheduler again when it receives a new registration
+ *
+ */
+public class ExampleRoleChangeListener extends AbstractUntypedActor implements AutoCloseable{
+ // the akka url should be set to the notifiers actor-system and domain.
+ private static final String NOTIFIER_AKKA_URL = "akka.tcp://raft-test@127.0.0.1:2550/user/";
+
+ private Map<String, Boolean> notifierRegistrationStatus = new HashMap<>();
+ private Cancellable registrationSchedule = null;
+ private static final FiniteDuration duration = new FiniteDuration(100, TimeUnit.MILLISECONDS);
+ private static final FiniteDuration schedulerDuration = new FiniteDuration(1, TimeUnit.SECONDS);
+ private final String memberName;
+ private static final String[] shardsToMonitor = new String[] {"example"};
+
+ public ExampleRoleChangeListener(String memberName) {
+ super();
+ scheduleRegistrationListener(schedulerDuration);
+ this.memberName = memberName;
+ populateRegistry(memberName);
+ }
+
+ public static Props getProps(final String memberName) {
+ return Props.create(new Creator<Actor>() {
+ @Override
+ public Actor create() throws Exception {
+ return new ExampleRoleChangeListener(memberName);
+ }
+ });
+ }
+
+ @Override
+ protected void handleReceive(Object message) throws Exception {
+ if (message instanceof RegisterListener) {
+ // called by the scheduler at intervals to register any unregistered notifiers
+ sendRegistrationRequests();
+
+ } else if (message instanceof RegisterRoleChangeListenerReply) {
+ // called by the Notifier
+ handleRegisterRoleChangeListenerReply(getSender().path().toString());
+
+ } else if (message instanceof RoleChangeNotification) {
+ // called by the Notifier
+ RoleChangeNotification notification = (RoleChangeNotification) message;
+
+ LOG.info("Role Change Notification received for member:{}, old role:{}, new role:{}",
+ notification.getMemberId(), notification.getOldRole(), notification.getNewRole());
+
+ // the apps dependent on such notifications can be called here
+ //TODO: add implementation here
+
+ }
+ }
+
+ private void scheduleRegistrationListener(FiniteDuration interval) {
+ LOG.debug("--->scheduleRegistrationListener called.");
+ registrationSchedule = getContext().system().scheduler().schedule(
+ interval, interval, getSelf(), new RegisterListener(),
+ getContext().system().dispatcher(), getSelf());
+
+ }
+
+ private void populateRegistry(String memberName) {
+
+ for (String shard: shardsToMonitor) {
+ String notifier =(new StringBuilder()).append(NOTIFIER_AKKA_URL).append(memberName)
+ .append("/").append(memberName).append("-notifier").toString();
+
+ if (!notifierRegistrationStatus.containsKey(notifier)) {
+ notifierRegistrationStatus.put(notifier, false);
+ }
+ }
+
+ if (!registrationSchedule.isCancelled()) {
+ scheduleRegistrationListener(schedulerDuration);
+ }
+ }
+
+ private void sendRegistrationRequests() {
+ for (Map.Entry<String, Boolean> entry : notifierRegistrationStatus.entrySet()) {
+ if (!entry.getValue()) {
+ try {
+ LOG.debug("{} registering with {}", getSelf().path().toString(), entry.getKey());
+ ActorRef notifier = Await.result(
+ getContext().actorSelection(entry.getKey()).resolveOne(duration), duration);
+
+ notifier.tell(new RegisterRoleChangeListener(), getSelf());
+
+ } catch (Exception e) {
+ LOG.error("ERROR!! Unable to send registration request to notifier {}", entry.getKey());
+ }
+ }
+ }
+ }
+
+ private void handleRegisterRoleChangeListenerReply(String senderId) {
+ if (notifierRegistrationStatus.containsKey(senderId)) {
+ notifierRegistrationStatus.put(senderId, true);
+
+ //cancel the schedule when listener is registered with all notifiers
+ if (!registrationSchedule.isCancelled()) {
+ boolean cancelScheduler = true;
+ for (Boolean value : notifierRegistrationStatus.values()) {
+ cancelScheduler = cancelScheduler & value;
+ }
+ if (cancelScheduler) {
+ registrationSchedule.cancel();
+ }
+ }
+ } else {
+ LOG.info("Unexpected, RegisterRoleChangeListenerReply received from notifier which is not known to Listener:{}",
+ senderId);
+ }
+ }
+
+
+ @Override
+ public void close() throws Exception {
+ registrationSchedule.cancel();
+ }
+}
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import com.google.common.base.Optional;
-import org.opendaylight.controller.cluster.example.messages.PrintRole;
-import org.opendaylight.controller.cluster.example.messages.PrintState;
-import org.opendaylight.controller.cluster.raft.ConfigParams;
-
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigFactory;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.opendaylight.controller.cluster.example.messages.PrintRole;
+import org.opendaylight.controller.cluster.example.messages.PrintState;
+import org.opendaylight.controller.cluster.raft.ConfigParams;
/**
* This is a test driver for testing akka-raft implementation
*/
public class TestDriver {
- private static final ActorSystem actorSystem = ActorSystem.create();
+
private static Map<String, String> allPeers = new HashMap<>();
private static Map<String, ActorRef> clientActorRefs = new HashMap<String, ActorRef>();
private static Map<String, ActorRef> actorRefs = new HashMap<String, ActorRef>();
private int nameCounter = 0;
private static ConfigParams configParams = new ExampleConfigParamsImpl();
+ private static ActorSystem actorSystem;
+ private static ActorSystem listenerActorSystem;
+
/**
* Create nodes, add clients and start logging.
* Commands
* @throws Exception
*/
public static void main(String[] args) throws Exception {
+
+ actorSystem = ActorSystem.create("raft-test", ConfigFactory
+ .load().getConfig("raft-test"));
+
+ listenerActorSystem = ActorSystem.create("raft-test-listener", ConfigFactory
+ .load().getConfig("raft-test-listener"));
+
TestDriver td = new TestDriver();
System.out.println("Enter command (type bye to exit):");
}
}
+ // create the listener using a separate actor system for each example actor
+ private void createClusterRoleChangeListener(List<String> memberIds) {
+ System.out.println("memberIds="+memberIds);
+ for (String memberId : memberIds) {
+ ActorRef listenerActor = listenerActorSystem.actorOf(
+ ExampleRoleChangeListener.getProps(memberId), memberId + "-role-change-listener");
+ System.out.println("Role Change Listener created:" + listenerActor.path().toString());
+ }
+ }
+
public static ActorRef createExampleActor(String name) {
return actorSystem.actorOf(ExampleActor.props(name, withoutPeer(name),
Optional.of(configParams)), name);
public void createNodes(int num) {
for (int i=0; i < num; i++) {
nameCounter = nameCounter + 1;
- allPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
+ allPeers.put("example-"+nameCounter, "akka://raft-test/user/example-"+nameCounter);
}
for (String s : allPeers.keySet()) {
System.out.println("Created node:"+s);
}
+
+ createClusterRoleChangeListener(Lists.newArrayList(allPeers.keySet()));
}
// add num clients to all nodes in the system
--- /dev/null
+package org.opendaylight.controller.cluster.example.messages;
+
+/**
+ * Message sent by the Example Role Change Listener to itself for registering itself with the notifiers
+ *
+ * This message is sent by the scheduler
+ */
+public class RegisterListener {
+}
--- /dev/null
+package org.opendaylight.controller.cluster.example.messages;
+
+import java.util.List;
+
+/**
+ * Created by kramesha on 11/18/14.
+ */
+public class SetNotifiers {
+ private List<String> notifierList;
+
+ public SetNotifiers(List<String> notifierList) {
+ this.notifierList = notifierList;
+ }
+
+ public List<String> getNotifierList() {
+ return notifierList;
+ }
+}
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import com.google.protobuf.ByteString;
+import java.io.Serializable;
+import java.util.Map;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
+import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-import java.io.Serializable;
-import java.util.Map;
-
/**
* RaftActor encapsulates a state machine that needs to be kept synchronized
* in a cluster. It implements the RAFT algorithm as described in the paper
deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue()));
onRecoveryComplete();
+
+ RaftActorBehavior oldBehavior = currentBehavior;
currentBehavior = new Follower(context);
- onStateChanged();
+ handleBehaviorChange(oldBehavior, currentBehavior);
}
}
}
replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
replicatedLog.snapshotTerm, replicatedLog.size());
+ RaftActorBehavior oldBehavior = currentBehavior;
currentBehavior = new Follower(context);
- onStateChanged();
+ handleBehaviorChange(oldBehavior, currentBehavior);
}
@Override public void handleCommand(Object message) {
RaftActorBehavior oldBehavior = currentBehavior;
currentBehavior = currentBehavior.handleMessage(getSender(), message);
- if(oldBehavior != currentBehavior){
- onStateChanged();
- }
-
- onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId());
+ handleBehaviorChange(oldBehavior, currentBehavior);
}
}
- public java.util.Set<String> getPeers() {
-
- return context.getPeerAddresses().keySet();
- }
+ private void handleBehaviorChange(RaftActorBehavior oldBehavior, RaftActorBehavior currentBehavior) {
+ if (oldBehavior != currentBehavior){
+ onStateChanged();
+ }
+ if (oldBehavior != null) {
+ // it can happen that the state has not changed but the leader has changed.
+ onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId());
- protected String getReplicatedLogState() {
- return "snapshotIndex=" + context.getReplicatedLog().getSnapshotIndex()
- + ", snapshotTerm=" + context.getReplicatedLog().getSnapshotTerm()
- + ", im-mem journal size=" + context.getReplicatedLog().size();
+ if (getRoleChangeNotifier().isPresent() && oldBehavior.state() != currentBehavior.state()) {
+ // we do not want to notify when the behavior/role is set for the first time (i.e follower)
+ getRoleChangeNotifier().get().tell(new RoleChanged(getId(), oldBehavior.state().name(),
+ currentBehavior.state().name()), getSelf());
+ }
+ }
}
-
/**
* When a derived RaftActor needs to persist something it must call
* persistData.
protected abstract DataPersistenceProvider persistence();
+ /**
+ * Notifier Actor for this RaftActor to notify when a role change happens
+ * @return ActorRef - ActorRef of the notifier or Optional.absent if none.
+ */
+ protected abstract Optional<ActorRef> getRoleChangeNotifier();
+
protected void onLeaderChanged(String oldLeader, String newLeader){};
private void trimPersistentData(long sequenceNumber) {
protected RaftActorBehavior getCurrentBehavior() {
return currentBehavior;
}
+
}
* set commitIndex = N (§5.3, §5.4).
*/
public abstract class AbstractLeader extends AbstractRaftActorBehavior {
+
+ // The index of the first chunk that is sent when installing a snapshot
+ public static final int FIRST_CHUNK_INDEX = 1;
+
+ // The index that the follower should respond with if it needs the install snapshot to be reset
+ public static final int INVALID_CHUNK_INDEX = -1;
+
+ // This would be passed as the hash code of the last chunk when sending the first chunk
+ public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
+
protected final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
"sending snapshot chunk failed, Will retry, Chunk:{}",
reply.getChunkIndex()
);
+
followerToSnapshot.markSendStatus(false);
}
" or Chunk Index in InstallSnapshotReply not matching {} != {}",
followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
);
+
+ if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
+ // Since the Follower did not find this index to be valid we should reset the follower snapshot
+ // so that Installing the snapshot can resume from the beginning
+ followerToSnapshot.reset();
+ }
}
}
context.getReplicatedLog().getSnapshotTerm(),
getNextSnapshotChunk(followerId,snapshot.get()),
mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
- mapFollowerToSnapshot.get(followerId).getTotalChunks()
+ mapFollowerToSnapshot.get(followerId).getTotalChunks(),
+ Optional.of(mapFollowerToSnapshot.get(followerId).getLastChunkHashCode())
).toSerializable(),
actor()
);
private boolean replyStatus = false;
private int chunkIndex;
private int totalChunks;
+ private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
+ private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
public FollowerToSnapshot(ByteString snapshotBytes) {
this.snapshotBytes = snapshotBytes;
- replyReceivedForOffset = -1;
- chunkIndex = 1;
int size = snapshotBytes.size();
totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
LOG.debug("Snapshot {} bytes, total chunks to send:{}",
size, totalChunks);
}
+ replyReceivedForOffset = -1;
+ chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
}
public ByteString getSnapshotBytes() {
// if the chunk sent was successful
replyReceivedForOffset = offset;
replyStatus = true;
+ lastChunkHashCode = nextChunkHashCode;
} else {
// if the chunk sent was failure
replyReceivedForOffset = offset;
LOG.debug("length={}, offset={},size={}",
snapshotLength, start, size);
}
- return getSnapshotBytes().substring(start, start + size);
+ ByteString substring = getSnapshotBytes().substring(start, start + size);
+ nextChunkHashCode = substring.hashCode();
+ return substring;
+ }
+
+ /**
+ * reset should be called when the Follower needs to be sent the snapshot from the beginning
+ */
+ public void reset(){
+ offset = 0;
+ replyStatus = false;
+ replyReceivedForOffset = offset;
+ chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
+ lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
+ }
+ public int getLastChunkHashCode() {
+ return lastChunkHashCode;
}
}
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
+import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
+import java.util.ArrayList;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
-import java.util.ArrayList;
-
/**
* The behavior of a RaftActor in the Follower state
* <p/>
* </ul>
*/
public class Follower extends AbstractRaftActorBehavior {
- private ByteString snapshotChunksCollected = ByteString.EMPTY;
+
+ private SnapshotTracker snapshotTracker = null;
public Follower(RaftActorContext context) {
super(context);
);
}
- try {
- if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) {
- // this is the last chunk, create a snapshot object and apply
-
- snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
- if(LOG.isDebugEnabled()) {
- LOG.debug("Last chunk received: snapshotChunksCollected.size:{}",
- snapshotChunksCollected.size());
- }
+ if(snapshotTracker == null){
+ snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks());
+ }
- Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(),
- new ArrayList<ReplicatedLogEntry>(),
- installSnapshot.getLastIncludedIndex(),
- installSnapshot.getLastIncludedTerm(),
- installSnapshot.getLastIncludedIndex(),
- installSnapshot.getLastIncludedTerm());
+ try {
+ if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
+ installSnapshot.getLastChunkHashCode())){
+ Snapshot snapshot = Snapshot.create(snapshotTracker.getSnapshot(),
+ new ArrayList<ReplicatedLogEntry>(),
+ installSnapshot.getLastIncludedIndex(),
+ installSnapshot.getLastIncludedTerm(),
+ installSnapshot.getLastIncludedIndex(),
+ installSnapshot.getLastIncludedTerm());
actor().tell(new ApplySnapshot(snapshot), actor());
- } else {
- // we have more to go
- snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
+ snapshotTracker = null;
- if(LOG.isDebugEnabled()) {
- LOG.debug("Chunk={},snapshotChunksCollected.size:{}",
- installSnapshot.getChunkIndex(), snapshotChunksCollected.size());
- }
}
sender.tell(new InstallSnapshotReply(
- currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
- true), actor());
+ currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
+ true), actor());
+
+ } catch (SnapshotTracker.InvalidChunkException e) {
+
+ sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
+ -1, false), actor());
+ snapshotTracker = null;
+
+ } catch (Exception e){
- } catch (Exception e) {
LOG.error(e, "Exception in InstallSnapshot of follower:");
//send reply with success as false. The chunk will be sent again on failure
sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
- installSnapshot.getChunkIndex(), false), actor());
+ installSnapshot.getChunkIndex(), false), actor());
+
}
}
@Override public void close() throws Exception {
stopElection();
}
+
+ @VisibleForTesting
+ ByteString getSnapshotChunksCollected(){
+ return snapshotTracker != null ? snapshotTracker.getCollectedChunks() : ByteString.EMPTY;
+ }
+
+
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.event.LoggingAdapter;
+import com.google.common.base.Optional;
+import com.google.protobuf.ByteString;
+
+/**
+ * SnapshotTracker does house keeping for a snapshot that is being installed in chunks on the Follower
+ */
+public class SnapshotTracker {
+ private final LoggingAdapter LOG;
+ private final int totalChunks;
+ private ByteString collectedChunks = ByteString.EMPTY;
+ private int lastChunkIndex = AbstractLeader.FIRST_CHUNK_INDEX - 1;
+ private boolean sealed = false;
+ private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
+
+ SnapshotTracker(LoggingAdapter LOG, int totalChunks){
+ this.LOG = LOG;
+ this.totalChunks = totalChunks;
+ }
+
+ /**
+ * Adds a chunk to the tracker
+ *
+ * @param chunkIndex
+ * @param chunk
+ * @return true when the lastChunk is received
+ * @throws InvalidChunkException
+ */
+ boolean addChunk(int chunkIndex, ByteString chunk, Optional<Integer> lastChunkHashCode) throws InvalidChunkException{
+ if(sealed){
+ throw new InvalidChunkException("Invalid chunk received with chunkIndex " + chunkIndex + " all chunks already received");
+ }
+
+ if(lastChunkIndex + 1 != chunkIndex){
+ throw new InvalidChunkException("Expected chunkIndex " + (lastChunkIndex + 1) + " got " + chunkIndex);
+ }
+
+ if(lastChunkHashCode.isPresent()){
+ if(lastChunkHashCode.get() != this.lastChunkHashCode){
+ throw new InvalidChunkException("The hash code of the recorded last chunk does not match " +
+ "the senders hash code expected " + lastChunkHashCode + " was " + lastChunkHashCode.get());
+ }
+ }
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Chunk={},collectedChunks.size:{}",
+ chunkIndex, collectedChunks.size());
+ }
+
+ sealed = (chunkIndex == totalChunks);
+ lastChunkIndex = chunkIndex;
+ collectedChunks = collectedChunks.concat(chunk);
+ this.lastChunkHashCode = chunk.hashCode();
+ return sealed;
+ }
+
+ byte[] getSnapshot(){
+ if(!sealed) {
+ throw new IllegalStateException("lastChunk not received yet");
+ }
+
+ return collectedChunks.toByteArray();
+ }
+
+ ByteString getCollectedChunks(){
+ return collectedChunks;
+ }
+
+ public static class InvalidChunkException extends Exception {
+ InvalidChunkException(String message){
+ super(message);
+ }
+ }
+
+}
package org.opendaylight.controller.cluster.raft.messages;
+import com.google.common.base.Optional;
import com.google.protobuf.ByteString;
import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
private final ByteString data;
private final int chunkIndex;
private final int totalChunks;
+ private final Optional<Integer> lastChunkHashCode;
public InstallSnapshot(long term, String leaderId, long lastIncludedIndex,
- long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks) {
+ long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks, Optional<Integer> lastChunkHashCode) {
super(term);
this.leaderId = leaderId;
this.lastIncludedIndex = lastIncludedIndex;
this.data = data;
this.chunkIndex = chunkIndex;
this.totalChunks = totalChunks;
+ this.lastChunkHashCode = lastChunkHashCode;
}
+ public InstallSnapshot(long term, String leaderId, long lastIncludedIndex,
+ long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks) {
+ this(term, leaderId, lastIncludedIndex, lastIncludedTerm, data, chunkIndex, totalChunks, Optional.<Integer>absent());
+ }
+
+
public String getLeaderId() {
return leaderId;
}
return totalChunks;
}
- public <T extends Object> Object toSerializable(){
- return InstallSnapshotMessages.InstallSnapshot.newBuilder()
- .setLeaderId(this.getLeaderId())
- .setChunkIndex(this.getChunkIndex())
- .setData(this.getData())
- .setLastIncludedIndex(this.getLastIncludedIndex())
- .setLastIncludedTerm(this.getLastIncludedTerm())
- .setTotalChunks(this.getTotalChunks()).build();
+ public Optional<Integer> getLastChunkHashCode() {
+ return lastChunkHashCode;
+ }
+ public <T extends Object> Object toSerializable(){
+ InstallSnapshotMessages.InstallSnapshot.Builder builder = InstallSnapshotMessages.InstallSnapshot.newBuilder()
+ .setLeaderId(this.getLeaderId())
+ .setChunkIndex(this.getChunkIndex())
+ .setData(this.getData())
+ .setLastIncludedIndex(this.getLastIncludedIndex())
+ .setLastIncludedTerm(this.getLastIncludedTerm())
+ .setTotalChunks(this.getTotalChunks());
+
+ if(lastChunkHashCode.isPresent()){
+ builder.setLastChunkHashCode(lastChunkHashCode.get());
+ }
+ return builder.build();
}
public static InstallSnapshot fromSerializable (Object o) {
InstallSnapshotMessages.InstallSnapshot from =
(InstallSnapshotMessages.InstallSnapshot) o;
+ Optional<Integer> lastChunkHashCode = Optional.absent();
+ if(from.hasLastChunkHashCode()){
+ lastChunkHashCode = Optional.of(from.getLastChunkHashCode());
+ }
+
InstallSnapshot installSnapshot = new InstallSnapshot(from.getTerm(),
from.getLeaderId(), from.getLastIncludedIndex(),
from.getLastIncludedTerm(), from.getData(),
- from.getChunkIndex(), from.getTotalChunks());
+ from.getChunkIndex(), from.getTotalChunks(), lastChunkHashCode);
return installSnapshot;
}
serialization-bindings {
"org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry" = java
+ "org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener" = java
"com.google.protobuf.Message" = proto
"com.google.protobuf.GeneratedMessage" = proto
}
}
}
+
+raft-test {
+ akka {
+
+ loglevel = "DEBUG"
+
+ actor {
+ # enable to test serialization only.
+ # serialize-messages = on
+
+ provider = "akka.remote.RemoteActorRefProvider"
+
+ serializers {
+ java = "akka.serialization.JavaSerializer"
+ proto = "akka.remote.serialization.ProtobufSerializer"
+ }
+
+ serialization-bindings {
+ "org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry" = java
+ "org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener" = java
+ "com.google.protobuf.Message" = proto
+ "com.google.protobuf.GeneratedMessage" = proto
+ }
+ }
+
+ remote {
+ log-remote-lifecycle-events = off
+ netty.tcp {
+ hostname = "127.0.0.1"
+ port = 2550
+ }
+ }
+ }
+}
+
+raft-test-listener {
+
+ akka {
+ loglevel = "DEBUG"
+
+ actor {
+ provider = "akka.remote.RemoteActorRefProvider"
+ }
+
+ remote {
+ log-remote-lifecycle-events = off
+ netty.tcp {
+ hostname = "127.0.0.1"
+ port = 2554
+ }
+ }
+
+ member-id = "member-1"
+ }
+}
+
+
+
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
+import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
private final DataPersistenceProvider dataPersistenceProvider;
private final RaftActor delegate;
+ private final CountDownLatch recoveryComplete = new CountDownLatch(1);
+ private final List<Object> state;
+ private ActorRef roleChangeNotifier;
public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
private static final long serialVersionUID = 1L;
private final String id;
private final Optional<ConfigParams> config;
private final DataPersistenceProvider dataPersistenceProvider;
+ private final ActorRef roleChangeNotifier;
private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
- Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
+ Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
+ ActorRef roleChangeNotifier) {
this.peerAddresses = peerAddresses;
this.id = id;
this.config = config;
this.dataPersistenceProvider = dataPersistenceProvider;
+ this.roleChangeNotifier = roleChangeNotifier;
}
@Override
public MockRaftActor create() throws Exception {
- return new MockRaftActor(id, peerAddresses, config, dataPersistenceProvider);
+ MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
+ dataPersistenceProvider);
+ mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
+ return mockRaftActor;
}
}
- private final CountDownLatch recoveryComplete = new CountDownLatch(1);
-
- private final List<Object> state;
-
public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
super(id, peerAddresses, config);
state = new ArrayList<>();
public static Props props(final String id, final Map<String, String> peerAddresses,
Optional<ConfigParams> config){
- return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null));
+ return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
}
public static Props props(final String id, final Map<String, String> peerAddresses,
Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
- return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider));
+ return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
}
+ public static Props props(final String id, final Map<String, String> peerAddresses,
+ Optional<ConfigParams> config, ActorRef roleChangeNotifier){
+ return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
+ }
@Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
delegate.applyState(clientActor, identifier, data);
LOG.info("applyState called");
}
-
-
-
@Override
protected void startLogRecoveryBatch(int maxBatchSize) {
}
return this.dataPersistenceProvider;
}
+ @Override
+ protected Optional<ActorRef> getRoleChangeNotifier() {
+ return Optional.fromNullable(roleChangeNotifier);
+ }
+
@Override public String persistenceId() {
return this.getId();
}
};
}
+ @Test
+ public void testRaftRoleChangeNotifier() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ ActorRef notifierActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ String id = "testRaftRoleChangeNotifier";
+
+ TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(id,
+ Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), id);
+
+ MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.setCurrentBehavior(new Follower(mockRaftActor.getRaftActorContext()));
+
+ // sleeping for a minimum of 2 seconds, if it spans more its fine.
+ Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+
+ List<Object> matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
+ assertNotNull(matches);
+ assertEquals(2, matches.size());
+
+ // check if the notifier got a role change from Follower to Candidate
+ RoleChanged raftRoleChanged = (RoleChanged) matches.get(0);
+ assertEquals(id, raftRoleChanged.getMemberId());
+ assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
+ assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
+
+ // check if the notifier got a role change from Candidate to Leader
+ raftRoleChanged = (RoleChanged) matches.get(1);
+ assertEquals(id, raftRoleChanged.getMemberId());
+ assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
+ assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
+ }};
+ }
+
private ByteString fromObject(Object snapshot) throws Exception {
ByteArrayOutputStream b = null;
ObjectOutputStream o = null;
package org.opendaylight.controller.cluster.raft.behaviors;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import com.google.protobuf.ByteString;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
public class FollowerTest extends AbstractRaftActorBehaviorTest {
private final ActorRef followerActor = getSystem().actorOf(Props.create(
int offset = 0;
int snapshotLength = bsSnapshot.size();
int i = 1;
+ int chunkIndex = 1;
do {
chunkData = getNextChunk(bsSnapshot, offset);
final InstallSnapshot installSnapshot =
new InstallSnapshot(1, "leader-1", i, 1,
- chunkData, i, 3);
+ chunkData, chunkIndex, 3);
follower.handleMessage(leaderActor, installSnapshot);
offset = offset + 50;
i++;
+ chunkIndex++;
} while ((offset+50) < snapshotLength);
- final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, 3, 3);
+ final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, chunkIndex, 3);
follower.handleMessage(leaderActor, installSnapshot3);
String[] matches = new ReceiveWhile<String>(String.class, duration("2 seconds")) {
}
}.get();
+ // Verify that after a snapshot is successfully applied the collected snapshot chunks is reset to empty
+ assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected());
+
String applySnapshotMatch = "";
for (String reply: matches) {
if (reply.startsWith("applySnapshot")) {
}};
}
+ @Test
+ public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
+ JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {
+ {
+
+ ActorRef leaderActor = getSystem().actorOf(Props.create(
+ MessageCollectorActor.class));
+
+ MockRaftActorContext context = (MockRaftActorContext)
+ createActorContext(getRef());
+
+ Follower follower = (Follower) createBehavior(context);
+
+ HashMap<String, String> followerSnapshot = new HashMap<>();
+ followerSnapshot.put("1", "A");
+ followerSnapshot.put("2", "B");
+ followerSnapshot.put("3", "C");
+
+ ByteString bsSnapshot = toByteString(followerSnapshot);
+
+ final InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader-1", 3, 1, getNextChunk(bsSnapshot, 10), 3, 3);
+ follower.handleMessage(leaderActor, installSnapshot);
+
+ Object messages = executeLocalOperation(leaderActor, "get-all-messages");
+
+ assertNotNull(messages);
+ assertTrue(messages instanceof List);
+ List<Object> listMessages = (List<Object>) messages;
+
+ int installSnapshotReplyReceivedCount = 0;
+ for (Object message: listMessages) {
+ if (message instanceof InstallSnapshotReply) {
+ ++installSnapshotReplyReceivedCount;
+ }
+ }
+
+ assertEquals(1, installSnapshotReplyReceivedCount);
+ InstallSnapshotReply reply = (InstallSnapshotReply) listMessages.get(0);
+ assertEquals(false, reply.isSuccess());
+ assertEquals(-1, reply.getChunkIndex());
+ assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected());
+
+
+ }};
+ }
+
public Object executeLocalOperation(ActorRef actor, Object message) throws Exception {
return MessageCollectorActor.getAllMessages(actor);
}
package org.opendaylight.controller.cluster.raft.behaviors;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
import scala.concurrent.duration.FiniteDuration;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
public class LeaderTest extends AbstractRaftActorBehaviorTest {
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
+ followerActor.path().toString());
actorContext.setPeerAddresses(peerAddresses);
}};
}
+ @Test
+ public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ TestActorRef<MessageCollectorActor> followerActor =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext();
+
+ actorContext.setConfigParams(new DefaultConfigParamsImpl(){
+ @Override
+ public int getSnapshotChunkSize() {
+ return 50;
+ }
+ });
+ actorContext.setPeerAddresses(peerAddresses);
+ actorContext.setCommitIndex(followersLastIndex);
+
+ MockLeader leader = new MockLeader(actorContext);
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+ ByteString bs = toByteString(leadersSnapshot);
+ leader.setSnapshot(Optional.of(bs));
+
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+ Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
+
+ assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+ InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+ assertEquals(1, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+
+
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false));
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ o = MessageCollectorActor.getAllMessages(followerActor).get(1);
+
+ assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+ installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+ assertEquals(1, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+
+ followerActor.tell(PoisonPill.getInstance(), getRef());
+ }};
+ }
+
+ @Test
+ public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+
+ TestActorRef<MessageCollectorActor> followerActor =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext();
+
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public int getSnapshotChunkSize() {
+ return 50;
+ }
+ });
+ actorContext.setPeerAddresses(peerAddresses);
+ actorContext.setCommitIndex(followersLastIndex);
+
+ MockLeader leader = new MockLeader(actorContext);
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+ ByteString bs = toByteString(leadersSnapshot);
+ leader.setSnapshot(Optional.of(bs));
+
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+ Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
+
+ assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+ InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+ assertEquals(1, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+ assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
+
+ int hashCode = installSnapshot.getData().hashCode();
+
+ leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+
+ o = MessageCollectorActor.getAllMessages(followerActor).get(1);
+
+ assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+ installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+ assertEquals(2, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+ assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
+
+ followerActor.tell(PoisonPill.getInstance(), getRef());
+ }};
+ }
+
@Test
public void testFollowerToSnapshotLogic() {
--- /dev/null
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import akka.event.LoggingAdapter;
+import com.google.common.base.Optional;
+import com.google.protobuf.ByteString;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SnapshotTrackerTest {
+
+ Map<String, String> data;
+ ByteString byteString;
+ ByteString chunk1;
+ ByteString chunk2;
+ ByteString chunk3;
+
+ @Before
+ public void setup(){
+ data = new HashMap<>();
+ data.put("key1", "value1");
+ data.put("key2", "value2");
+ data.put("key3", "value3");
+
+ byteString = toByteString(data);
+ chunk1 = getNextChunk(byteString, 0, 10);
+ chunk2 = getNextChunk(byteString, 10, 10);
+ chunk3 = getNextChunk(byteString, 20, byteString.size());
+ }
+
+ @Test
+ public void testAddChunk() throws SnapshotTracker.InvalidChunkException {
+ SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5);
+
+ tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
+ tracker1.addChunk(2, chunk2, Optional.<Integer>absent());
+ tracker1.addChunk(3, chunk3, Optional.<Integer>absent());
+
+ // Verify that an InvalidChunkException is thrown when we try to add a chunk to a sealed tracker
+ SnapshotTracker tracker2 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+ tracker2.addChunk(1, chunk1, Optional.<Integer>absent());
+ tracker2.addChunk(2, chunk2, Optional.<Integer>absent());
+
+ try {
+ tracker2.addChunk(3, chunk3, Optional.<Integer>absent());
+ Assert.fail();
+ } catch(SnapshotTracker.InvalidChunkException e){
+ e.getMessage().startsWith("Invalid chunk");
+ }
+
+ // The first chunk's index must at least be FIRST_CHUNK_INDEX
+ SnapshotTracker tracker3 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+ try {
+ tracker3.addChunk(AbstractLeader.FIRST_CHUNK_INDEX - 1, chunk1, Optional.<Integer>absent());
+ Assert.fail();
+ } catch(SnapshotTracker.InvalidChunkException e){
+
+ }
+
+ // Out of sequence chunk indexes won't work
+ SnapshotTracker tracker4 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+ tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
+
+ try {
+ tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX+2, chunk2, Optional.<Integer>absent());
+ Assert.fail();
+ } catch(SnapshotTracker.InvalidChunkException e){
+
+ }
+
+ // No exceptions will be thrown when invalid chunk is added with the right sequence
+ // If the lastChunkHashCode is missing
+ SnapshotTracker tracker5 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+ tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
+ // Look I can add the same chunk again
+ tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX + 1, chunk1, Optional.<Integer>absent());
+
+ // An exception will be thrown when an invalid chunk is addedd with the right sequence
+ // when the lastChunkHashCode is present
+ SnapshotTracker tracker6 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+ tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.of(-1));
+
+ try {
+ // Here we add a second chunk and tell addChunk that the previous chunk had a hash code 777
+ tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX + 1, chunk2, Optional.of(777));
+ Assert.fail();
+ }catch(SnapshotTracker.InvalidChunkException e){
+
+ }
+
+ }
+
+ @Test
+ public void testGetSnapShot() throws SnapshotTracker.InvalidChunkException {
+
+ // Trying to get a snapshot before all chunks have been received will throw an exception
+ SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5);
+
+ tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
+ try {
+ tracker1.getSnapshot();
+ Assert.fail();
+ } catch(IllegalStateException e){
+
+ }
+
+ SnapshotTracker tracker2 = new SnapshotTracker(mock(LoggingAdapter.class), 3);
+
+ tracker2.addChunk(1, chunk1, Optional.<Integer>absent());
+ tracker2.addChunk(2, chunk2, Optional.<Integer>absent());
+ tracker2.addChunk(3, chunk3, Optional.<Integer>absent());
+
+ byte[] snapshot = tracker2.getSnapshot();
+
+ assertEquals(byteString, ByteString.copyFrom(snapshot));
+ }
+
+ @Test
+ public void testGetCollectedChunks() throws SnapshotTracker.InvalidChunkException {
+ SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5);
+
+ ByteString chunks = chunk1.concat(chunk2);
+
+ tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
+ tracker1.addChunk(2, chunk2, Optional.<Integer>absent());
+
+ assertEquals(chunks, tracker1.getCollectedChunks());
+ }
+
+ public ByteString getNextChunk (ByteString bs, int offset, int size){
+ int snapshotLength = bs.size();
+ int start = offset;
+ if (size > snapshotLength) {
+ size = snapshotLength;
+ } else {
+ if ((start + size) > snapshotLength) {
+ size = snapshotLength - start;
+ }
+ }
+ return bs.substring(start, start + size);
+ }
+
+ private ByteString toByteString(Map<String, String> state) {
+ ByteArrayOutputStream b = null;
+ ObjectOutputStream o = null;
+ try {
+ try {
+ b = new ByteArrayOutputStream();
+ o = new ObjectOutputStream(b);
+ o.writeObject(state);
+ byte[] snapshotBytes = b.toByteArray();
+ return ByteString.copyFrom(snapshotBytes);
+ } finally {
+ if (o != null) {
+ o.flush();
+ o.close();
+ }
+ if (b != null) {
+ b.close();
+ }
+ }
+ } catch (IOException e) {
+ org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
+ }
+ return null;
+ }
+
+
+}
\ No newline at end of file
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
public class MessageCollectorActor extends UntypedActor {
private List<Object> messages = new ArrayList<>();
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.notifications;
+
+import java.io.Serializable;
+
+/**
+ * Message sent from the listener of Role Change messages to register itself to the Role Change Notifier
+ *
+ * The Listener could be in a separate ActorSystem and hence this message needs to be Serializable
+ */
+public class RegisterRoleChangeListener implements Serializable {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.notifications;
+
+import java.io.Serializable;
+
+/**
+ * Reply message sent from a RoleChangeNotifier to the Role Change Listener
+ *
+ * Can be sent to a separate actor system and hence should be made serializable.
+ */
+public class RegisterRoleChangeListenerReply implements Serializable {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.notifications;
+
+import java.io.Serializable;
+
+/**
+ * Notification message representing a Role change of a cluster member
+ *
+ * Roles generally are Leader, Follower and Candidate. But can be based on the consensus strategy/implementation
+ *
+ * The Listener could be in a separate ActorSystem and hence this message needs to be Serializable
+ */
+public class RoleChangeNotification implements Serializable {
+ private String memberId;
+ private String oldRole;
+ private String newRole;
+
+ public RoleChangeNotification(String memberId, String oldRole, String newRole) {
+ this.memberId = memberId;
+ this.oldRole = oldRole;
+ this.newRole = newRole;
+ }
+
+ public String getMemberId() {
+ return memberId;
+ }
+
+ public String getOldRole() {
+ return oldRole;
+ }
+
+ public String getNewRole() {
+ return newRole;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.notifications;
+
+import akka.actor.Actor;
+import akka.actor.ActorPath;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.japi.Creator;
+import akka.serialization.Serialization;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+
+/**
+ * The RoleChangeNotifier is responsible for receiving Raft role change messages and notifying
+ * the listeners (within the same node), which are registered with it.
+ * <p/>
+ * The RoleChangeNotifier is instantiated by the Shard and injected into the RaftActor.
+ */
+public class RoleChangeNotifier extends AbstractUntypedActor implements AutoCloseable {
+
+ private String memberId;
+ private Map<ActorPath, ActorRef> registeredListeners = Maps.newHashMap();
+ private RoleChangeNotification latestRoleChangeNotification = null;
+
+ public RoleChangeNotifier(String memberId) {
+ this.memberId = memberId;
+ }
+
+ public static Props getProps(final String memberId) {
+ return Props.create(new Creator<Actor>() {
+ @Override
+ public Actor create() throws Exception {
+ return new RoleChangeNotifier(memberId);
+ }
+ });
+ }
+
+ @Override
+ public void preStart() throws Exception {
+ super.preStart();
+ LOG.info("RoleChangeNotifier:{} created and ready for shard:{}",
+ Serialization.serializedActorPath(getSelf()), memberId);
+ }
+
+ @Override
+ protected void handleReceive(Object message) throws Exception {
+ if (message instanceof RegisterRoleChangeListener) {
+ // register listeners for this shard
+
+ ActorRef curRef = registeredListeners.get(getSender().path());
+ if (curRef != null) {
+ // ActorPaths would pass equal even if the unique id of the actors are different
+ // if a listener actor is re-registering after reincarnation, then removing the existing
+ // entry so the actor path with correct unique id is registered.
+ registeredListeners.remove(getSender().path());
+ }
+ registeredListeners.put(getSender().path(), getSender());
+
+ LOG.info("RoleChangeNotifier for {} , registered listener {}", memberId,
+ getSender().path().toString());
+
+ getSender().tell(new RegisterRoleChangeListenerReply(), getSelf());
+
+ if (latestRoleChangeNotification != null) {
+ getSender().tell(latestRoleChangeNotification, getSelf());
+ }
+
+
+ } else if (message instanceof RoleChanged) {
+ // this message is sent by RaftActor. Notify registered listeners when this message is received.
+ RoleChanged roleChanged = (RoleChanged) message;
+
+ LOG.info("RoleChangeNotifier for {} , received role change from {} to {}", memberId,
+ roleChanged.getOldRole(), roleChanged.getNewRole());
+
+ latestRoleChangeNotification =
+ new RoleChangeNotification(roleChanged.getMemberId(),
+ roleChanged.getOldRole(), roleChanged.getNewRole());
+
+ for (ActorRef listener: registeredListeners.values()) {
+ listener.tell(latestRoleChangeNotification, getSelf());
+ }
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ registeredListeners.clear();
+ }
+}
+
--- /dev/null
+package org.opendaylight.controller.cluster.notifications;
+
+/**
+ * Role Change message initiated internally from the Raft Actor when a the behavior/role changes.
+ *
+ * Since its internal , need not be serialized
+ *
+ */
+public class RoleChanged {
+ private String memberId;
+ private String oldRole;
+ private String newRole;
+
+ public RoleChanged(String memberId, String oldRole, String newRole) {
+ this.memberId = memberId;
+ this.oldRole = oldRole;
+ this.newRole = newRole;
+ }
+
+ public String getMemberId() {
+ return memberId;
+ }
+
+ public String getOldRole() {
+ return oldRole;
+ }
+
+ public String getNewRole() {
+ return newRole;
+ }
+}
* <code>optional int32 totalChunks = 7;</code>
*/
int getTotalChunks();
+
+ // optional int32 lastChunkHashCode = 8;
+ /**
+ * <code>optional int32 lastChunkHashCode = 8;</code>
+ */
+ boolean hasLastChunkHashCode();
+ /**
+ * <code>optional int32 lastChunkHashCode = 8;</code>
+ */
+ int getLastChunkHashCode();
}
/**
* Protobuf type {@code org.opendaylight.controller.cluster.raft.InstallSnapshot}
totalChunks_ = input.readInt32();
break;
}
+ case 64: {
+ bitField0_ |= 0x00000080;
+ lastChunkHashCode_ = input.readInt32();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
return totalChunks_;
}
+ // optional int32 lastChunkHashCode = 8;
+ public static final int LASTCHUNKHASHCODE_FIELD_NUMBER = 8;
+ private int lastChunkHashCode_;
+ /**
+ * <code>optional int32 lastChunkHashCode = 8;</code>
+ */
+ public boolean hasLastChunkHashCode() {
+ return ((bitField0_ & 0x00000080) == 0x00000080);
+ }
+ /**
+ * <code>optional int32 lastChunkHashCode = 8;</code>
+ */
+ public int getLastChunkHashCode() {
+ return lastChunkHashCode_;
+ }
+
private void initFields() {
term_ = 0L;
leaderId_ = "";
data_ = com.google.protobuf.ByteString.EMPTY;
chunkIndex_ = 0;
totalChunks_ = 0;
+ lastChunkHashCode_ = 0;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
if (((bitField0_ & 0x00000040) == 0x00000040)) {
output.writeInt32(7, totalChunks_);
}
+ if (((bitField0_ & 0x00000080) == 0x00000080)) {
+ output.writeInt32(8, lastChunkHashCode_);
+ }
getUnknownFields().writeTo(output);
}
size += com.google.protobuf.CodedOutputStream
.computeInt32Size(7, totalChunks_);
}
+ if (((bitField0_ & 0x00000080) == 0x00000080)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt32Size(8, lastChunkHashCode_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
bitField0_ = (bitField0_ & ~0x00000020);
totalChunks_ = 0;
bitField0_ = (bitField0_ & ~0x00000040);
+ lastChunkHashCode_ = 0;
+ bitField0_ = (bitField0_ & ~0x00000080);
return this;
}
to_bitField0_ |= 0x00000040;
}
result.totalChunks_ = totalChunks_;
+ if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
+ to_bitField0_ |= 0x00000080;
+ }
+ result.lastChunkHashCode_ = lastChunkHashCode_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
if (other.hasTotalChunks()) {
setTotalChunks(other.getTotalChunks());
}
+ if (other.hasLastChunkHashCode()) {
+ setLastChunkHashCode(other.getLastChunkHashCode());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
return this;
}
+ // optional int32 lastChunkHashCode = 8;
+ private int lastChunkHashCode_ ;
+ /**
+ * <code>optional int32 lastChunkHashCode = 8;</code>
+ */
+ public boolean hasLastChunkHashCode() {
+ return ((bitField0_ & 0x00000080) == 0x00000080);
+ }
+ /**
+ * <code>optional int32 lastChunkHashCode = 8;</code>
+ */
+ public int getLastChunkHashCode() {
+ return lastChunkHashCode_;
+ }
+ /**
+ * <code>optional int32 lastChunkHashCode = 8;</code>
+ */
+ public Builder setLastChunkHashCode(int value) {
+ bitField0_ |= 0x00000080;
+ lastChunkHashCode_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional int32 lastChunkHashCode = 8;</code>
+ */
+ public Builder clearLastChunkHashCode() {
+ bitField0_ = (bitField0_ & ~0x00000080);
+ lastChunkHashCode_ = 0;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:org.opendaylight.controller.cluster.raft.InstallSnapshot)
}
static {
java.lang.String[] descriptorData = {
"\n\025InstallSnapshot.proto\022(org.opendayligh" +
- "t.controller.cluster.raft\"\235\001\n\017InstallSna" +
+ "t.controller.cluster.raft\"\270\001\n\017InstallSna" +
"pshot\022\014\n\004term\030\001 \001(\003\022\020\n\010leaderId\030\002 \001(\t\022\031\n" +
"\021lastIncludedIndex\030\003 \001(\003\022\030\n\020lastIncluded" +
"Term\030\004 \001(\003\022\014\n\004data\030\005 \001(\014\022\022\n\nchunkIndex\030\006" +
- " \001(\005\022\023\n\013totalChunks\030\007 \001(\005BX\n;org.openday" +
- "light.controller.protobuff.messages.clus" +
- "ter.raftB\027InstallSnapshotMessagesH\001"
+ " \001(\005\022\023\n\013totalChunks\030\007 \001(\005\022\031\n\021lastChunkHa" +
+ "shCode\030\010 \001(\005BX\n;org.opendaylight.control" +
+ "ler.protobuff.messages.cluster.raftB\027Ins" +
+ "tallSnapshotMessagesH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor,
- new java.lang.String[] { "Term", "LeaderId", "LastIncludedIndex", "LastIncludedTerm", "Data", "ChunkIndex", "TotalChunks", });
+ new java.lang.String[] { "Term", "LeaderId", "LastIncludedIndex", "LastIncludedTerm", "Data", "ChunkIndex", "TotalChunks", "LastChunkHashCode", });
return null;
}
};
optional bytes data = 5;
optional int32 chunkIndex = 6;
optional int32 totalChunks = 7;
+ optional int32 lastChunkHashCode = 8;
}
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
private Cancellable txCommitTimeoutCheckSchedule;
+ private Optional<ActorRef> roleChangeNotifier;
+
/**
* Coordinates persistence recovery on startup.
*/
transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+
+ // create a notifier actor for each cluster member
+ roleChangeNotifier = createRoleChangeNotifier(name.toString());
}
private static Map<String, String> mapPeerAddresses(
return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
}
+ private Optional<ActorRef> createRoleChangeNotifier(String shardId) {
+ ActorRef shardRoleChangeNotifier = this.getContext().actorOf(
+ RoleChangeNotifier.getProps(shardId), shardId + "-notifier");
+ return Optional.<ActorRef>of(shardRoleChangeNotifier);
+ }
+
@Override
public void postStop() {
super.postStop();
}
}
+ @Override
+ protected Optional<ActorRef> getRoleChangeNotifier() {
+ return roleChangeNotifier;
+ }
+
private void handleTransactionCommitTimeoutCheck() {
CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
if(cohortEntry != null) {
--- /dev/null
+package org.opendaylight.controller.cluster.datastore;
+
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
+import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class RoleChangeNotifierTest extends AbstractActorTest {
+
+ @Test
+ public void testHandleRegisterRoleChangeListener() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ String memberId = "testHandleRegisterRoleChangeListener";
+ ActorRef listenerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ TestActorRef<RoleChangeNotifier> notifierTestActorRef = TestActorRef.create(
+ getSystem(), RoleChangeNotifier.getProps(memberId), memberId);
+
+ notifierTestActorRef.tell(new RegisterRoleChangeListener(), listenerActor);
+
+ RegisterRoleChangeListenerReply reply = (RegisterRoleChangeListenerReply)
+ MessageCollectorActor.getFirstMatching(listenerActor, RegisterRoleChangeListenerReply.class);
+ assertNotNull(reply);
+
+ RoleChangeNotification notification = (RoleChangeNotification)
+ MessageCollectorActor.getFirstMatching(listenerActor, RoleChangeNotification.class);
+ assertNull(notification);
+ }};
+
+ }
+
+ @Test
+ public void testHandleRaftRoleChanged() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ String memberId = "testHandleRegisterRoleChangeListenerWithNotificationSet";
+ ActorRef listenerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ ActorRef shardActor = getTestActor();
+
+ TestActorRef<RoleChangeNotifier> notifierTestActorRef = TestActorRef.create(
+ getSystem(), RoleChangeNotifier.getProps(memberId), memberId);
+
+ RoleChangeNotifier roleChangeNotifier = notifierTestActorRef.underlyingActor();
+
+ notifierTestActorRef.tell(new RoleChanged(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), shardActor);
+
+ // no notification should be sent as listener has not yet registered
+ assertNull(MessageCollectorActor.getFirstMatching(listenerActor, RoleChangeNotification.class));
+
+ // listener registers after role has been changed, ensure we sent the latest role change after a reply
+ notifierTestActorRef.tell(new RegisterRoleChangeListener(), listenerActor);
+
+ RegisterRoleChangeListenerReply reply = (RegisterRoleChangeListenerReply)
+ MessageCollectorActor.getFirstMatching(listenerActor, RegisterRoleChangeListenerReply.class);
+ assertNotNull(reply);
+
+ RoleChangeNotification notification = (RoleChangeNotification)
+ MessageCollectorActor.getFirstMatching(listenerActor, RoleChangeNotification.class);
+ assertNotNull(notification);
+ assertEquals(RaftState.Candidate.name(), notification.getOldRole());
+ assertEquals(RaftState.Leader.name(), notification.getNewRole());
+
+ }};
+
+ }
+}
+
+
package org.opendaylight.controller.cluster.datastore.utils;
+import akka.actor.ActorRef;
import akka.actor.UntypedActor;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
/**
* MessageCollectorActor collects messages as it receives them. It can send
@Override public void onReceive(Object message) throws Exception {
if(message instanceof String){
if("messages".equals(message)){
- getSender().tell(messages, getSelf());
+ getSender().tell(new ArrayList(messages), getSelf());
}
} else {
messages.add(message);
}
}
+
+ public static List<Object> getAllMessages(ActorRef actor) throws Exception {
+ FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
+ Timeout operationTimeout = new Timeout(operationDuration);
+ Future<Object> future = Patterns.ask(actor, "messages", operationTimeout);
+
+ try {
+ return (List<Object>) Await.result(future, operationDuration);
+ } catch (Exception e) {
+ throw e;
+ }
+ }
+
+ /**
+ * Get the first message that matches the specified class
+ * @param actor
+ * @param clazz
+ * @return
+ */
+ public static Object getFirstMatching(ActorRef actor, Class<?> clazz) throws Exception {
+ List<Object> allMessages = getAllMessages(actor);
+
+ for(Object message : allMessages){
+ if(message.getClass().equals(clazz)){
+ return message;
+ }
+ }
+
+ return null;
+ }
+
+ public static List<Object> getAllMatching(ActorRef actor, Class<?> clazz) throws Exception {
+ List<Object> allMessages = getAllMessages(actor);
+
+ List<Object> output = Lists.newArrayList();
+
+ for(Object message : allMessages){
+ if(message.getClass().equals(clazz)){
+ output.add(message);
+ }
+ }
+
+ return output;
+ }
+
}