<properties>
- <akka.version>2.3.4</akka.version>
+ <akka.version>2.3.9</akka.version>
<appauth.version>0.5.0-SNAPSHOT</appauth.version>
<archetype-app-northbound>0.1.0-SNAPSHOT</archetype-app-northbound>
<arphandler.version>0.6.0-SNAPSHOT</arphandler.version>
* The interval in which the leader needs to check itself if its isolated
* @return FiniteDuration
*/
- FiniteDuration getIsolatedCheckInterval();
+ long getIsolatedCheckIntervalInMillis();
/**
private FiniteDuration heartBeatInterval = HEART_BEAT_INTERVAL;
private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT;
private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE;
- private FiniteDuration isolatedLeaderCheckInterval =
- new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000, HEART_BEAT_INTERVAL.unit());
+ private long isolatedLeaderCheckInterval = HEART_BEAT_INTERVAL.$times(1000).toMillis();
// 12 is just an arbitrary percentage. This is the amount of the total memory that a raft actor's
// in-memory journal can use before it needs to snapshot
}
public void setIsolatedLeaderCheckInterval(FiniteDuration isolatedLeaderCheckInterval) {
- this.isolatedLeaderCheckInterval = isolatedLeaderCheckInterval;
+ this.isolatedLeaderCheckInterval = isolatedLeaderCheckInterval.toMillis();
}
public void setElectionTimeoutFactor(long electionTimeoutFactor){
}
@Override
- public FiniteDuration getIsolatedCheckInterval() {
+ public long getIsolatedCheckIntervalInMillis() {
return isolatedLeaderCheckInterval;
}
import com.google.protobuf.ByteString;
import java.io.Serializable;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
* </ul>
*/
public abstract class RaftActor extends AbstractUntypedPersistentActor {
+
+ private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis
+
protected final Logger LOG = LoggerFactory.getLogger(getClass());
/**
if (message instanceof ApplyState){
ApplyState applyState = (ApplyState) message;
+ long elapsedTime = (System.nanoTime() - applyState.getStartTime());
+ if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){
+ LOG.warn("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
+ TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
+ }
+
if(LOG.isDebugEnabled()) {
LOG.debug("{}: Applying state for log index {} data {}",
persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
dataSizeSinceLastSnapshot = 0;
- LOG.info("{}: Initiating Snapshot Capture..", persistenceId());
+ LOG.info("{}: Initiating Snapshot Capture, journalSize = {}, dataSizeForCheck = {}," +
+ " dataThreshold = {}", persistenceId(), journalSize, dataSizeForCheck, dataThreshold);
+
long lastAppliedIndex = -1;
long lastAppliedTerm = -1;
package org.opendaylight.controller.cluster.raft.base.messages;
import akka.actor.ActorRef;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-
import java.io.Serializable;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
public class ApplyState implements Serializable {
private static final long serialVersionUID = 1L;
private final ActorRef clientActor;
private final String identifier;
private final ReplicatedLogEntry replicatedLogEntry;
+ private final long startTime;
public ApplyState(ActorRef clientActor, String identifier,
ReplicatedLogEntry replicatedLogEntry) {
this.clientActor = clientActor;
this.identifier = identifier;
this.replicatedLogEntry = replicatedLogEntry;
+ this.startTime = System.nanoTime();
}
public ActorRef getClientActor() {
public ReplicatedLogEntry getReplicatedLogEntry() {
return replicatedLogEntry;
}
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ @Override
+ public String toString() {
+ return "ApplyState{" +
+ "identifier='" + identifier + '\'' +
+ ", replicatedLogEntry.index =" + replicatedLogEntry.getIndex() +
+ ", startTime=" + startTime +
+ '}';
+ }
}
// (heartbeat) to each server; repeat during idle periods to
// prevent election timeouts (ยง5.2)
sendAppendEntries(0, false);
+
+ // It is important to schedule this heartbeat here
+ scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
}
/**
return this;
}
+ if(followerLogInformation.timeSinceLastActivity() >
+ context.getConfigParams().getElectionTimeOutInterval().toMillis()) {
+ LOG.error("{} : handleAppendEntriesReply delayed beyond election timeout, " +
+ "appendEntriesReply : {}, timeSinceLastActivity : {}, lastApplied : {}, commitIndex : {}",
+ logName(), appendEntriesReply, followerLogInformation.timeSinceLastActivity(),
+ context.getLastApplied(), context.getCommitIndex());
+ }
+
followerLogInformation.markFollowerActive();
if (appendEntriesReply.isSuccess()) {
return this;
}
+ protected void beforeSendHeartbeat(){}
+
@Override
public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
Preconditions.checkNotNull(sender, "sender should not be null");
}
}
- try {
- if (message instanceof SendHeartBeat) {
- sendHeartBeat();
- return this;
+ if (message instanceof SendHeartBeat) {
+ beforeSendHeartbeat();
+ sendHeartBeat();
+ scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
+ return this;
- } else if(message instanceof SendInstallSnapshot) {
- // received from RaftActor
- setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
- sendInstallSnapshot();
+ } else if(message instanceof SendInstallSnapshot) {
+ // received from RaftActor
+ setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
+ sendInstallSnapshot();
- } else if (message instanceof Replicate) {
- replicate((Replicate) message);
+ } else if (message instanceof Replicate) {
+ replicate((Replicate) message);
- } else if (message instanceof InstallSnapshotReply){
- handleInstallSnapshotReply((InstallSnapshotReply) message);
+ } else if (message instanceof InstallSnapshotReply){
+ handleInstallSnapshotReply((InstallSnapshotReply) message);
- }
- } finally {
- scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
}
+
return super.handleMessage(sender, message);
}
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
-import akka.actor.Cancellable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
-import scala.concurrent.duration.FiniteDuration;
/**
* The behavior of a RaftActor when it is in the Leader state
* set commitIndex = N (ยง5.3, ยง5.4).
*/
public class Leader extends AbstractLeader {
- private Cancellable installSnapshotSchedule = null;
- private Cancellable isolatedLeaderCheckSchedule = null;
+ private static final IsolatedLeaderCheck ISOLATED_LEADER_CHECK = new IsolatedLeaderCheck();
+ private final Stopwatch isolatedLeaderCheck;
public Leader(RaftActorContext context) {
super(context);
-
- scheduleIsolatedLeaderCheck(
- new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 10,
- context.getConfigParams().getHeartBeatInterval().unit()));
+ isolatedLeaderCheck = Stopwatch.createStarted();
}
@Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
if (originalMessage instanceof IsolatedLeaderCheck) {
if (isLeaderIsolated()) {
- LOG.info("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
+ LOG.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
context.getId(), minIsolatedLeaderPeerCount, leaderId);
+
return switchBehavior(new IsolatedLeader(context));
}
}
return super.handleMessage(sender, originalMessage);
}
- protected void stopIsolatedLeaderCheckSchedule() {
- if (isolatedLeaderCheckSchedule != null && !isolatedLeaderCheckSchedule.isCancelled()) {
- isolatedLeaderCheckSchedule.cancel();
+ @Override
+ protected void beforeSendHeartbeat(){
+ if(isolatedLeaderCheck.elapsed(TimeUnit.MILLISECONDS) > context.getConfigParams().getIsolatedCheckIntervalInMillis()){
+ context.getActor().tell(ISOLATED_LEADER_CHECK, context.getActor());
+ isolatedLeaderCheck.reset().start();
}
- }
- protected void scheduleIsolatedLeaderCheck(FiniteDuration isolatedCheckInterval) {
- isolatedLeaderCheckSchedule = context.getActorSystem().scheduler().schedule(isolatedCheckInterval, isolatedCheckInterval,
- context.getActor(), new IsolatedLeaderCheck(),
- context.getActorSystem().dispatcher(), context.getActor());
}
@Override
public void close() throws Exception {
- stopIsolatedLeaderCheckSchedule();
super.close();
}
assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
-
}};
}
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
-
}
-
};
}
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
-
}
-
};
}
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
-
}
-
};
}
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
ByteString snapshotBytes = fromObject(Arrays.asList(
new MockRaftActorContext.MockPayload("A"),
new MockRaftActorContext.MockPayload("B"),
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
new MockRaftActorContext.MockPayload("F"));
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
ByteString snapshotBytes = fromObject(Arrays.asList(
new MockRaftActorContext.MockPayload("A"),
new MockRaftActorContext.MockPayload("B"),
public void testRaftRoleChangeNotifier() throws Exception {
new JavaTestKit(getSystem()) {{
ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
+ MessageCollectorActor.waitUntilReady(notifierActor);
+
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ long heartBeatInterval = 100;
+ config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
+ config.setElectionTimeoutFactor(1);
+
String persistenceId = factory.generateActorId("notifier-");
factory.createTestActor(MockRaftActor.props(persistenceId,
Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
- // sleeping for a minimum of 2 seconds, if it spans more its fine.
- Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+ List<RoleChanged> matches = null;
+ for(int i = 0; i < 5000 / heartBeatInterval; i++) {
+ matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
+ assertNotNull(matches);
+ if(matches.size() == 3) {
+ break;
+ }
+ Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
+ }
- List<RoleChanged> matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
- assertNotNull(matches);
assertEquals(3, matches.size());
// check if the notifier got a role change from null to Follower
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put(follower1Id, followerActor1.path().toString());
- TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
+ TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
MockRaftActor.props(persistenceId, peerAddresses,
Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
MockRaftActor leaderActor = mockActorRef.underlyingActor();
+
leaderActor.getRaftActorContext().setCommitIndex(4);
leaderActor.getRaftActorContext().setLastApplied(4);
leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put(leaderId, leaderActor1.path().toString());
- TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
+ TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
MockRaftActor.props(persistenceId, peerAddresses,
Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import static org.junit.Assert.assertTrue;
+import akka.actor.ActorRef;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
+import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import scala.concurrent.duration.FiniteDuration;
+
+public abstract class AbstractLeaderTest extends AbstractRaftActorBehaviorTest{
+
+ /**
+ * When we removed scheduling of heartbeat in the AbstractLeader constructor we ended up with a situation where
+ * if no follower responded to an initial AppendEntries heartbeats would not be sent to it. This test verifies
+ * that regardless of whether followers respond or not we schedule heartbeats.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testLeaderSchedulesHeartbeatsEvenWhenNoFollowersRespondToInitialAppendEntries() throws Exception {
+ logStart("testLeaderSchedulesHeartbeatsEvenWhenNoFollowersRespondToInitialAppendEntries");
+ new JavaTestKit(getSystem()) {{
+ String leaderActorId = actorFactory.generateActorId("leader");
+ String follower1ActorId = actorFactory.generateActorId("follower");
+ String follower2ActorId = actorFactory.generateActorId("follower");
+
+ TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
+ actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
+ ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
+ ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
+
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
+ configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+ leaderActorContext.setConfigParams(configParams);
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(follower1ActorId,
+ follower1Actor.path().toString());
+ peerAddresses.put(follower2ActorId,
+ follower2Actor.path().toString());
+
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ RaftActorBehavior leader = createBehavior(leaderActorContext);
+
+ leaderActor.underlyingActor().setBehavior(leader);
+
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+ List<SendHeartBeat> allMessages = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
+
+ // Need more than 1 heartbeat to be delivered because we waited for 1 second with heartbeat interval 200ms
+ assertTrue(String.format("%s messages is less than expected", allMessages.size()),
+ allMessages.size() > 1);
+
+ }};
+ }
+
+}
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-public class IsolatedLeaderTest extends AbstractRaftActorBehaviorTest {
+public class IsolatedLeaderTest extends AbstractLeaderTest {
private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
@Override
protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
- return new Leader(actorContext);
+ return new IsolatedLeader(actorContext);
}
@Override
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
import scala.concurrent.duration.FiniteDuration;
-public class LeaderTest extends AbstractRaftActorBehaviorTest {
+public class LeaderTest extends AbstractLeaderTest {
static final String FOLLOWER_ID = "follower";
private MockRaftActorContext createActorContextWithFollower() {
MockRaftActorContext actorContext = createActorContext();
- actorContext.setPeerAddresses(ImmutableMap.<String,String>builder().put(FOLLOWER_ID,
+ actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
followerActor.path().toString()).build());
return actorContext;
}
return context;
}
- public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
- AbstractRaftActorBehavior behavior;
-
- @Override public void onReceive(Object message) throws Exception {
- if(behavior != null) {
- behavior.handleMessage(sender(), message);
- }
-
- super.onReceive(message);
- }
-
- public static Props props() {
- return Props.create(ForwardMessageToBehaviorActor.class);
- }
- }
-
@Test
public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
Follower follower = new Follower(followerActorContext);
- followerActor.underlyingActor().behavior = follower;
+ followerActor.underlyingActor().setBehavior(follower);
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
Follower follower = new Follower(followerActorContext);
- followerActor.underlyingActor().behavior = follower;
+ followerActor.underlyingActor().setBehavior(follower);
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
assertEquals(2, appendEntriesReply.getLogLastIndex());
assertEquals(1, appendEntriesReply.getLogLastTerm());
- leaderActor.underlyingActor().behavior = leader;
+ leaderActor.underlyingActor().setBehavior(follower);
leader.handleMessage(followerActor, appendEntriesReply);
leaderActor.underlyingActor().clear();
leaderActorContext.setPeerAddresses(peerAddresses);
leader = new Leader(leaderActorContext);
- leader.stopIsolatedLeaderCheckSchedule();
leader.markFollowerActive("follower-1");
leader.markFollowerActive("follower-2");
followerActorContext.setConfigParams(configParams);
Follower follower = new Follower(followerActorContext);
- followerActor.underlyingActor().behavior = follower;
+ followerActor.underlyingActor().setBehavior(follower);
leaderActorContext.getReplicatedLog().removeFrom(0);
leaderActorContext.setCommitIndex(-1);
follower.close();
}
+ @Test
+ public void testLaggingFollowerStarvation() throws Exception {
+ logStart("testLaggingFollowerStarvation");
+ new JavaTestKit(getSystem()) {{
+ String leaderActorId = actorFactory.generateActorId("leader");
+ String follower1ActorId = actorFactory.generateActorId("follower");
+ String follower2ActorId = actorFactory.generateActorId("follower");
+
+ TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
+ actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
+ ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
+ ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
+
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
+ configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+ leaderActorContext.setConfigParams(configParams);
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(follower1ActorId,
+ follower1Actor.path().toString());
+ peerAddresses.put(follower2ActorId,
+ follower2Actor.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+ leaderActorContext.getTermInformation().update(1, leaderActorId);
+
+ RaftActorBehavior leader = createBehavior(leaderActorContext);
+
+ leaderActor.underlyingActor().setBehavior(leader);
+
+ for(int i=1;i<6;i++) {
+ // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
+ RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1));
+ assertTrue(newBehavior == leader);
+ Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ }
+
+ // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
+ List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
+
+ assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
+ heartbeats.size() > 1);
+
+ // Check if follower-2 got AppendEntries during this time and was not starved
+ List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
+
+ assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
+ appendEntries.size() > 1);
+
+ }};
+ }
+
@Override
protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
ActorRef actorRef, RaftRPC rpc) throws Exception {
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.utils;
+
+import akka.actor.Props;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+
+public class ForwardMessageToBehaviorActor extends MessageCollectorActor {
+ private RaftActorBehavior behavior;
+
+ @Override
+ public void onReceive(Object message) throws Exception {
+ if(behavior != null) {
+ behavior.handleMessage(sender(), message);
+ }
+
+ super.onReceive(message);
+ }
+
+ public static Props props() {
+ return Props.create(ForwardMessageToBehaviorActor.class);
+ }
+
+ public void setBehavior(RaftActorBehavior behavior){
+ this.behavior = behavior;
+ }
+}
+
package org.opendaylight.controller.cluster.raft.utils;
import akka.actor.ActorRef;
+import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.pattern.Patterns;
import akka.util.Timeout;
throw new TimeoutException("Actor not ready in time.");
}
+
+ public static Props props() {
+ return Props.create(MessageCollectorActor.class);
+ }
}
loggers = ["akka.event.slf4j.Slf4jLogger"]
actor {
-
provider = "akka.cluster.ClusterActorRefProvider"
serializers {
- java = "akka.serialization.JavaSerializer"
- proto = "akka.remote.serialization.ProtobufSerializer"
- }
+ java = "akka.serialization.JavaSerializer"
+ proto = "akka.remote.serialization.ProtobufSerializer"
+ }
+
+ serialization-bindings {
+ "com.google.protobuf.Message" = proto
+ }
- serialization-bindings {
- "com.google.protobuf.Message" = proto
+ default-dispatcher {
+ # Setting throughput to 1 makes the dispatcher fair. It processes 1 message from
+ # the mailbox before moving on to the next mailbox
+ throughput = 1
+ }
- }
+ default-mailbox {
+ # When not using a BalancingDispatcher it is recommended that we use the SingleConsumerOnlyUnboundedMailbox
+ # as it is the most efficient for multiple producer/single consumer use cases
+ mailbox-type="akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
+ }
}
remote {
log-remote-lifecycle-events = off
timerContext.stop();
Snapshot timerSnapshot = commitTimer.getSnapshot();
- double allowedLatencyInNanos = timerSnapshot.get98thPercentile();
+ double allowedLatencyInNanos = timerSnapshot.get95thPercentile();
long commitTimeoutInSeconds = actorContext.getDatastoreContext()
.getShardTransactionCommitTimeoutInSeconds();
assertEquals(DatastoreContext.DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT, build.getShardLeaderElectionTimeout());
assertEquals(DatastoreContext.DEFAULT_PERSISTENT, build.isPersistent());
assertEquals(DatastoreContext.DEFAULT_CONFIGURATION_READER, build.getConfigurationReader());
- assertEquals(DatastoreContext.DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS, build.getShardRaftConfig().getIsolatedCheckInterval().length());
+ assertEquals(DatastoreContext.DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS, build.getShardRaftConfig().getIsolatedCheckIntervalInMillis());
assertEquals(DatastoreContext.DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE, build.getShardRaftConfig().getSnapshotDataThresholdPercentage());
assertEquals(DatastoreContext.DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR, build.getShardRaftConfig().getElectionTimeoutFactor());
assertEquals(DatastoreContext.DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT, build.getTransactionCreationInitialRateLimit());
doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
doReturn(commitTimerContext).when(commitTimer).time();
doReturn(commitSnapshot).when(commitTimer).getSnapshot();
- doReturn(TimeUnit.MILLISECONDS.toNanos(2000) * 1.0).when(commitSnapshot).get98thPercentile();
+ doReturn(TimeUnit.MILLISECONDS.toNanos(2000) * 1.0).when(commitSnapshot).get95thPercentile();
doReturn(10.0).when(actorContext).getTxCreationLimit();
}
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.Creator;
+import com.google.common.base.Stopwatch;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
private final Logger LOG = LoggerFactory.getLogger(DummyShard.class);
private long lastMessageIndex = -1;
private long lastMessageSize = 0;
+ private Stopwatch appendEntriesWatch;
public DummyShard(Configuration configuration, String followerId) {
this.configuration = configuration;
}
protected void handleAppendEntries(AppendEntries req) throws InterruptedException {
-
LOG.info("{} - Received AppendEntries message : leader term = {}, index = {}, prevLogIndex = {}, size = {}",
followerId, req.getTerm(),req.getLeaderCommit(), req.getPrevLogIndex(), req.getEntries().size());
+ if(appendEntriesWatch != null){
+ long elapsed = appendEntriesWatch.elapsed(TimeUnit.SECONDS);
+ if(elapsed >= 5){
+ LOG.error("More than 5 seconds since last append entry, elapsed Time = {} seconds" +
+ ", leaderCommit = {}, prevLogIndex = {}, size = {}",
+ elapsed, req.getLeaderCommit(), req.getPrevLogIndex(), req.getEntries().size());
+ }
+ appendEntriesWatch.reset().start();
+ } else {
+ appendEntriesWatch = Stopwatch.createStarted();
+ }
+
if(lastMessageIndex == req.getLeaderCommit() && req.getEntries().size() > 0 && lastMessageSize > 0){
LOG.error("{} - Duplicate message with leaderCommit = {} prevLogIndex = {} received", followerId, req.getLeaderCommit(), req.getPrevLogIndex());
}
org.slf4j.simpleLogger.logFile=System.out
org.slf4j.simpleLogger.showShortLogName=true
org.slf4j.simpleLogger.levelInBrackets=true
-org.slf4j.simpleLogger.defaultLogLevel=info
\ No newline at end of file
+org.slf4j.simpleLogger.defaultLogLevel=error
\ No newline at end of file