params.captureSnapshot.getUnAppliedEntries(),
params.captureSnapshot.getLastIndex(), params.captureSnapshot.getLastTerm(),
params.captureSnapshot.getLastAppliedIndex(), params.captureSnapshot.getLastAppliedTerm(),
- params.electionTerm.getCurrentTerm(), params.electionTerm.getVotedFor());
+ params.electionTerm.getCurrentTerm(), params.electionTerm.getVotedFor(),
+ params.peerInformation);
LOG.debug("{}: Received CaptureSnapshotReply, sending {}", params.id, snapshot);
}
public static Props props(CaptureSnapshot captureSnapshot, ElectionTerm electionTerm, ActorRef replyToActor,
- Duration receiveTimeout, String id) {
+ Duration receiveTimeout, String id, ServerConfigurationPayload updatedPeerInfo) {
return Props.create(GetSnapshotReplyActor.class, new Params(captureSnapshot, electionTerm, replyToActor,
- receiveTimeout, id));
+ receiveTimeout, id, updatedPeerInfo));
}
private static final class Params {
final ElectionTerm electionTerm;
final Duration receiveTimeout;
final String id;
+ final ServerConfigurationPayload peerInformation;
Params(CaptureSnapshot captureSnapshot, ElectionTerm electionTerm, ActorRef replyToActor,
- Duration receiveTimeout, String id) {
+ Duration receiveTimeout, String id, ServerConfigurationPayload peerInfo) {
this.captureSnapshot = Preconditions.checkNotNull(captureSnapshot);
this.electionTerm = Preconditions.checkNotNull(electionTerm);
this.replyToActor = Preconditions.checkNotNull(replyToActor);
this.receiveTimeout = Preconditions.checkNotNull(receiveTimeout);
this.id = Preconditions.checkNotNull(id);
+ this.peerInformation = peerInfo;
}
}
}
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import java.util.Collection;
+import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.slf4j.Logger;
* @return an implementation of the RaftPolicy so that the Raft code can be adapted
*/
RaftPolicy getRaftPolicy();
+
+ /**
+ * @return true if there are any dynamic server configuration changes available,
+ * false if static peer configurations are still in use
+ */
+ boolean isDynamicServerConfigurationInUse();
+
+ /**
+ * Configures the dynamic server configurations are avaialble for the RaftActor
+ */
+ void setDynamicServerConfigurationInUse();
+
+ /**
+ * @return the RaftActor's peer information as a ServerConfigurationPayload if the
+ * dynamic server configurations are available, otherwise returns null
+ */
+ @Nullable ServerConfigurationPayload getPeerServerInfo();
}
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
private ConfigParams configParams;
+ private boolean dynamicServerConfiguration = false;
+
@VisibleForTesting
private Supplier<Long> totalMemoryRetriever;
for(String peerIdToRemove: currentPeers) {
this.removePeer(peerIdToRemove);
}
+ setDynamicServerConfigurationInUse();
}
@Override public ConfigParams getConfigParams() {
public RaftPolicy getRaftPolicy() {
return configParams.getRaftPolicy();
}
+
+ @Override
+ public boolean isDynamicServerConfigurationInUse() {
+ return dynamicServerConfiguration;
+ }
+
+ @Override
+ public void setDynamicServerConfigurationInUse() {
+ this.dynamicServerConfiguration = true;
+ }
+
+ @Override
+ public ServerConfigurationPayload getPeerServerInfo() {
+ if (!isDynamicServerConfigurationInUse()) {
+ return null;
+ }
+ Collection<PeerInfo> peers = getPeers();
+ List<ServerInfo> newConfig = new ArrayList<>(peers.size() + 1);
+ for(PeerInfo peer: peers) {
+ newConfig.add(new ServerInfo(peer.getId(), peer.isVoting()));
+ }
+
+ newConfig.add(new ServerInfo(getId(), true));
+ return (new ServerConfigurationPayload(newConfig));
+ }
}
// Apply the snapshot to the actors state
cohort.applyRecoverySnapshot(snapshot.getState());
+ if (snapshot.getServerConfiguration() != null) {
+ context.updatePeerIds(snapshot.getServerConfiguration());
+ }
+
timer.stop();
log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size={}",
context.getId(), timer.toString(), replicatedLog().getSnapshotIndex(),
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.LinkedList;
-import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
-import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
}
protected void persistNewServerConfiguration(RaftActor raftActor, ServerOperationContext<?> operationContext){
- Collection<PeerInfo> peers = raftContext.getPeers();
- List<ServerInfo> newConfig = new ArrayList<>(peers.size() + 1);
- for(PeerInfo peer: peers) {
- newConfig.add(new ServerInfo(peer.getId(), peer.isVoting()));
- }
-
- newConfig.add(new ServerInfo(raftContext.getId(), true));
-
- LOG.debug("{}: Persisting new server configuration : {}", raftContext.getId(), newConfig);
-
- ServerConfigurationPayload payload = new ServerConfigurationPayload(newConfig);
+ raftContext.setDynamicServerConfigurationInUse();
+ ServerConfigurationPayload payload = raftContext.getPeerServerInfo();
+ LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig());
raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload);
ActorRef snapshotReplyActor = context.actorOf(GetSnapshotReplyActor.props(captureSnapshot,
ImmutableElectionTerm.copyOf(context.getTermInformation()), sender,
- snapshotReplyActorTimeout, context.getId()));
+ snapshotReplyActorTimeout, context.getId(), context.getPeerServerInfo()));
cohort.createSnapshot(snapshotReplyActor);
} else {
Snapshot snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(), -1, -1, -1, -1,
- context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor());
+ context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(),
+ context.getPeerServerInfo());
sender.tell(new GetSnapshotReply(context.getId(), SerializationUtils.serialize(snapshot)),
context.getActor());
private final long lastAppliedTerm;
private final long electionTerm;
private final String electionVotedFor;
+ private final ServerConfigurationPayload serverConfig;
private Snapshot(byte[] state, List<ReplicatedLogEntry> unAppliedEntries, long lastIndex, long lastTerm,
- long lastAppliedIndex, long lastAppliedTerm, long electionTerm, String electionVotedFor) {
+ long lastAppliedIndex, long lastAppliedTerm, long electionTerm, String electionVotedFor,
+ ServerConfigurationPayload serverConfig) {
this.state = state;
this.unAppliedEntries = unAppliedEntries;
this.lastIndex = lastIndex;
this.lastAppliedTerm = lastAppliedTerm;
this.electionTerm = electionTerm;
this.electionVotedFor = electionVotedFor;
+ this.serverConfig = serverConfig;
}
public static Snapshot create(byte[] state, List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
long lastAppliedIndex, long lastAppliedTerm) {
- return new Snapshot(state, entries, lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, -1, null);
+ return new Snapshot(state, entries, lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, -1, null, null);
}
public static Snapshot create(byte[] state, List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
long lastAppliedIndex, long lastAppliedTerm, long electionTerm, String electionVotedFor) {
return new Snapshot(state, entries, lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm,
- electionTerm, electionVotedFor);
+ electionTerm, electionVotedFor, null);
+ }
+
+ public static Snapshot create(byte[] state, List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
+ long lastAppliedIndex, long lastAppliedTerm, long electionTerm, String electionVotedFor,
+ ServerConfigurationPayload serverConfig) {
+ return new Snapshot(state, entries, lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm,
+ electionTerm, electionVotedFor, serverConfig);
}
public byte[] getState() {
return electionVotedFor;
}
+ public ServerConfigurationPayload getServerConfiguration() {
+ return serverConfig;
+ }
+
@Override
public String toString() {
return "Snapshot [lastIndex=" + lastIndex + ", lastTerm=" + lastTerm + ", lastAppliedIndex=" + lastAppliedIndex
+ ", lastAppliedTerm=" + lastAppliedTerm + ", unAppliedEntries size=" + unAppliedEntries.size()
+ ", state size=" + state.length + ", electionTerm=" + electionTerm + ", electionVotedFor=" + electionVotedFor
- + "]";
+ + ", ServerConfigPayload=" + serverConfig + "]";
}
}
captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm(),
context.getTermInformation().getCurrentTerm(),
- context.getTermInformation().getVotedFor());
+ context.getTermInformation().getVotedFor(), context.getPeerServerInfo());
context.getPersistenceProvider().saveSnapshot(snapshot);
installSnapshot.getLastIncludedIndex(),
installSnapshot.getLastIncludedTerm(),
context.getTermInformation().getCurrentTerm(),
- context.getTermInformation().getVotedFor());
+ context.getTermInformation().getVotedFor(),
+ context.getPeerServerInfo());
ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() {
@Override
package org.opendaylight.controller.cluster.raft;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doReturn;
assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
+ assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
verify(mockCohort).applyRecoverySnapshot(snapshotBytes);
}
sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj));
//verify new peers
+ assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
assertEquals("New peer Ids", Sets.newHashSet(follower1, follower2, follower3),
Sets.newHashSet(context.getPeerIds()));
assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting());
sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, obj));
//verify new peers
+ assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
assertEquals("New peer Ids", Sets.newHashSet(follower2, follower3), Sets.newHashSet(context.getPeerIds()));
}
//verify new peers
assertEquals("New peer Ids", Sets.newHashSet(follower), Sets.newHashSet(context.getPeerIds()));
}
+
+ @Test
+ public void testOnSnapshotOfferWithServerConfiguration() {
+ long electionTerm = 2;
+ String electionVotedFor = "member-2";
+ ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList(
+ new ServerInfo(localId, true),
+ new ServerInfo("follower1", true),
+ new ServerInfo("follower2", true)));
+
+ Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(),
+ -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
+
+ SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
+ SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
+
+ sendMessageToSupport(snapshotOffer);
+
+ assertEquals("Journal log size", 0, context.getReplicatedLog().size());
+ assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
+ assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
+ assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
+ assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"),
+ Sets.newHashSet(context.getPeerIds()));
+ }
}