From: Tom Pantelis Date: Tue, 24 Mar 2015 17:55:27 +0000 (-0400) Subject: Add LeaderStateChanged notification X-Git-Tag: release/lithium~355^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=b57b5bca2d4d04b0e317972b081271bf35c2c818 Add LeaderStateChanged notification For upcoming work, the ShardManager will need to know a shard's leaderId and, eventually, version. In the RaftActor, when the behavior's leaderId changes, it now sends a LeaderStateChanged message to the RoleChangedNotifier actor. The ShardManager will listen for LeaderStateChanged messages (in another patch). Change-Id: I53e2d7cae8fd19f96650546af395f82189a09fd8 Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 9faffb9395..aa72485187 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -17,6 +17,7 @@ import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotOffer; import akka.persistence.SnapshotSelectionCriteria; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; import com.google.common.base.Optional; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMap; @@ -30,6 +31,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.time.DurationFormatUtils; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; @@ -128,6 +130,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private int currentRecoveryBatchCount; + private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder(); + public RaftActor(String id, Map peerAddresses) { this(id, peerAddresses, Optional.absent()); } @@ -306,9 +310,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } protected void changeCurrentBehavior(RaftActorBehavior newBehavior){ - RaftActorBehavior oldBehavior = currentBehavior; + reusableBehaviorStateHolder.init(currentBehavior); currentBehavior = newBehavior; - handleBehaviorChange(oldBehavior, currentBehavior); + handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior); } @Override public void handleCommand(Object message) { @@ -396,10 +400,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if(message instanceof GetOnDemandRaftState) { onGetOnDemandRaftStats(); } else { - RaftActorBehavior oldBehavior = currentBehavior; + reusableBehaviorStateHolder.init(currentBehavior); + currentBehavior = currentBehavior.handleMessage(getSender(), message); - handleBehaviorChange(oldBehavior, currentBehavior); + handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior); } } @@ -446,22 +451,30 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } - private void handleBehaviorChange(RaftActorBehavior oldBehavior, RaftActorBehavior currentBehavior) { + private void handleBehaviorChange(BehaviorStateHolder oldBehaviorState, RaftActorBehavior currentBehavior) { + RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior(); + if (oldBehavior != currentBehavior){ onStateChanged(); } - String oldBehaviorLeaderId = oldBehavior == null? null : oldBehavior.getLeaderId(); - String oldBehaviorState = oldBehavior == null? null : oldBehavior.state().name(); + String oldBehaviorLeaderId = oldBehavior == null ? null : oldBehaviorState.getLeaderId(); + String oldBehaviorStateName = oldBehavior == null ? null : oldBehavior.state().name(); // it can happen that the state has not changed but the leader has changed. - onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId()); + Optional roleChangeNotifier = getRoleChangeNotifier(); + if(!Objects.equal(oldBehaviorLeaderId, currentBehavior.getLeaderId())) { + if(roleChangeNotifier.isPresent()) { + roleChangeNotifier.get().tell(new LeaderStateChanged(getId(), currentBehavior.getLeaderId()), getSelf()); + } - if (getRoleChangeNotifier().isPresent() && + onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId()); + } + + if (roleChangeNotifier.isPresent() && (oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) { - getRoleChangeNotifier().get().tell( - new RoleChanged(getId(), oldBehaviorState , currentBehavior.state().name()), - getSelf()); + roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName , + currentBehavior.state().name()), getSelf()); } } @@ -1051,4 +1064,21 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return currentBehavior; } + private static class BehaviorStateHolder { + private RaftActorBehavior behavior; + private String leaderId; + + void init(RaftActorBehavior behavior) { + this.behavior = behavior; + this.leaderId = behavior != null ? behavior.getLeaderId() : null; + } + + RaftActorBehavior getBehavior() { + return behavior; + } + + String getLeaderId() { + return leaderId; + } + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index b192b7cd24..34932c7249 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -54,6 +54,7 @@ import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; @@ -64,6 +65,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotRep import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.Leader; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; @@ -944,7 +946,8 @@ public class RaftActorTest extends AbstractActorTest { @Test public void testRaftRoleChangeNotifier() throws Exception { new JavaTestKit(getSystem()) {{ - ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class)); + TestActorRef notifierActor = factory.createTestActor( + Props.create(MessageCollectorActor.class)); MessageCollectorActor.waitUntilReady(notifierActor); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); @@ -954,20 +957,10 @@ public class RaftActorTest extends AbstractActorTest { String persistenceId = factory.generateActorId("notifier-"); - factory.createTestActor(MockRaftActor.props(persistenceId, + TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, Collections.emptyMap(), Optional.of(config), notifierActor), persistenceId); - List matches = null; - for(int i = 0; i < 5000 / heartBeatInterval; i++) { - matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class); - assertNotNull(matches); - if(matches.size() == 3) { - break; - } - Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS); - } - - assertEquals(3, matches.size()); + List matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3); // check if the notifier got a role change from null to Follower RoleChanged raftRoleChanged = matches.get(0); @@ -986,6 +979,41 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(persistenceId, raftRoleChanged.getMemberId()); assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole()); assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole()); + + LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching( + notifierActor, LeaderStateChanged.class); + + assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId()); + + notifierActor.underlyingActor().clear(); + + MockRaftActor raftActor = raftActorRef.underlyingActor(); + final String newLeaderId = "new-leader"; + Follower follower = new Follower(raftActor.getRaftActorContext()) { + @Override + public RaftActorBehavior handleMessage(ActorRef sender, Object message) { + leaderId = newLeaderId; + return this; + } + }; + + raftActor.changeCurrentBehavior(follower); + + leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class); + assertEquals(persistenceId, leaderStateChange.getMemberId()); + assertEquals(null, leaderStateChange.getLeaderId()); + + raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class); + assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole()); + assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole()); + + notifierActor.underlyingActor().clear(); + + raftActor.handleCommand("any"); + + leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class); + assertEquals(persistenceId, leaderStateChange.getMemberId()); + assertEquals(newLeaderId, leaderStateChange.getLeaderId()); }}; } diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java new file mode 100644 index 0000000000..ec35b03b0a --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.notifications; + +import java.io.Serializable; + +/** + * A message initiated internally from the RaftActor when some state of a leader has changed + * + * @author Thomas Pantelis + */ +public class LeaderStateChanged implements Serializable { + private static final long serialVersionUID = 1L; + + private final String memberId; + private final String leaderId; + + public LeaderStateChanged(String memberId, String leaderId) { + this.memberId = memberId; + this.leaderId = leaderId; + } + + public String getMemberId() { + return memberId; + } + + public String getLeaderId() { + return leaderId; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("LeaderStateChanged [memberId=").append(memberId).append(", leaderId=").append(leaderId) + .append("]"); + return builder.toString(); + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java index d065f6d211..598dfb1fe8 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java @@ -17,16 +17,17 @@ import java.util.Map; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; /** - * The RoleChangeNotifier is responsible for receiving Raft role change messages and notifying + * The RoleChangeNotifier is responsible for receiving Raft role and leader state change messages and notifying * the listeners (within the same node), which are registered with it. *

* The RoleChangeNotifier is instantiated by the Shard and injected into the RaftActor. */ public class RoleChangeNotifier extends AbstractUntypedActor implements AutoCloseable { - private String memberId; - private Map registeredListeners = Maps.newHashMap(); + private final String memberId; + private final Map registeredListeners = Maps.newHashMap(); private RoleChangeNotification latestRoleChangeNotification = null; + private LeaderStateChanged latestLeaderStateChanged; public RoleChangeNotifier(String memberId) { this.memberId = memberId; @@ -62,6 +63,10 @@ public class RoleChangeNotifier extends AbstractUntypedActor implements AutoClos getSender().tell(new RegisterRoleChangeListenerReply(), getSelf()); + if(latestLeaderStateChanged != null) { + getSender().tell(latestLeaderStateChanged, getSelf()); + } + if (latestRoleChangeNotification != null) { getSender().tell(latestRoleChangeNotification, getSelf()); } @@ -81,6 +86,12 @@ public class RoleChangeNotifier extends AbstractUntypedActor implements AutoClos for (ActorRef listener: registeredListeners.values()) { listener.tell(latestRoleChangeNotification, getSelf()); } + } else if (message instanceof LeaderStateChanged) { + latestLeaderStateChanged = (LeaderStateChanged)message; + + for (ActorRef listener: registeredListeners.values()) { + listener.tell(latestLeaderStateChanged, getSelf()); + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java index 4e61260550..1ab03b216c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java @@ -1,21 +1,22 @@ package org.opendaylight.controller.cluster.datastore; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import akka.actor.ActorRef; import akka.actor.Props; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.RaftState; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; public class RoleChangeNotifierTest extends AbstractActorTest { @@ -51,8 +52,6 @@ public class RoleChangeNotifierTest extends AbstractActorTest { TestActorRef notifierTestActorRef = TestActorRef.create( getSystem(), RoleChangeNotifier.getProps(memberId), memberId); - RoleChangeNotifier roleChangeNotifier = notifierTestActorRef.underlyingActor(); - notifierTestActorRef.tell(new RoleChanged(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), shardActor); // no notification should be sent as listener has not yet registered @@ -74,6 +73,32 @@ public class RoleChangeNotifierTest extends AbstractActorTest { }}; } + + @Test + public void testHandleLeaderStateChanged() throws Exception { + new JavaTestKit(getSystem()) {{ + String actorId = "testHandleLeaderStateChanged"; + TestActorRef notifierTestActorRef = TestActorRef.create( + getSystem(), RoleChangeNotifier.getProps(actorId), actorId); + + notifierTestActorRef.tell(new LeaderStateChanged("member1", "leader1"), ActorRef.noSender()); + + // listener registers after the sate has been changed, ensure we sent the latest state change after a reply + notifierTestActorRef.tell(new RegisterRoleChangeListener(), getRef()); + + expectMsgClass(RegisterRoleChangeListenerReply.class); + + LeaderStateChanged leaderStateChanged = expectMsgClass(LeaderStateChanged.class); + assertEquals("getMemberId", "member1", leaderStateChanged.getMemberId()); + assertEquals("getLeaderId", "leader1", leaderStateChanged.getLeaderId()); + + notifierTestActorRef.tell(new LeaderStateChanged("member1", "leader2"), ActorRef.noSender()); + + leaderStateChanged = expectMsgClass(LeaderStateChanged.class); + assertEquals("getMemberId", "member1", leaderStateChanged.getMemberId()); + assertEquals("getLeaderId", "leader2", leaderStateChanged.getLeaderId()); + }}; + } }