public ExampleActor(String id, Map<String, String> peerAddresses,
Optional<ConfigParams> configParams) {
- super(id, peerAddresses, configParams);
+ super(id, peerAddresses, configParams, (short)0);
setPersistence(true);
roleChangeNotifier = createRoleChangeNotifier(id);
}
* @return true if it is ok to replicate
*/
boolean okToReplicate();
+
+ /**
+ * Returns the payload data version of the follower.
+ */
+ short getPayloadVersion();
+
+ /**
+ * Sets the payload data version of the follower.
+ */
+ void setPayloadVersion(short payloadVersion);
}
private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted();
+ private short payloadVersion = -1;
public FollowerLogInformationImpl(String id, long matchIndex, RaftActorContext context) {
this.id = id;
.append(context.getConfigParams().getElectionTimeOutInterval().toMillis()).append("]");
return builder.toString();
}
+
+ @Override
+ public short getPayloadVersion() {
+ return payloadVersion;
+ }
+
+ @Override
+ public void setPayloadVersion(short payloadVersion) {
+ this.payloadVersion = payloadVersion;
+ }
}
private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
- public RaftActor(String id, Map<String, String> peerAddresses) {
- this(id, peerAddresses, Optional.<ConfigParams>absent());
- }
-
public RaftActor(String id, Map<String, String> peerAddresses,
- Optional<ConfigParams> configParams) {
+ Optional<ConfigParams> configParams, short payloadVersion) {
context = new RaftActorContextImpl(this.getSelf(),
this.getContext(), id, new ElectionTermImpl(delegatingPersistenceProvider, id, LOG),
(configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
delegatingPersistenceProvider, LOG);
+ context.setPayloadVersion(payloadVersion);
context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, currentBehavior));
}
@VisibleForTesting
void setTotalMemoryRetriever(Supplier<Long> retriever);
+ short getPayloadVersion();
}
private final DataPersistenceProvider persistenceProvider;
+ private short payloadVersion;
+
public RaftActorContextImpl(ActorRef actor, UntypedActorContext context, String id,
ElectionTerm termInformation, long commitIndex, long lastApplied, Map<String, String> peerAddresses,
ConfigParams configParams, DataPersistenceProvider persistenceProvider, Logger logger) {
this.LOG = logger;
}
+ void setPayloadVersion(short payloadVersion) {
+ this.payloadVersion = payloadVersion;
+ }
+
+ @Override
+ public short getPayloadVersion() {
+ return payloadVersion;
+ }
+
void setConfigParams(ConfigParams configParams) {
this.configParams = configParams;
}
}
followerLogInformation.markFollowerActive();
+ followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion());
boolean updated = false;
if (appendEntriesReply.isSuccess()) {
AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
prevLogIndex(followerNextIndex),
prevLogTerm(followerNextIndex), entries,
- context.getCommitIndex(), super.getReplicatedToAllIndex());
+ context.getCommitIndex(), super.getReplicatedToAllIndex(), context.getPayloadVersion());
if(!entries.isEmpty() || LOG.isTraceEnabled()) {
LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerId,
sender.tell(
new AppendEntriesReply(context.getId(), currentTerm(), false,
- lastIndex(), lastTerm()), actor()
+ lastIndex(), lastTerm(), context.getPayloadVersion()), actor()
);
return this;
}
if (snapshotTracker != null) {
// if snapshot install is in progress, follower should just acknowledge append entries with a reply.
AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
- lastIndex(), lastTerm());
+ lastIndex(), lastTerm(), context.getPayloadVersion());
if(LOG.isDebugEnabled()) {
LOG.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply);
logName(), lastIndex, lastTerm());
sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
- lastTerm()), actor());
+ lastTerm(), context.getPayloadVersion()), actor());
return this;
}
}
AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
- lastIndex, lastTerm());
+ lastIndex, lastTerm(), context.getPayloadVersion());
if(LOG.isTraceEnabled()) {
LOG.trace("{}: handleAppendEntries returning : {}", logName(), reply);
// index which has been replicated successfully to all followers, -1 if none
private final long replicatedToAllIndex;
- public AppendEntries(long term, String leaderId, long prevLogIndex,
- long prevLogTerm, List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex) {
+ private final short payloadVersion;
+
+ public AppendEntries(long term, String leaderId, long prevLogIndex, long prevLogTerm,
+ List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex, short payloadVersion) {
super(term);
this.leaderId = leaderId;
this.prevLogIndex = prevLogIndex;
this.entries = entries;
this.leaderCommit = leaderCommit;
this.replicatedToAllIndex = replicatedToAllIndex;
+ this.payloadVersion = payloadVersion;
}
private void writeObject(ObjectOutputStream out) throws IOException {
}
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
- in.readShort(); // version
+ in.readShort(); // raft version
in.defaultReadObject();
return replicatedToAllIndex;
}
+ public short getPayloadVersion() {
+ return payloadVersion;
+ }
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
- builder.append("AppendEntries [term=").append(term).append(", leaderId=").append(leaderId)
- .append(", prevLogIndex=").append(prevLogIndex).append(", prevLogTerm=").append(prevLogTerm)
- .append(", entries=").append(entries).append(", leaderCommit=").append(leaderCommit)
- .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append("]");
+ builder.append("AppendEntries [leaderId=").append(leaderId).append(", prevLogIndex=").append(prevLogIndex)
+ .append(", prevLogTerm=").append(prevLogTerm).append(", leaderCommit=").append(leaderCommit)
+ .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append(", payloadVersion=")
+ .append(payloadVersion).append("]");
return builder.toString();
}
from.getPrevLogIndex(),
from.getPrevLogTerm(),
logEntryList,
- from.getLeaderCommit(), -1);
+ from.getLeaderCommit(), -1, (short)0);
return to;
}
// responding
private final String followerId;
- public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm) {
+ private final short payloadVersion;
+
+ public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm,
+ short payloadVersion) {
super(term);
this.followerId = followerId;
this.success = success;
this.logLastIndex = logLastIndex;
this.logLastTerm = logLastTerm;
+ this.payloadVersion = payloadVersion;
}
@Override
return followerId;
}
+ public short getPayloadVersion() {
+ return payloadVersion;
+ }
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
- builder.append("AppendEntriesReply [term=").append(term).append(", success=").append(success)
- .append(", logLastIndex=").append(logLastIndex).append(", logLastTerm=").append(logLastTerm)
- .append(", followerId=").append(followerId).append("]");
+ builder.append("AppendEntriesReply [success=").append(success).append(", logLastIndex=").append(logLastIndex)
+ .append(", logLastTerm=").append(logLastTerm).append(", followerId=").append(followerId)
+ .append(", payloadVersion=").append(payloadVersion).append("]");
return builder.toString();
}
}
public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
DataPersistenceProvider dataPersistenceProvider) {
- super(id, peerAddresses, config);
+ super(id, peerAddresses, config, (short) 0);
state = new ArrayList<>();
this.actorDelegate = mock(RaftActor.class);
this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
private boolean snapshotCaptureInitiated;
private SnapshotManager snapshotManager;
private DataPersistenceProvider persistenceProvider = new NonPersistentDataProvider();
+ private short payloadVersion;
public MockRaftActorContext(){
electionTerm = new ElectionTerm() {
this.persistenceProvider = persistenceProvider;
}
+ @Override
+ public short getPayloadVersion() {
+ return payloadVersion;
+ }
+
+ public void setPayloadVersion(short payloadVersion) {
+ this.payloadVersion = payloadVersion;
+ }
+
public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
@Override public void appendAndPersist(
ReplicatedLogEntry replicatedLogEntry) {
assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
//fake snapshot on index 5
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
+ leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1, (short)0));
assertEquals(8, leaderActor.getReplicatedLog().size());
//fake snapshot on index 6
assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
+ leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1, (short)0));
assertEquals(8, leaderActor.getReplicatedLog().size());
assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
//fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
+ leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1, (short)0));
assertEquals(2, leaderActor.getReplicatedLog().size());
assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
(ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
new MockRaftActorContext.MockPayload("foo-6"))
);
- followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
+ followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5, (short)0));
assertEquals(7, followerActor.getReplicatedLog().size());
//fake snapshot on index 7
(ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
new MockRaftActorContext.MockPayload("foo-7"))
);
- followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
+ followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short)0));
assertEquals(8, followerActor.getReplicatedLog().size());
assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
new MockRaftActorContext.MockPayload("foo-7"))
);
// send an additional entry 8 with leaderCommit = 7
- followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
+ followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short)0));
// 7 and 8, as lastapplied is 7
assertEquals(2, followerActor.getReplicatedLog().size());
assertEquals(5, leaderActor.getReplicatedLog().size());
assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
+ leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short)0));
assertEquals(5, leaderActor.getReplicatedLog().size());
assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
// set the 2nd follower nextIndex to 1 which has been snapshotted
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
+ leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1, (short)0));
assertEquals(5, leaderActor.getReplicatedLog().size());
assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
//reply from a slow follower does not initiate a fake snapshot
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
+ leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1, (short)0));
assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
ByteString snapshotBytes = fromObject(Arrays.asList(
assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
//reply from a slow follower after should not raise errors
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
+ leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short)0));
assertEquals(0, leaderActor.getReplicatedLog().size());
}
};
*/
@Test
public void testHandleAppendEntriesSenderTermLessThanReceiverTerm() throws Exception {
- MockRaftActorContext context = createActorContext();
+ MockRaftActorContext context = createActorContext();
+ short payloadVersion = 5;
+ context.setPayloadVersion(payloadVersion);
- // First set the receivers term to a high number (1000)
- context.getTermInformation().update(1000, "test");
+ // First set the receivers term to a high number (1000)
+ context.getTermInformation().update(1000, "test");
- AppendEntries appendEntries = new AppendEntries(100, "leader-1", 0, 0, null, 101, -1);
+ AppendEntries appendEntries = new AppendEntries(100, "leader-1", 0, 0, null, 101, -1, (short)4);
- behavior = createBehavior(context);
+ behavior = createBehavior(context);
- // Send an unknown message so that the state of the RaftActor remains unchanged
- RaftActorBehavior expected = behavior.handleMessage(behaviorActor, "unknown");
+ // Send an unknown message so that the state of the RaftActor remains unchanged
+ RaftActorBehavior expected = behavior.handleMessage(behaviorActor, "unknown");
- RaftActorBehavior raftBehavior = behavior.handleMessage(behaviorActor, appendEntries);
+ RaftActorBehavior raftBehavior = behavior.handleMessage(behaviorActor, appendEntries);
- assertEquals("Raft state", expected.state(), raftBehavior.state());
+ assertEquals("Raft state", expected.state(), raftBehavior.state());
- // Also expect an AppendEntriesReply to be sent where success is false
+ // Also expect an AppendEntriesReply to be sent where success is false
- AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(
- behaviorActor, AppendEntriesReply.class);
+ AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(
+ behaviorActor, AppendEntriesReply.class);
- assertEquals("isSuccess", false, reply.isSuccess());
+ assertEquals("isSuccess", false, reply.isSuccess());
+ assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
}
List<ReplicatedLogEntry> entries = new ArrayList<>();
entries.add(new MockRaftActorContext.MockReplicatedLogEntry(2, 0, payload));
- AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 2, -1);
+ AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 2, -1, (short)0);
behavior = createBehavior(context);
}
protected AppendEntries createAppendEntriesWithNewerTerm() {
- return new AppendEntries(100, "leader-1", 0, 0, null, 1, -1);
+ return new AppendEntries(100, "leader-1", 0, 0, null, 1, -1, (short)0);
}
protected AppendEntriesReply createAppendEntriesReplyWithNewerTerm() {
- return new AppendEntriesReply("follower-1", 100, false, 100, 100);
+ return new AppendEntriesReply("follower-1", 100, false, 100, 100, (short)0);
}
protected RequestVote createRequestVoteWithNewerTerm() {
setupPeers(1);
candidate.handleMessage(peerActors[0], new AppendEntries(1, "test", 0, 0,
- Collections.<ReplicatedLogEntry>emptyList(), 0, -1));
+ Collections.<ReplicatedLogEntry>emptyList(), 0, -1, (short)0));
AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(
peerActors[0], AppendEntriesReply.class);
private RaftActorBehavior follower;
+ private final short payloadVersion = 5;
+
@Override
@After
public void tearDown() throws Exception {
@Override
protected MockRaftActorContext createActorContext(ActorRef actorRef){
- return new MockRaftActorContext("follower", getSystem(), actorRef);
+ MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
+ context.setPayloadVersion(payloadVersion );
+ return context;
}
@Test
newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
- AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+ AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
follower = createBehavior(context);
follower.handleMessage(leaderActor, appendEntries);
newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
- AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+ AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
follower = createBehavior(context);
follower.handleMessage(leaderActor, appendEntries);
newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
- appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101);
+ appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
follower.handleMessage(leaderActor, appendEntries);
syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
- AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+ AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
follower = createBehavior(context);
follower.handleMessage(leaderActor, appendEntries);
newReplicatedLogEntry(2, 101, "foo"));
// leader-2 is becoming the leader now and it says the commitIndex is 45
- appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100);
+ appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
follower.handleMessage(leaderActor, appendEntries);
syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
- AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+ AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
follower = createBehavior(context);
follower.handleMessage(leaderActor, appendEntries);
newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
- appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101);
+ appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
follower.handleMessage(leaderActor, appendEntries);
syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
newReplicatedLogEntry(2, 101, "foo"));
// leader-2 is becoming the leader now and it says the commitIndex is 45
- appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100);
+ appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
follower.handleMessage(leaderActor, appendEntries);
syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
- AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+ AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
follower = createBehavior(context);
follower.handleMessage(leaderActor, appendEntries);
// AppendEntries is now sent with a bigger term
// this will set the receivers term to be the same as the sender's term
- AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1);
+ AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
follower = createBehavior(context);
// before the new behavior was created (1 in this case)
// This will not work for a Candidate because as soon as a Candidate
// is created it increments the term
- AppendEntries appendEntries = new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1);
+ AppendEntries appendEntries = new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1, (short)0);
follower = createBehavior(context);
// before the new behavior was created (1 in this case)
// This will not work for a Candidate because as soon as a Candidate
// is created it increments the term
- AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1);
+ AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
follower = createBehavior(context);
List<ReplicatedLogEntry> entries = new ArrayList<>();
entries.add(newReplicatedLogEntry(1, 4, "four"));
- AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1);
+ AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
follower = createBehavior(context);
follower = createBehavior(context);
- follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1));
+ follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
assertEquals("Next index", 2, log.last().getIndex() + 1);
assertEquals("Entry 1", entries.get(0), log.get(1));
entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
leaderActor.underlyingActor().clear();
- follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1));
+ follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
assertEquals("Next index", 3, log.last().getIndex() + 1);
assertEquals("Entry 1", entries.get(0), log.get(1));
List<ReplicatedLogEntry> entries = new ArrayList<>();
entries.add(newReplicatedLogEntry(1, 4, "four"));
- AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3);
+ AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
follower = createBehavior(context);
newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
- AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101);
+ AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
follower.handleMessage(leaderActor, appendEntries);
syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
+ assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
}
private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
// in a 3 node cluster, even if 1 follower is returns a reply, the isolatedLeader is not isolated
RaftActorBehavior behavior = isolatedLeader.handleMessage(senderActor,
new AppendEntriesReply("follower-1", isolatedLeader.lastTerm() - 1, true,
- isolatedLeader.lastIndex() - 1, isolatedLeader.lastTerm() - 1));
+ isolatedLeader.lastIndex() - 1, isolatedLeader.lastTerm() - 1, (short)0));
assertEquals("Raft state", RaftState.Leader, behavior.state());
behavior = isolatedLeader.handleMessage(senderActor,
new AppendEntriesReply("follower-2", isolatedLeader.lastTerm() - 1, true,
- isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1 ));
+ isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1, (short)0 ));
assertEquals("Raft state", RaftState.Leader, behavior.state());
}
// in a 5 member cluster, atleast 2 followers need to be active and return a reply
RaftActorBehavior behavior = isolatedLeader.handleMessage(senderActor,
new AppendEntriesReply("follower-1", isolatedLeader.lastTerm() - 1, true,
- isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1 ));
+ isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1, (short)0 ));
assertEquals("Raft state", RaftState.IsolatedLeader, behavior.state());
behavior = isolatedLeader.handleMessage(senderActor,
new AppendEntriesReply("follower-2", isolatedLeader.lastTerm() - 1, true,
- isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1 ));
+ isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1, (short)0 ));
assertEquals("Raft state", RaftState.Leader, behavior.state());
behavior = isolatedLeader.handleMessage(senderActor,
new AppendEntriesReply("follower-3", isolatedLeader.lastTerm() - 1, true,
- isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1 ));
+ isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1, (short)0 ));
assertEquals("Raft state", RaftState.Leader, behavior.state());
}
// bowing itself to another leader in the cluster
RaftActorBehavior behavior = isolatedLeader.handleMessage(senderActor,
new AppendEntriesReply("follower-1", isolatedLeader.lastTerm() + 1, true,
- isolatedLeader.lastIndex() + 1, isolatedLeader.lastTerm() + 1));
+ isolatedLeader.lastIndex() + 1, isolatedLeader.lastTerm() + 1, (short)0));
assertEquals("Raft state", RaftState.Follower, behavior.state());
logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
MockRaftActorContext actorContext = createActorContextWithFollower();
+ short payloadVersion = (short)5;
+ actorContext.setPayloadVersion(payloadVersion);
long term = 1;
actorContext.getTermInformation().update(term, "");
assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
assertEquals("Entries size", 0, appendEntries.getEntries().size());
+ assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
// The follower would normally reply - simulate that explicitly here.
leader.handleMessage(followerActor, new AppendEntriesReply(
- FOLLOWER_ID, term, true, lastIndex - 1, term));
+ FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
followerActor.underlyingActor().clear();
assertEquals("Entries size", 1, appendEntries.getEntries().size());
assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+ assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
}
// The follower would normally reply - simulate that explicitly here.
long lastIndex = actorContext.getReplicatedLog().lastIndex();
leader.handleMessage(followerActor, new AppendEntriesReply(
- FOLLOWER_ID, term, true, lastIndex, term));
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
followerActor.underlyingActor().clear();
// The follower would normally reply - simulate that explicitly here.
long lastIndex = actorContext.getReplicatedLog().lastIndex();
leader.handleMessage(followerActor, new AppendEntriesReply(
- FOLLOWER_ID, term, true, lastIndex, term));
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
followerActor.underlyingActor().clear();
// The follower would normally reply - simulate that explicitly here.
long lastIndex = actorContext.getReplicatedLog().lastIndex();
leader.handleMessage(followerActor, new AppendEntriesReply(
- FOLLOWER_ID, term, true, lastIndex, term));
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
followerActor.underlyingActor().clear();
for(int i=0;i<3;i++) {
sendReplicate(actorContext, lastIndex+i+1);
leader.handleMessage(followerActor, new AppendEntriesReply(
- FOLLOWER_ID, term, true, lastIndex + i + 1, term));
+ FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
}
// The follower would normally reply - simulate that explicitly here.
long lastIndex = actorContext.getReplicatedLog().lastIndex();
leader.handleMessage(followerActor, new AppendEntriesReply(
- FOLLOWER_ID, term, true, lastIndex, term));
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
followerActor.underlyingActor().clear();
// The follower would normally reply - simulate that explicitly here.
long lastIndex = actorContext.getReplicatedLog().lastIndex();
leader.handleMessage(followerActor, new AppendEntriesReply(
- FOLLOWER_ID, term, true, lastIndex, term));
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
followerActor.underlyingActor().clear();
// The follower would normally reply - simulate that explicitly here.
long lastIndex = actorContext.getReplicatedLog().lastIndex();
leader.handleMessage(followerActor, new AppendEntriesReply(
- FOLLOWER_ID, term, true, lastIndex, term));
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
followerActor.underlyingActor().clear();
leader = new Leader(leaderActorContext);
// Send initial heartbeat reply with last index.
- leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
+ leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1, (short)0));
FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
- AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
+ AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1, (short)0);
RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
leader = new Leader(leaderActorContext);
- AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
+ short payloadVersion = 5;
+ AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
ApplyState applyState = applyStateList.get(0);
assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
+
+ FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+ assertEquals(payloadVersion, followerInfo.getPayloadVersion());
}
@Test
leader = new Leader(leaderActorContext);
- AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
+ AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
for(int i=1;i<6;i++) {
// Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
- RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1));
+ RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
assertTrue(newBehavior == leader);
Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.messages;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+
+/**
+ * Unit tests for AppendEntriesReply.
+ *
+ * @author Thomas Pantelis
+ */
+public class AppendEntriesReplyTest {
+
+ @Test
+ public void testSerialization() {
+ AppendEntriesReply expected = new AppendEntriesReply("follower", 5, true, 100, 4, (short)6);
+ AppendEntriesReply cloned = (AppendEntriesReply) SerializationUtils.clone(expected);
+
+ assertEquals("getTerm", expected.getTerm(), cloned.getTerm());
+ assertEquals("getFollowerId", expected.getFollowerId(), cloned.getFollowerId());
+ assertEquals("getLogLastTerm", expected.getLogLastTerm(), cloned.getLogLastTerm());
+ assertEquals("getLogLastIndex", expected.getLogLastIndex(), cloned.getLogLastIndex());
+ assertEquals("getPayloadVersion", expected.getPayloadVersion(), cloned.getPayloadVersion());
+ }
+}
*/
package org.opendaylight.controller.cluster.raft.messages;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-
/**
* Unit tests for AppendEntries.
*
ReplicatedLogEntry entry2 = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload2"));
- AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L, -1);
+ short payloadVersion = 5;
+ AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L,
+ -1, payloadVersion);
AppendEntries cloned = (AppendEntries) SerializationUtils.clone(expected);
@Test
public void testToAndFromSerializable() {
AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L,
- Collections.<ReplicatedLogEntry>emptyList(), 10L, -1);
+ Collections.<ReplicatedLogEntry>emptyList(), 10L, -1, (short)0);
assertSame("toSerializable", entries, entries.toSerializable());
assertSame("fromSerializable", entries,
@Test
public void testToAndFromLegacySerializable() {
ReplicatedLogEntry entry = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload"));
- AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry), 10L, -1);
+ AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry), 10L, -1, (short)0);
Object serializable = entries.toSerializable(RaftVersions.HELIUM_VERSION);
Assert.assertTrue(serializable instanceof AppendEntriesMessages.AppendEntries);
assertEquals("getPrevLogIndex", expected.getPrevLogIndex(), actual.getPrevLogIndex());
assertEquals("getPrevLogTerm", expected.getPrevLogTerm(), actual.getPrevLogTerm());
assertEquals("getReplicatedToAllIndex", expected.getReplicatedToAllIndex(), actual.getReplicatedToAllIndex());
+ assertEquals("getPayloadVersion", expected.getPayloadVersion(), actual.getPayloadVersion());
assertEquals("getEntries size", expected.getEntries().size(), actual.getEntries().size());
Iterator<ReplicatedLogEntry> iter = expected.getEntries().iterator();
protected Shard(final ShardIdentifier name, final Map<String, String> peerAddresses,
final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
- super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()));
+ super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()),
+ DataStoreVersions.CURRENT_VERSION);
this.name = name.toString();
this.datastoreContext = datastoreContext;
entries.add(new ReplicatedLogImplEntry(0, 1, payload));
- assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10, -1).toSerializable());
+ assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10, -1, (short)0).toSerializable());
}
}
});
AppendEntries appendEntries =
- new AppendEntries(1, "member-1", 0, 100, entries, 1, -1);
+ new AppendEntries(1, "member-1", 0, 100, entries, 1, -1, (short)0);
AppendEntriesMessages.AppendEntries o = (AppendEntriesMessages.AppendEntries)
appendEntries.toSerializable(RaftVersions.HELIUM_VERSION);
}
});
- return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1);
+ return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1, (short)0);
}
public static AppendEntries keyValueAppendEntries() {
}
});
- return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1);
+ return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1, (short)0);
}
}
import akka.japi.Creator;
import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
if (!ignore) {
LOG.info("{} - Randomizing delay : {}", followerId, delay);
Thread.sleep(delay);
- sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm()), self());
+ sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm(),
+ DataStoreVersions.CURRENT_VERSION), self());
}
} else {
- sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm()), self());
+ sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm(),
+ DataStoreVersions.CURRENT_VERSION), self());
}
}