import akka.persistence.SnapshotOffer;
import akka.persistence.SnapshotSelectionCriteria;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
private int currentRecoveryBatchCount;
+ private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
+
public RaftActor(String id, Map<String, String> peerAddresses) {
this(id, peerAddresses, Optional.<ConfigParams>absent());
}
}
protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
- RaftActorBehavior oldBehavior = currentBehavior;
+ reusableBehaviorStateHolder.init(currentBehavior);
currentBehavior = newBehavior;
- handleBehaviorChange(oldBehavior, currentBehavior);
+ handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior);
}
@Override public void handleCommand(Object message) {
} else if(message instanceof GetOnDemandRaftState) {
onGetOnDemandRaftStats();
} else {
- RaftActorBehavior oldBehavior = currentBehavior;
+ reusableBehaviorStateHolder.init(currentBehavior);
+
currentBehavior = currentBehavior.handleMessage(getSender(), message);
- handleBehaviorChange(oldBehavior, currentBehavior);
+ handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior);
}
}
}
- private void handleBehaviorChange(RaftActorBehavior oldBehavior, RaftActorBehavior currentBehavior) {
+ private void handleBehaviorChange(BehaviorStateHolder oldBehaviorState, RaftActorBehavior currentBehavior) {
+ RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior();
+
if (oldBehavior != currentBehavior){
onStateChanged();
}
- String oldBehaviorLeaderId = oldBehavior == null? null : oldBehavior.getLeaderId();
- String oldBehaviorState = oldBehavior == null? null : oldBehavior.state().name();
+ String oldBehaviorLeaderId = oldBehavior == null ? null : oldBehaviorState.getLeaderId();
+ String oldBehaviorStateName = oldBehavior == null ? null : oldBehavior.state().name();
// it can happen that the state has not changed but the leader has changed.
- onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId());
+ Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
+ if(!Objects.equal(oldBehaviorLeaderId, currentBehavior.getLeaderId())) {
+ if(roleChangeNotifier.isPresent()) {
+ roleChangeNotifier.get().tell(new LeaderStateChanged(getId(), currentBehavior.getLeaderId()), getSelf());
+ }
- if (getRoleChangeNotifier().isPresent() &&
+ onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId());
+ }
+
+ if (roleChangeNotifier.isPresent() &&
(oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) {
- getRoleChangeNotifier().get().tell(
- new RoleChanged(getId(), oldBehaviorState , currentBehavior.state().name()),
- getSelf());
+ roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName ,
+ currentBehavior.state().name()), getSelf());
}
}
return currentBehavior;
}
+ private static class BehaviorStateHolder {
+ private RaftActorBehavior behavior;
+ private String leaderId;
+
+ void init(RaftActorBehavior behavior) {
+ this.behavior = behavior;
+ this.leaderId = behavior != null ? behavior.getLeaderId() : null;
+ }
+
+ RaftActorBehavior getBehavior() {
+ return behavior;
+ }
+
+ String getLeaderId() {
+ return leaderId;
+ }
+ }
}
import org.junit.Test;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
@Test
public void testRaftRoleChangeNotifier() throws Exception {
new JavaTestKit(getSystem()) {{
- ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
+ TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
+ Props.create(MessageCollectorActor.class));
MessageCollectorActor.waitUntilReady(notifierActor);
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
String persistenceId = factory.generateActorId("notifier-");
- factory.createTestActor(MockRaftActor.props(persistenceId,
+ TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
- List<RoleChanged> matches = null;
- for(int i = 0; i < 5000 / heartBeatInterval; i++) {
- matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
- assertNotNull(matches);
- if(matches.size() == 3) {
- break;
- }
- Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
- }
-
- assertEquals(3, matches.size());
+ List<RoleChanged> matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
// check if the notifier got a role change from null to Follower
RoleChanged raftRoleChanged = matches.get(0);
assertEquals(persistenceId, raftRoleChanged.getMemberId());
assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
+
+ LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
+ notifierActor, LeaderStateChanged.class);
+
+ assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
+
+ notifierActor.underlyingActor().clear();
+
+ MockRaftActor raftActor = raftActorRef.underlyingActor();
+ final String newLeaderId = "new-leader";
+ Follower follower = new Follower(raftActor.getRaftActorContext()) {
+ @Override
+ public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+ leaderId = newLeaderId;
+ return this;
+ }
+ };
+
+ raftActor.changeCurrentBehavior(follower);
+
+ leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
+ assertEquals(persistenceId, leaderStateChange.getMemberId());
+ assertEquals(null, leaderStateChange.getLeaderId());
+
+ raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
+ assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
+ assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
+
+ notifierActor.underlyingActor().clear();
+
+ raftActor.handleCommand("any");
+
+ leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
+ assertEquals(persistenceId, leaderStateChange.getMemberId());
+ assertEquals(newLeaderId, leaderStateChange.getLeaderId());
}};
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.notifications;
+
+import java.io.Serializable;
+
+/**
+ * A message initiated internally from the RaftActor when some state of a leader has changed
+ *
+ * @author Thomas Pantelis
+ */
+public class LeaderStateChanged implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String memberId;
+ private final String leaderId;
+
+ public LeaderStateChanged(String memberId, String leaderId) {
+ this.memberId = memberId;
+ this.leaderId = leaderId;
+ }
+
+ public String getMemberId() {
+ return memberId;
+ }
+
+ public String getLeaderId() {
+ return leaderId;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("LeaderStateChanged [memberId=").append(memberId).append(", leaderId=").append(leaderId)
+ .append("]");
+ return builder.toString();
+ }
+}
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
/**
- * The RoleChangeNotifier is responsible for receiving Raft role change messages and notifying
+ * The RoleChangeNotifier is responsible for receiving Raft role and leader state change messages and notifying
* the listeners (within the same node), which are registered with it.
* <p/>
* The RoleChangeNotifier is instantiated by the Shard and injected into the RaftActor.
*/
public class RoleChangeNotifier extends AbstractUntypedActor implements AutoCloseable {
- private String memberId;
- private Map<ActorPath, ActorRef> registeredListeners = Maps.newHashMap();
+ private final String memberId;
+ private final Map<ActorPath, ActorRef> registeredListeners = Maps.newHashMap();
private RoleChangeNotification latestRoleChangeNotification = null;
+ private LeaderStateChanged latestLeaderStateChanged;
public RoleChangeNotifier(String memberId) {
this.memberId = memberId;
getSender().tell(new RegisterRoleChangeListenerReply(), getSelf());
+ if(latestLeaderStateChanged != null) {
+ getSender().tell(latestLeaderStateChanged, getSelf());
+ }
+
if (latestRoleChangeNotification != null) {
getSender().tell(latestRoleChangeNotification, getSelf());
}
for (ActorRef listener: registeredListeners.values()) {
listener.tell(latestRoleChangeNotification, getSelf());
}
+ } else if (message instanceof LeaderStateChanged) {
+ latestLeaderStateChanged = (LeaderStateChanged)message;
+
+ for (ActorRef listener: registeredListeners.values()) {
+ listener.tell(latestLeaderStateChanged, getSelf());
+ }
}
}
package org.opendaylight.controller.cluster.datastore;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.RaftState;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
public class RoleChangeNotifierTest extends AbstractActorTest {
TestActorRef<RoleChangeNotifier> notifierTestActorRef = TestActorRef.create(
getSystem(), RoleChangeNotifier.getProps(memberId), memberId);
- RoleChangeNotifier roleChangeNotifier = notifierTestActorRef.underlyingActor();
-
notifierTestActorRef.tell(new RoleChanged(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), shardActor);
// no notification should be sent as listener has not yet registered
}};
}
+
+ @Test
+ public void testHandleLeaderStateChanged() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ String actorId = "testHandleLeaderStateChanged";
+ TestActorRef<RoleChangeNotifier> notifierTestActorRef = TestActorRef.create(
+ getSystem(), RoleChangeNotifier.getProps(actorId), actorId);
+
+ notifierTestActorRef.tell(new LeaderStateChanged("member1", "leader1"), ActorRef.noSender());
+
+ // listener registers after the sate has been changed, ensure we sent the latest state change after a reply
+ notifierTestActorRef.tell(new RegisterRoleChangeListener(), getRef());
+
+ expectMsgClass(RegisterRoleChangeListenerReply.class);
+
+ LeaderStateChanged leaderStateChanged = expectMsgClass(LeaderStateChanged.class);
+ assertEquals("getMemberId", "member1", leaderStateChanged.getMemberId());
+ assertEquals("getLeaderId", "leader1", leaderStateChanged.getLeaderId());
+
+ notifierTestActorRef.tell(new LeaderStateChanged("member1", "leader2"), ActorRef.noSender());
+
+ leaderStateChanged = expectMsgClass(LeaderStateChanged.class);
+ assertEquals("getMemberId", "member1", leaderStateChanged.getMemberId());
+ assertEquals("getLeaderId", "leader2", leaderStateChanged.getLeaderId());
+ }};
+ }
}