Add LeaderStateChanged notification 28/16328/5
authorTom Pantelis <tpanteli@brocade.com>
Tue, 24 Mar 2015 17:55:27 +0000 (13:55 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Tue, 24 Mar 2015 18:50:08 +0000 (14:50 -0400)
For upcoming work, the ShardManager will need to know a shard's
leaderId and, eventually, version. In the RaftActor, when the
behavior's leaderId changes, it now sends a LeaderStateChanged message
to the RoleChangedNotifier actor. The ShardManager will listen for
LeaderStateChanged messages (in another patch).

Change-Id: I53e2d7cae8fd19f96650546af395f82189a09fd8
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java

index 9faffb9..aa72485 100644 (file)
@@ -17,6 +17,7 @@ import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
 import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
@@ -30,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.time.DurationFormatUtils;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
@@ -128,6 +130,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private int currentRecoveryBatchCount;
 
+    private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
+
     public RaftActor(String id, Map<String, String> peerAddresses) {
         this(id, peerAddresses, Optional.<ConfigParams>absent());
     }
@@ -306,9 +310,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
-        RaftActorBehavior oldBehavior = currentBehavior;
+        reusableBehaviorStateHolder.init(currentBehavior);
         currentBehavior = newBehavior;
-        handleBehaviorChange(oldBehavior, currentBehavior);
+        handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior);
     }
 
     @Override public void handleCommand(Object message) {
@@ -396,10 +400,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if(message instanceof GetOnDemandRaftState) {
             onGetOnDemandRaftStats();
         } else {
-            RaftActorBehavior oldBehavior = currentBehavior;
+            reusableBehaviorStateHolder.init(currentBehavior);
+
             currentBehavior = currentBehavior.handleMessage(getSender(), message);
 
-            handleBehaviorChange(oldBehavior, currentBehavior);
+            handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior);
         }
     }
 
@@ -446,22 +451,30 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     }
 
-    private void handleBehaviorChange(RaftActorBehavior oldBehavior, RaftActorBehavior currentBehavior) {
+    private void handleBehaviorChange(BehaviorStateHolder oldBehaviorState, RaftActorBehavior currentBehavior) {
+        RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior();
+
         if (oldBehavior != currentBehavior){
             onStateChanged();
         }
 
-        String oldBehaviorLeaderId = oldBehavior == null? null : oldBehavior.getLeaderId();
-        String oldBehaviorState = oldBehavior == null? null : oldBehavior.state().name();
+        String oldBehaviorLeaderId = oldBehavior == null ? null : oldBehaviorState.getLeaderId();
+        String oldBehaviorStateName = oldBehavior == null ? null : oldBehavior.state().name();
 
         // it can happen that the state has not changed but the leader has changed.
-        onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId());
+        Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
+        if(!Objects.equal(oldBehaviorLeaderId, currentBehavior.getLeaderId())) {
+            if(roleChangeNotifier.isPresent()) {
+                roleChangeNotifier.get().tell(new LeaderStateChanged(getId(), currentBehavior.getLeaderId()), getSelf());
+            }
 
-        if (getRoleChangeNotifier().isPresent() &&
+            onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId());
+        }
+
+        if (roleChangeNotifier.isPresent() &&
                 (oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) {
-            getRoleChangeNotifier().get().tell(
-                    new RoleChanged(getId(), oldBehaviorState , currentBehavior.state().name()),
-                    getSelf());
+            roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName ,
+                    currentBehavior.state().name()), getSelf());
         }
     }
 
@@ -1051,4 +1064,21 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         return currentBehavior;
     }
 
+    private static class BehaviorStateHolder {
+        private RaftActorBehavior behavior;
+        private String leaderId;
+
+        void init(RaftActorBehavior behavior) {
+            this.behavior = behavior;
+            this.leaderId = behavior != null ? behavior.getLeaderId() : null;
+        }
+
+        RaftActorBehavior getBehavior() {
+            return behavior;
+        }
+
+        String getLeaderId() {
+            return leaderId;
+        }
+    }
 }
index b192b7c..34932c7 100644 (file)
@@ -54,6 +54,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
@@ -64,6 +65,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotRep
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
@@ -944,7 +946,8 @@ public class RaftActorTest extends AbstractActorTest {
     @Test
     public void testRaftRoleChangeNotifier() throws Exception {
         new JavaTestKit(getSystem()) {{
-            ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
+            TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
+                    Props.create(MessageCollectorActor.class));
             MessageCollectorActor.waitUntilReady(notifierActor);
 
             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
@@ -954,20 +957,10 @@ public class RaftActorTest extends AbstractActorTest {
 
             String persistenceId = factory.generateActorId("notifier-");
 
-            factory.createTestActor(MockRaftActor.props(persistenceId,
+            TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
 
-            List<RoleChanged> matches =  null;
-            for(int i = 0; i < 5000 / heartBeatInterval; i++) {
-                matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
-                assertNotNull(matches);
-                if(matches.size() == 3) {
-                    break;
-                }
-                Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
-            }
-
-            assertEquals(3, matches.size());
+            List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
 
             // check if the notifier got a role change from null to Follower
             RoleChanged raftRoleChanged = matches.get(0);
@@ -986,6 +979,41 @@ public class RaftActorTest extends AbstractActorTest {
             assertEquals(persistenceId, raftRoleChanged.getMemberId());
             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
             assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
+
+            LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
+                    notifierActor, LeaderStateChanged.class);
+
+            assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
+
+            notifierActor.underlyingActor().clear();
+
+            MockRaftActor raftActor = raftActorRef.underlyingActor();
+            final String newLeaderId = "new-leader";
+            Follower follower = new Follower(raftActor.getRaftActorContext()) {
+                @Override
+                public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+                    leaderId = newLeaderId;
+                    return this;
+                }
+            };
+
+            raftActor.changeCurrentBehavior(follower);
+
+            leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
+            assertEquals(persistenceId, leaderStateChange.getMemberId());
+            assertEquals(null, leaderStateChange.getLeaderId());
+
+            raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
+            assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
+            assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
+
+            notifierActor.underlyingActor().clear();
+
+            raftActor.handleCommand("any");
+
+            leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
+            assertEquals(persistenceId, leaderStateChange.getMemberId());
+            assertEquals(newLeaderId, leaderStateChange.getLeaderId());
         }};
     }
 
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java
new file mode 100644 (file)
index 0000000..ec35b03
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.notifications;
+
+import java.io.Serializable;
+
+/**
+ * A message initiated internally from the RaftActor when some state of a leader has changed
+ *
+ * @author Thomas Pantelis
+ */
+public class LeaderStateChanged implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String memberId;
+    private final String leaderId;
+
+    public LeaderStateChanged(String memberId, String leaderId) {
+        this.memberId = memberId;
+        this.leaderId = leaderId;
+    }
+
+    public String getMemberId() {
+        return memberId;
+    }
+
+    public String getLeaderId() {
+        return leaderId;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("LeaderStateChanged [memberId=").append(memberId).append(", leaderId=").append(leaderId)
+                .append("]");
+        return builder.toString();
+    }
+}
index d065f6d..598dfb1 100644 (file)
@@ -17,16 +17,17 @@ import java.util.Map;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 
 /**
- * The RoleChangeNotifier is responsible for receiving Raft role change messages and notifying
+ * The RoleChangeNotifier is responsible for receiving Raft role and leader state change messages and notifying
  * the listeners (within the same node), which are registered with it.
  * <p/>
  * The RoleChangeNotifier is instantiated by the Shard and injected into the RaftActor.
  */
 public class RoleChangeNotifier extends AbstractUntypedActor implements AutoCloseable {
 
-    private String memberId;
-    private Map<ActorPath, ActorRef> registeredListeners = Maps.newHashMap();
+    private final String memberId;
+    private final Map<ActorPath, ActorRef> registeredListeners = Maps.newHashMap();
     private RoleChangeNotification latestRoleChangeNotification = null;
+    private LeaderStateChanged latestLeaderStateChanged;
 
     public RoleChangeNotifier(String memberId) {
         this.memberId = memberId;
@@ -62,6 +63,10 @@ public class RoleChangeNotifier extends AbstractUntypedActor implements AutoClos
 
             getSender().tell(new RegisterRoleChangeListenerReply(), getSelf());
 
+            if(latestLeaderStateChanged != null) {
+                getSender().tell(latestLeaderStateChanged, getSelf());
+            }
+
             if (latestRoleChangeNotification != null) {
                 getSender().tell(latestRoleChangeNotification, getSelf());
             }
@@ -81,6 +86,12 @@ public class RoleChangeNotifier extends AbstractUntypedActor implements AutoClos
             for (ActorRef listener: registeredListeners.values()) {
                 listener.tell(latestRoleChangeNotification, getSelf());
             }
+        } else if (message instanceof LeaderStateChanged) {
+            latestLeaderStateChanged = (LeaderStateChanged)message;
+
+            for (ActorRef listener: registeredListeners.values()) {
+                listener.tell(latestLeaderStateChanged, getSelf());
+            }
         }
     }
 
index 4e61260..1ab03b2 100644 (file)
@@ -1,21 +1,22 @@
 package org.opendaylight.controller.cluster.datastore;
 
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.RaftState;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 
 public class RoleChangeNotifierTest extends AbstractActorTest  {
 
@@ -51,8 +52,6 @@ public class RoleChangeNotifierTest extends AbstractActorTest  {
             TestActorRef<RoleChangeNotifier> notifierTestActorRef = TestActorRef.create(
                 getSystem(), RoleChangeNotifier.getProps(memberId), memberId);
 
-            RoleChangeNotifier roleChangeNotifier = notifierTestActorRef.underlyingActor();
-
             notifierTestActorRef.tell(new RoleChanged(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), shardActor);
 
             // no notification should be sent as listener has not yet registered
@@ -74,6 +73,32 @@ public class RoleChangeNotifierTest extends AbstractActorTest  {
         }};
 
     }
+
+    @Test
+    public void testHandleLeaderStateChanged() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            String actorId = "testHandleLeaderStateChanged";
+            TestActorRef<RoleChangeNotifier> notifierTestActorRef = TestActorRef.create(
+                getSystem(), RoleChangeNotifier.getProps(actorId), actorId);
+
+            notifierTestActorRef.tell(new LeaderStateChanged("member1", "leader1"), ActorRef.noSender());
+
+            // listener registers after the sate has been changed, ensure we sent the latest state change after a reply
+            notifierTestActorRef.tell(new RegisterRoleChangeListener(), getRef());
+
+            expectMsgClass(RegisterRoleChangeListenerReply.class);
+
+            LeaderStateChanged leaderStateChanged = expectMsgClass(LeaderStateChanged.class);
+            assertEquals("getMemberId", "member1", leaderStateChanged.getMemberId());
+            assertEquals("getLeaderId", "leader1", leaderStateChanged.getLeaderId());
+
+            notifierTestActorRef.tell(new LeaderStateChanged("member1", "leader2"), ActorRef.noSender());
+
+            leaderStateChanged = expectMsgClass(LeaderStateChanged.class);
+            assertEquals("getMemberId", "member1", leaderStateChanged.getMemberId());
+            assertEquals("getLeaderId", "leader2", leaderStateChanged.getLeaderId());
+        }};
+    }
 }
 
 

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.