From: kalaiselvik Date: Thu, 12 Nov 2015 08:48:39 +0000 (+0530) Subject: Bug 2187: Persisting Actor peerIds' in snapshot X-Git-Tag: release/beryllium~164 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=cc2566a2570ce97c5ec054fca77f380ca35b942d Bug 2187: Persisting Actor peerIds' in snapshot Persisting Raft Actor's peer information in a snapshot and recovering the same from the snapshot. Incorporated the comments. Change-Id: I12831f129b2bdeb1c64f473e94be617f8d6ee487 Signed-off-by: kalaiselvik --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/GetSnapshotReplyActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/GetSnapshotReplyActor.java index ca09823a12..1b9ca21010 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/GetSnapshotReplyActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/GetSnapshotReplyActor.java @@ -45,7 +45,8 @@ class GetSnapshotReplyActor extends UntypedActor { params.captureSnapshot.getUnAppliedEntries(), params.captureSnapshot.getLastIndex(), params.captureSnapshot.getLastTerm(), params.captureSnapshot.getLastAppliedIndex(), params.captureSnapshot.getLastAppliedTerm(), - params.electionTerm.getCurrentTerm(), params.electionTerm.getVotedFor()); + params.electionTerm.getCurrentTerm(), params.electionTerm.getVotedFor(), + params.peerInformation); LOG.debug("{}: Received CaptureSnapshotReply, sending {}", params.id, snapshot); @@ -63,9 +64,9 @@ class GetSnapshotReplyActor extends UntypedActor { } public static Props props(CaptureSnapshot captureSnapshot, ElectionTerm electionTerm, ActorRef replyToActor, - Duration receiveTimeout, String id) { + Duration receiveTimeout, String id, ServerConfigurationPayload updatedPeerInfo) { return Props.create(GetSnapshotReplyActor.class, new Params(captureSnapshot, electionTerm, replyToActor, - receiveTimeout, id)); + receiveTimeout, id, updatedPeerInfo)); } private static final class Params { @@ -74,14 +75,16 @@ class GetSnapshotReplyActor extends UntypedActor { final ElectionTerm electionTerm; final Duration receiveTimeout; final String id; + final ServerConfigurationPayload peerInformation; Params(CaptureSnapshot captureSnapshot, ElectionTerm electionTerm, ActorRef replyToActor, - Duration receiveTimeout, String id) { + Duration receiveTimeout, String id, ServerConfigurationPayload peerInfo) { this.captureSnapshot = Preconditions.checkNotNull(captureSnapshot); this.electionTerm = Preconditions.checkNotNull(electionTerm); this.replyToActor = Preconditions.checkNotNull(replyToActor); this.receiveTimeout = Preconditions.checkNotNull(receiveTimeout); this.id = Preconditions.checkNotNull(id); + this.peerInformation = peerInfo; } } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java index aa49e16813..b20e9daa94 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java @@ -15,6 +15,7 @@ import akka.actor.Props; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Supplier; import java.util.Collection; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; import org.slf4j.Logger; @@ -218,4 +219,21 @@ public interface RaftActorContext { * @return an implementation of the RaftPolicy so that the Raft code can be adapted */ RaftPolicy getRaftPolicy(); + + /** + * @return true if there are any dynamic server configuration changes available, + * false if static peer configurations are still in use + */ + boolean isDynamicServerConfigurationInUse(); + + /** + * Configures the dynamic server configurations are avaialble for the RaftActor + */ + void setDynamicServerConfigurationInUse(); + + /** + * @return the RaftActor's peer information as a ServerConfigurationPayload if the + * dynamic server configurations are available, otherwise returns null + */ + @Nullable ServerConfigurationPayload getPeerServerInfo(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index 257d307f68..66059b5d62 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -16,9 +16,11 @@ import akka.actor.Props; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Supplier; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import org.opendaylight.controller.cluster.DataPersistenceProvider; @@ -48,6 +50,8 @@ public class RaftActorContextImpl implements RaftActorContext { private ConfigParams configParams; + private boolean dynamicServerConfiguration = false; + @VisibleForTesting private Supplier totalMemoryRetriever; @@ -201,6 +205,7 @@ public class RaftActorContextImpl implements RaftActorContext { for(String peerIdToRemove: currentPeers) { this.removePeer(peerIdToRemove); } + setDynamicServerConfigurationInUse(); } @Override public ConfigParams getConfigParams() { @@ -266,4 +271,29 @@ public class RaftActorContextImpl implements RaftActorContext { public RaftPolicy getRaftPolicy() { return configParams.getRaftPolicy(); } + + @Override + public boolean isDynamicServerConfigurationInUse() { + return dynamicServerConfiguration; + } + + @Override + public void setDynamicServerConfigurationInUse() { + this.dynamicServerConfiguration = true; + } + + @Override + public ServerConfigurationPayload getPeerServerInfo() { + if (!isDynamicServerConfigurationInUse()) { + return null; + } + Collection peers = getPeers(); + List newConfig = new ArrayList<>(peers.size() + 1); + for(PeerInfo peer: peers) { + newConfig.add(new ServerInfo(peer.getId(), peer.isVoting())); + } + + newConfig.add(new ServerInfo(getId(), true)); + return (new ServerConfigurationPayload(newConfig)); + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java index 0a37ef7a46..3ac1e79674 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java @@ -177,6 +177,10 @@ class RaftActorRecoverySupport { // Apply the snapshot to the actors state cohort.applyRecoverySnapshot(snapshot.getState()); + if (snapshot.getServerConfiguration() != null) { + context.updatePeerIds(snapshot.getServerConfiguration()); + } + timer.stop(); log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size={}", context.getId(), timer.toString(), replicatedLog().getSnapshotIndex(), diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java index 258287a36a..39f49931a1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java @@ -11,15 +11,11 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; import com.google.common.base.Preconditions; -import java.util.ArrayList; -import java.util.Collection; import java.util.LinkedList; -import java.util.List; import java.util.Queue; import java.util.UUID; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; -import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete; import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; @@ -183,17 +179,9 @@ class RaftActorServerConfigurationSupport { } protected void persistNewServerConfiguration(RaftActor raftActor, ServerOperationContext operationContext){ - Collection peers = raftContext.getPeers(); - List newConfig = new ArrayList<>(peers.size() + 1); - for(PeerInfo peer: peers) { - newConfig.add(new ServerInfo(peer.getId(), peer.isVoting())); - } - - newConfig.add(new ServerInfo(raftContext.getId(), true)); - - LOG.debug("{}: Persisting new server configuration : {}", raftContext.getId(), newConfig); - - ServerConfigurationPayload payload = new ServerConfigurationPayload(newConfig); + raftContext.setDynamicServerConfigurationInUse(); + ServerConfigurationPayload payload = raftContext.getPeerServerInfo(); + LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig()); raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java index ec46f30878..39548dc6d5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java @@ -125,12 +125,13 @@ class RaftActorSnapshotMessageSupport { ActorRef snapshotReplyActor = context.actorOf(GetSnapshotReplyActor.props(captureSnapshot, ImmutableElectionTerm.copyOf(context.getTermInformation()), sender, - snapshotReplyActorTimeout, context.getId())); + snapshotReplyActorTimeout, context.getId(), context.getPeerServerInfo())); cohort.createSnapshot(snapshotReplyActor); } else { Snapshot snapshot = Snapshot.create(new byte[0], Collections.emptyList(), -1, -1, -1, -1, - context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor()); + context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(), + context.getPeerServerInfo()); sender.tell(new GetSnapshotReply(context.getId(), SerializationUtils.serialize(snapshot)), context.getActor()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java index a369c25ddf..3d1976483a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java @@ -22,9 +22,11 @@ public class Snapshot implements Serializable { private final long lastAppliedTerm; private final long electionTerm; private final String electionVotedFor; + private final ServerConfigurationPayload serverConfig; private Snapshot(byte[] state, List unAppliedEntries, long lastIndex, long lastTerm, - long lastAppliedIndex, long lastAppliedTerm, long electionTerm, String electionVotedFor) { + long lastAppliedIndex, long lastAppliedTerm, long electionTerm, String electionVotedFor, + ServerConfigurationPayload serverConfig) { this.state = state; this.unAppliedEntries = unAppliedEntries; this.lastIndex = lastIndex; @@ -33,17 +35,25 @@ public class Snapshot implements Serializable { this.lastAppliedTerm = lastAppliedTerm; this.electionTerm = electionTerm; this.electionVotedFor = electionVotedFor; + this.serverConfig = serverConfig; } public static Snapshot create(byte[] state, List entries, long lastIndex, long lastTerm, long lastAppliedIndex, long lastAppliedTerm) { - return new Snapshot(state, entries, lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, -1, null); + return new Snapshot(state, entries, lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, -1, null, null); } public static Snapshot create(byte[] state, List entries, long lastIndex, long lastTerm, long lastAppliedIndex, long lastAppliedTerm, long electionTerm, String electionVotedFor) { return new Snapshot(state, entries, lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, - electionTerm, electionVotedFor); + electionTerm, electionVotedFor, null); + } + + public static Snapshot create(byte[] state, List entries, long lastIndex, long lastTerm, + long lastAppliedIndex, long lastAppliedTerm, long electionTerm, String electionVotedFor, + ServerConfigurationPayload serverConfig) { + return new Snapshot(state, entries, lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, + electionTerm, electionVotedFor, serverConfig); } public byte[] getState() { @@ -79,11 +89,15 @@ public class Snapshot implements Serializable { return electionVotedFor; } + public ServerConfigurationPayload getServerConfiguration() { + return serverConfig; + } + @Override public String toString() { return "Snapshot [lastIndex=" + lastIndex + ", lastTerm=" + lastTerm + ", lastAppliedIndex=" + lastAppliedIndex + ", lastAppliedTerm=" + lastAppliedTerm + ", unAppliedEntries size=" + unAppliedEntries.size() + ", state size=" + state.length + ", electionTerm=" + electionTerm + ", electionVotedFor=" + electionVotedFor - + "]"; + + ", ServerConfigPayload=" + serverConfig + "]"; } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java index 0d0c910298..4a20e5b3ae 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -303,7 +303,7 @@ public class SnapshotManager implements SnapshotState { captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(), captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm(), context.getTermInformation().getCurrentTerm(), - context.getTermInformation().getVotedFor()); + context.getTermInformation().getVotedFor(), context.getPeerServerInfo()); context.getPersistenceProvider().saveSnapshot(snapshot); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 321b2dd864..3b1e69d0ac 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -365,7 +365,8 @@ public class Follower extends AbstractRaftActorBehavior { installSnapshot.getLastIncludedIndex(), installSnapshot.getLastIncludedTerm(), context.getTermInformation().getCurrentTerm(), - context.getTermInformation().getVotedFor()); + context.getTermInformation().getVotedFor(), + context.getPeerServerInfo()); ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() { @Override diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java index ddc8bed42a..71ca4cae5d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java @@ -8,6 +8,8 @@ package org.opendaylight.controller.cluster.raft; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.doReturn; @@ -213,6 +215,7 @@ public class RaftActorRecoverySupportTest { assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex()); assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm()); assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor()); + assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse()); verify(mockCohort).applyRecoverySnapshot(snapshotBytes); } @@ -395,6 +398,7 @@ public class RaftActorRecoverySupportTest { sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj)); //verify new peers + assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse()); assertEquals("New peer Ids", Sets.newHashSet(follower1, follower2, follower3), Sets.newHashSet(context.getPeerIds())); assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting()); @@ -415,6 +419,7 @@ public class RaftActorRecoverySupportTest { sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, obj)); //verify new peers + assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse()); assertEquals("New peer Ids", Sets.newHashSet(follower2, follower3), Sets.newHashSet(context.getPeerIds())); } @@ -431,4 +436,29 @@ public class RaftActorRecoverySupportTest { //verify new peers assertEquals("New peer Ids", Sets.newHashSet(follower), Sets.newHashSet(context.getPeerIds())); } + + @Test + public void testOnSnapshotOfferWithServerConfiguration() { + long electionTerm = 2; + String electionVotedFor = "member-2"; + ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList( + new ServerInfo(localId, true), + new ServerInfo("follower1", true), + new ServerInfo("follower2", true))); + + Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.emptyList(), + -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload); + + SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345); + SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot); + + sendMessageToSupport(snapshotOffer); + + assertEquals("Journal log size", 0, context.getReplicatedLog().size()); + assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm()); + assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor()); + assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse()); + assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"), + Sets.newHashSet(context.getPeerIds())); + } }