X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=d647475e4d53319e1638c0e4c03fe4dfcd58a891;hp=042b9fb56995d259d99a1559d4a3e3d5389fdba3;hb=a469dbcec569cc972df0cd57cf725a2173d2604a;hpb=0b377f44fde06a1ddcd4e3596a6602004ff0d335 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 042b9fb569..d647475e4d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -22,8 +22,11 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Stopwatch; import com.google.protobuf.ByteString; +import java.io.Serializable; +import java.util.Map; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; +import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; @@ -41,9 +44,6 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; -import java.io.Serializable; -import java.util.Map; - /** * RaftActor encapsulates a state machine that needs to be kept synchronized * in a cluster. It implements the RAFT algorithm as described in the paper @@ -169,8 +169,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue())); onRecoveryComplete(); + + RaftActorBehavior oldBehavior = currentBehavior; currentBehavior = new Follower(context); - onStateChanged(); + handleBehaviorChange(oldBehavior, currentBehavior); } } } @@ -269,8 +271,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { replicatedLog.lastIndex(), replicatedLog.snapshotIndex, replicatedLog.snapshotTerm, replicatedLog.size()); + RaftActorBehavior oldBehavior = currentBehavior; currentBehavior = new Follower(context); - onStateChanged(); + handleBehaviorChange(oldBehavior, currentBehavior); } @Override public void handleCommand(Object message) { @@ -366,26 +369,26 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { RaftActorBehavior oldBehavior = currentBehavior; currentBehavior = currentBehavior.handleMessage(getSender(), message); - if(oldBehavior != currentBehavior){ - onStateChanged(); - } - - onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId()); + handleBehaviorChange(oldBehavior, currentBehavior); } } - public java.util.Set getPeers() { - - return context.getPeerAddresses().keySet(); - } + private void handleBehaviorChange(RaftActorBehavior oldBehavior, RaftActorBehavior currentBehavior) { + if (oldBehavior != currentBehavior){ + onStateChanged(); + } + if (oldBehavior != null) { + // it can happen that the state has not changed but the leader has changed. + onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId()); - protected String getReplicatedLogState() { - return "snapshotIndex=" + context.getReplicatedLog().getSnapshotIndex() - + ", snapshotTerm=" + context.getReplicatedLog().getSnapshotTerm() - + ", im-mem journal size=" + context.getReplicatedLog().size(); + if (getRoleChangeNotifier().isPresent() && oldBehavior.state() != currentBehavior.state()) { + // we do not want to notify when the behavior/role is set for the first time (i.e follower) + getRoleChangeNotifier().get().tell(new RoleChanged(getId(), oldBehavior.state().name(), + currentBehavior.state().name()), getSelf()); + } + } } - /** * When a derived RaftActor needs to persist something it must call * persistData. @@ -578,6 +581,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { protected abstract DataPersistenceProvider persistence(); + /** + * Notifier Actor for this RaftActor to notify when a role change happens + * @return ActorRef - ActorRef of the notifier or Optional.absent if none. + */ + protected abstract Optional getRoleChangeNotifier(); + protected void onLeaderChanged(String oldLeader, String newLeader){}; private void trimPersistentData(long sequenceNumber) { @@ -843,4 +852,5 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { protected RaftActorBehavior getCurrentBehavior() { return currentBehavior; } + }