votingMember = false;
}
+ LOG.debug("{}: Updated server config: isVoting: {}, peers: {}", id, votingMember, peerInfoMap.values());
+
setDynamicServerConfigurationInUse();
}
}
private void onApplySnapshot(ApplySnapshot message) {
- log.info("{}: Applying snapshot on follower with snapshotIndex: {}, snapshotTerm: {}", context.getId(),
- message.getSnapshot().getLastAppliedIndex(), message.getSnapshot().getLastAppliedTerm());
+ log.info("{}: Applying snapshot on follower: {}", context.getId(), message.getSnapshot());
context.getSnapshotManager().apply(message);
}
context.setCommitIndex(snapshot.getLastAppliedIndex());
context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
+ if(snapshot.getServerConfiguration() != null) {
+ context.updatePeerIds(snapshot.getServerConfiguration());
+ }
+
if(snapshot.getState().length > 0 ) {
applySnapshotProcedure.accept(snapshot.getState());
}
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.VotingState;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
// followerId to the followerToSnapshot map.
FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+ int nextChunkIndex = followerToSnapshot.incrementChunkIndex();
+ Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
+ if(followerToSnapshot.isLastChunk(nextChunkIndex)) {
+ serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
+ }
+
followerActor.tell(
new InstallSnapshot(currentTerm(), context.getId(),
snapshot.get().getLastIncludedIndex(),
snapshot.get().getLastIncludedTerm(),
nextSnapshotChunk,
- followerToSnapshot.incrementChunkIndex(),
+ nextChunkIndex,
followerToSnapshot.getTotalChunks(),
- Optional.of(followerToSnapshot.getLastChunkHashCode())
+ Optional.of(followerToSnapshot.getLastChunkHashCode()),
+ serverConfig
).toSerializable(followerToLog.get(followerId).getRaftVersion()),
actor()
);
installSnapshot.getLastIncludedTerm(),
context.getTermInformation().getCurrentTerm(),
context.getTermInformation().getVotedFor(),
- context.getPeerServerInfo(true));
+ installSnapshot.getServerConfig().orNull());
ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() {
@Override
import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.opendaylight.controller.cluster.raft.RaftVersions;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
public class InstallSnapshot extends AbstractRaftRPC implements Externalizable {
private int chunkIndex;
private int totalChunks;
private Optional<Integer> lastChunkHashCode;
+ private Optional<ServerConfigurationPayload> serverConfig;
/**
* Empty constructor to satisfy Externalizable.
public InstallSnapshot() {
}
- public InstallSnapshot(long term, String leaderId, long lastIncludedIndex,
- long lastIncludedTerm, byte[] data, int chunkIndex, int totalChunks, Optional<Integer> lastChunkHashCode) {
+ public InstallSnapshot(long term, String leaderId, long lastIncludedIndex, long lastIncludedTerm, byte[] data,
+ int chunkIndex, int totalChunks, Optional<Integer> lastChunkHashCode, Optional<ServerConfigurationPayload> serverConfig) {
super(term);
this.leaderId = leaderId;
this.lastIncludedIndex = lastIncludedIndex;
this.chunkIndex = chunkIndex;
this.totalChunks = totalChunks;
this.lastChunkHashCode = lastChunkHashCode;
+ this.serverConfig = serverConfig;
}
public InstallSnapshot(long term, String leaderId, long lastIncludedIndex,
long lastIncludedTerm, byte[] data, int chunkIndex, int totalChunks) {
- this(term, leaderId, lastIncludedIndex, lastIncludedTerm, data, chunkIndex, totalChunks, Optional.<Integer>absent());
+ this(term, leaderId, lastIncludedIndex, lastIncludedTerm, data, chunkIndex, totalChunks,
+ Optional.<Integer>absent(), Optional.<ServerConfigurationPayload>absent());
}
public String getLeaderId() {
return lastChunkHashCode;
}
+ public Optional<ServerConfigurationPayload> getServerConfig() {
+ return serverConfig;
+ }
+
@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeShort(RaftVersions.CURRENT_VERSION);
out.writeInt(lastChunkHashCode.get().intValue());
}
+ out.writeByte(serverConfig.isPresent() ? 1 : 0);
+ if(serverConfig.isPresent()) {
+ out.writeObject(serverConfig.get());
+ }
+
out.writeObject(data);
}
lastChunkHashCode = Optional.of(in.readInt());
}
+ serverConfig = Optional.absent();
+ boolean serverConfigPresent = in.readByte() == 1;
+ if(serverConfigPresent) {
+ serverConfig = Optional.of((ServerConfigurationPayload)in.readObject());
+ }
+
data = (byte[])in.readObject();
}
public String toString() {
return "InstallSnapshot [term=" + getTerm() + ", leaderId=" + leaderId + ", lastIncludedIndex="
+ lastIncludedIndex + ", lastIncludedTerm=" + lastIncludedTerm + ", datasize=" + data.length
- + ", Chunk=" + chunkIndex + "/" + totalChunks + ", lastChunkHashCode=" + lastChunkHashCode + "]";
+ + ", Chunk=" + chunkIndex + "/" + totalChunks + ", lastChunkHashCode=" + lastChunkHashCode
+ + ", serverConfig=" + serverConfig.orNull() + "]";
}
public static InstallSnapshot fromSerializable (Object o) {
InstallSnapshot installSnapshot = new InstallSnapshot(from.getTerm(),
from.getLeaderId(), from.getLastIncludedIndex(),
from.getLastIncludedTerm(), from.getData().toByteArray(),
- from.getChunkIndex(), from.getTotalChunks(), lastChunkHashCode);
+ from.getChunkIndex(), from.getTotalChunks(), lastChunkHashCode,
+ Optional.<ServerConfigurationPayload>absent());
return installSnapshot;
}
return;
}
+ if(message instanceof ServerConfigurationPayload) {
+ super.persistData(collectorActor, new MockIdentifier("serverConfig"), (Payload)message);
+ return;
+ }
+
if(message instanceof SetPeerAddress) {
setPeerAddress(((SetPeerAddress) message).getPeerId().toString(),
((SetPeerAddress) message).getPeerAddress());
}
protected void verifyApplyState(ApplyState applyState, ActorRef expClientActor,
- String expId, long expTerm, long expIndex, MockPayload payload) {
+ String expId, long expTerm, long expIndex, Payload payload) {
assertEquals("ApplyState getClientActor", expClientActor, applyState.getClientActor());
final Identifier id = expId == null ? null : new MockIdentifier(expId);
package org.opendaylight.controller.cluster.raft;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import akka.actor.ActorRef;
import akka.persistence.SaveSnapshotSuccess;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
expSnapshotState.add(payload5);
expSnapshotState.add(payload6);
+ MessageCollectorActor.clearMessages(leaderCollectorActor);
+ MessageCollectorActor.clearMessages(follower1CollectorActor);
+
+ // Send a server config change to test that the install snapshot includes the server config.
+
+ ServerConfigurationPayload serverConfig = new ServerConfigurationPayload(Arrays.asList(
+ new ServerInfo(leaderId, true),
+ new ServerInfo(follower1Id, false),
+ new ServerInfo(follower2Id, false)));
+ leaderContext.updatePeerIds(serverConfig);
+ ((AbstractLeader)leader).updateMinReplicaCount();
+ leaderActor.tell(serverConfig, ActorRef.noSender());
+
+ applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
+ verifyApplyState(applyState, leaderCollectorActor, "serverConfig", currentTerm, 8, serverConfig);
+
+ applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
+ verifyApplyState(applyState, null, null, currentTerm, 8, serverConfig);
+
// Verify the leader's persisted snapshot.
List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
expSnapshotState.add(payload7);
- verifyInstallSnapshotToLaggingFollower(7);
+ verifyInstallSnapshotToLaggingFollower(8, serverConfig);
testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot complete");
}
expSnapshotState.add(payload2);
- verifyInstallSnapshotToLaggingFollower(2L);
+ verifyInstallSnapshotToLaggingFollower(2L, null);
// Sends a payload with index 3.
verifyNoSubsequentSnapshotAfterMemoryThresholdExceededSnapshot();
*
* @throws Exception
*/
- private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex) throws Exception {
+ private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex,
+ @Nullable ServerConfigurationPayload expServerConfig) throws Exception {
List<Snapshot> persistedSnapshots;
List<ReplicatedLogEntry> unAppliedEntry;
ApplySnapshot applySnapshot;
// the log. In addition replicatedToAllIndex should've advanced.
verifyLeadersTrimmedLog(lastAppliedIndex);
+ if(expServerConfig != null) {
+ Set<ServerInfo> expServerInfo = new HashSet<>(expServerConfig.getServerConfig());
+ assertEquals("Leader snapshot server config", expServerInfo,
+ new HashSet<>(persistedSnapshot.getServerConfiguration().getServerConfig()));
+
+ assertEquals("Follower 2 snapshot server config", expServerInfo,
+ new HashSet<>(applySnapshot.getSnapshot().getServerConfiguration().getServerConfig()));
+
+ ServerConfigurationPayload follower2ServerConfig = follower2Context.getPeerServerInfo(true);
+ assertNotNull("Follower 2 server config is null", follower2ServerConfig);
+
+ assertEquals("Follower 2 server config", expServerInfo,
+ new HashSet<>(follower2ServerConfig.getServerConfig()));
+ }
+
MessageCollectorActor.clearMessages(leaderCollectorActor);
MessageCollectorActor.clearMessages(follower1CollectorActor);
MessageCollectorActor.clearMessages(follower2CollectorActor);
import static org.junit.Assert.assertEquals;
import com.google.common.base.Optional;
import java.io.Serializable;
+import java.util.Arrays;
import org.apache.commons.lang.SerializationUtils;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.RaftVersions;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
/**
}
}
- InstallSnapshot expected = new InstallSnapshot(3L, "leaderId", 11L, 2L, data, 5, 6, Optional.<Integer>of(54321));
+ ServerConfigurationPayload serverConfig = new ServerConfigurationPayload(Arrays.asList(
+ new ServerInfo("leader", true), new ServerInfo("follower", false)));
+ InstallSnapshot expected = new InstallSnapshot(3L, "leaderId", 11L, 2L, data, 5, 6,
+ Optional.<Integer>of(54321), Optional.of(serverConfig));
Object serialized = expected.toSerializable(RaftVersions.CURRENT_VERSION);
assertEquals("Serialized type", InstallSnapshot.class, serialized.getClass());
@Test
public void testSerializationWithPreBoronVersion() {
byte[] data = {0,1,2,3,4,5,7,8,9};
- InstallSnapshot expected = new InstallSnapshot(3L, "leaderId", 11L, 2L, data, 5, 6, Optional.<Integer>of(54321));
+ InstallSnapshot expected = new InstallSnapshot(3L, "leaderId", 11L, 2L, data, 5, 6, Optional.<Integer>of(54321),
+ Optional.<ServerConfigurationPayload>absent());
Object serialized = expected.toSerializable(RaftVersions.LITHIUM_VERSION);
assertEquals("Serialized type", InstallSnapshot.SERIALIZABLE_CLASS, serialized.getClass());
assertEquals("getLeaderId", expected.getLeaderId(), actual.getLeaderId());
assertEquals("getChunkIndex", expected.getChunkIndex(), actual.getChunkIndex());
assertArrayEquals("getData", expected.getData(), actual.getData());
+
assertEquals("getLastChunkHashCode present", expected.getLastChunkHashCode().isPresent(),
actual.getLastChunkHashCode().isPresent());
if(expected.getLastChunkHashCode().isPresent()) {
assertEquals("getLastChunkHashCode", expected.getLastChunkHashCode().get(),
actual.getLastChunkHashCode().get());
}
+
+ assertEquals("getServerConfig present", expected.getServerConfig().isPresent(),
+ actual.getServerConfig().isPresent());
+ if(expected.getServerConfig().isPresent()) {
+ assertEquals("getServerConfig", expected.getServerConfig().get().getServerConfig(),
+ actual.getServerConfig().get().getServerConfig());
+ }
}
}