import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
-import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
private SnapshotTracker snapshotTracker = null;
- private final InitialSyncStatusTracker initialSyncStatusTracker;
+ private final SyncStatusTracker initialSyncStatusTracker;
+
+ private static final int SYNC_THRESHOLD = 10;
public Follower(RaftActorContext context) {
super(context, RaftState.Follower);
- initialSyncStatusTracker = new InitialSyncStatusTracker(context.getActor());
+ initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD);
if(context.getRaftPolicy().automaticElectionsEnabled()) {
if (context.getPeerAddresses().isEmpty()) {
SnapshotTracker getSnapshotTracker(){
return snapshotTracker;
}
-
- private class InitialSyncStatusTracker {
-
- private static final long INVALID_LOG_INDEX = -2L;
- private long initialLeaderCommit = INVALID_LOG_INDEX;
- private boolean initialSyncUpDone = false;
- private String syncedLeaderId = null;
- private final ActorRef actor;
-
- public InitialSyncStatusTracker(ActorRef actor) {
- this.actor = actor;
- }
-
- public void update(String leaderId, long leaderCommit, long commitIndex){
-
- if(!leaderId.equals(syncedLeaderId)){
- initialSyncUpDone = false;
- initialLeaderCommit = INVALID_LOG_INDEX;
- syncedLeaderId = leaderId;
- }
-
- if(!initialSyncUpDone){
- if(initialLeaderCommit == INVALID_LOG_INDEX){
- actor.tell(new FollowerInitialSyncUpStatus(false, getId()), ActorRef.noSender());
- initialLeaderCommit = leaderCommit;
- } else if(commitIndex >= initialLeaderCommit){
- actor.tell(new FollowerInitialSyncUpStatus(true, getId()), ActorRef.noSender());
- initialSyncUpDone = true;
- }
- }
- }
- }
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+
+/**
+ * The SyncStatusTracker tracks if a Follower is in sync with any given Leader or not
+ * When an update is received from the Leader and the update happens to be the first update
+ * from that Leader then the SyncStatusTracker will not mark the Follower as not in-sync till the
+ * Followers commitIndex matches the commitIndex that the Leader sent in it's very first update.
+ * Subsequently when an update is received the tracker will consider the Follower to be out of
+ * sync if it is behind by 'syncThreshold' commits.
+ */
+public class SyncStatusTracker {
+
+ private static final boolean IN_SYNC = true;
+ private static final boolean NOT_IN_SYNC = false;
+ private static final boolean FORCE_STATUS_CHANGE = true;
+
+ private final String id;
+ private String syncedLeaderId = null;
+ private final ActorRef actor;
+ private final int syncThreshold;
+ private boolean syncStatus = false;
+ private long minimumExpectedIndex = -2L;
+
+ public SyncStatusTracker(ActorRef actor, String id, int syncThreshold) {
+ this.actor = Preconditions.checkNotNull(actor, "actor should not be null");
+ this.id = Preconditions.checkNotNull(id, "id should not be null");
+ Preconditions.checkArgument(syncThreshold >= 0, "syncThreshold should be greater than or equal to 0");
+ this.syncThreshold = syncThreshold;
+ }
+
+ public void update(String leaderId, long leaderCommit, long commitIndex){
+ leaderId = Preconditions.checkNotNull(leaderId, "leaderId should not be null");
+
+ if(!leaderId.equals(syncedLeaderId)){
+ minimumExpectedIndex = leaderCommit;
+ changeSyncStatus(NOT_IN_SYNC, FORCE_STATUS_CHANGE);
+ syncedLeaderId = leaderId;
+ return;
+ }
+
+ if((leaderCommit - commitIndex) > syncThreshold){
+ changeSyncStatus(NOT_IN_SYNC);
+ } else if((leaderCommit - commitIndex) <= syncThreshold && commitIndex >= minimumExpectedIndex) {
+ changeSyncStatus(IN_SYNC);
+ }
+ }
+
+ private void changeSyncStatus(boolean newSyncStatus){
+ changeSyncStatus(newSyncStatus, !FORCE_STATUS_CHANGE);
+ }
+
+ private void changeSyncStatus(boolean newSyncStatus, boolean forceStatusChange){
+ if(syncStatus == newSyncStatus && !forceStatusChange){
+ return;
+ }
+ actor.tell(new FollowerInitialSyncUpStatus(newSyncStatus, id), ActorRef.noSender());
+ syncStatus = newSyncStatus;
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import akka.actor.Props;
+import akka.testkit.TestActorRef;
+import org.junit.After;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.AbstractActorTest;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+
+public class SyncStatusTrackerTest extends AbstractActorTest {
+ protected final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+
+ private final TestActorRef<MessageCollectorActor> listener = actorFactory.createTestActor(
+ Props.create(MessageCollectorActor.class), actorFactory.generateActorId("listener"));
+
+ @After
+ public void tearDown(){
+ actorFactory.close();
+ }
+
+ @Test
+ public void testUpdate() throws Exception {
+ SyncStatusTracker tracker = new SyncStatusTracker(listener, "commit-tracker", 10);
+
+ // When leader-1 sends the first update message the listener should receive a syncStatus notification
+ // with status set to false
+ tracker.update("leader-1", 100, 99);
+ FollowerInitialSyncUpStatus status =
+ MessageCollectorActor.getFirstMatching(listener, FollowerInitialSyncUpStatus.class);
+
+ assertEquals(false, status.isInitialSyncDone());
+ MessageCollectorActor.clearMessages(listener);
+
+ // At a minimum the follower should have the commit index that the new leader sent it in the first message
+ // Also the commit index must be below the syncThreshold. If both conditions are met a new sync status
+ // message with status = true should be expected
+ tracker.update("leader-1", 105, 101);
+
+ status = MessageCollectorActor.getFirstMatching(listener, FollowerInitialSyncUpStatus.class);
+
+ assertEquals(true, status.isInitialSyncDone());
+ MessageCollectorActor.clearMessages(listener);
+
+ // If a subsequent message is received and if the difference between the followers commit index and
+ // the leaders commit index is below the syncThreshold then no status notification must be issues
+ tracker.update("leader-1", 108, 101);
+
+ status = MessageCollectorActor.getFirstMatching(listener, FollowerInitialSyncUpStatus.class);
+
+ assertNull("No status message should be received", status);
+
+ // If the follower falls behind the leader by more than the syncThreshold then the listener should
+ // receive a syncStatus notification with status = false
+ tracker.update("leader-1", 150, 101);
+
+ status = MessageCollectorActor.getFirstMatching(listener, FollowerInitialSyncUpStatus.class);
+
+ assertNotNull("No sync status message was received", status);
+
+ assertEquals(false, status.isInitialSyncDone());
+ MessageCollectorActor.clearMessages(listener);
+
+ // If the follower is not caught up yet it should not receive any further notification
+ tracker.update("leader-1", 150, 125);
+
+ status = MessageCollectorActor.getFirstMatching(listener, FollowerInitialSyncUpStatus.class);
+
+ assertNull("No status message should be received", status);
+
+ // Once the syncThreshold is met a new syncStatus notification should be issued
+ tracker.update("leader-1", 160, 155);
+
+ status = MessageCollectorActor.getFirstMatching(listener, FollowerInitialSyncUpStatus.class);
+
+ assertEquals(true, status.isInitialSyncDone());
+ MessageCollectorActor.clearMessages(listener);
+
+ // When a new leader starts sending update messages a new syncStatus notification should be immediately
+ // triggered with status = false
+ tracker.update("leader-2", 160, 155);
+
+ status = MessageCollectorActor.getFirstMatching(listener, FollowerInitialSyncUpStatus.class);
+
+ assertEquals(false, status.isInitialSyncDone());
+ MessageCollectorActor.clearMessages(listener);
+
+ // If an update is received from a new leader which is still below the minimum expected index then
+ // syncStatus should not be changed
+ tracker.update("leader-2", 160, 159);
+
+ status = MessageCollectorActor.getFirstMatching(listener, FollowerInitialSyncUpStatus.class);
+
+ assertNull("No status message should be received", status);
+
+ }
+
+ @Test
+ public void testConstructorActorShouldNotBeNull(){
+ try {
+ new SyncStatusTracker(null, "commit-tracker", 10);
+ fail("A NullPointerException was expected");
+ } catch(NullPointerException e){
+ assertTrue("Invalid error message :" + e.getMessage(), e.getMessage().contains("actor "));
+ }
+ }
+
+ @Test
+ public void testConstructorIdShouldNotBeNull(){
+ try {
+ new SyncStatusTracker(listener, null, 10);
+ fail("A NullPointerException was expected");
+ } catch(NullPointerException e){
+ assertTrue("Invalid error message :" + e.getMessage(), e.getMessage().contains("id "));
+ }
+ }
+
+ @Test
+ public void testConstructorSyncThresholdShouldNotBeNegative(){
+ try {
+ new SyncStatusTracker(listener, "commit-tracker", -1);
+ fail("An IllegalArgumentException was expected");
+ } catch(IllegalArgumentException e){
+ assertTrue("Invalid error message :" + e.getMessage(), e.getMessage().contains("syncThreshold "));
+ }
+ }
+
+}
\ No newline at end of file
localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses));
}
- mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
+ mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
mBean.setShardManager(this);
private static final Logger LOG = LoggerFactory.getLogger(ShardManagerInfo.class);
+ private final String memberName;
private final List<String> localShards;
private boolean syncStatus = false;
private ShardManager shardManager;
- public ShardManagerInfo(String name, String mxBeanType, List<String> localShards) {
+ public ShardManagerInfo(String memberName, String name, String mxBeanType, List<String> localShards) {
super(name, mxBeanType, JMX_CATEGORY_SHARD_MANAGER);
+ this.memberName = memberName;
this.localShards = localShards;
}
- public static ShardManagerInfo createShardManagerMBean(String name, String mxBeanType,
+ public static ShardManagerInfo createShardManagerMBean(String memberName, String name, String mxBeanType,
List<String> localShards){
- ShardManagerInfo shardManagerInfo = new ShardManagerInfo(name, mxBeanType, localShards);
+ ShardManagerInfo shardManagerInfo = new ShardManagerInfo(memberName, name, mxBeanType, localShards);
shardManagerInfo.registerMBean();
return this.syncStatus;
}
+ @Override
+ public String getMemberName() {
+ return memberName;
+ }
+
@Override
public void switchAllLocalShardsState(String newState, long term) {
LOG.info("switchAllLocalShardsState called newState = {}, term = {}", newState, term);
*/
boolean getSyncStatus();
+ /**
+ * Get the name of of the current member
+ *
+ * @return
+ */
+ String getMemberName();
+
/**
* Switch the Raft Behavior of all the local shards to the newBehavior
*