Recover ServerConfigurationPayload ReplicatedLogEntry's and immediately apply to the peer map in RaftActorContext.
Review Comments incoporated.
Change-Id: I1b1b3c21e83eb5ea799dd040a4da8f78f1155082
Signed-off-by: Rajesh_Sindagi <Rajesh_Sindagi@dell.com>
*/
String getPeerAddress(String peerId);
+ /**
+ * @param serverCfgPayload
+ */
+ void updatePeerIds(ServerConfigurationPayload serverCfgPayload);
+
/**
* @return list of PeerInfo
*/
import com.google.common.base.Supplier;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
import org.slf4j.Logger;
public class RaftActorContextImpl implements RaftActorContext {
return peerAddress;
}
+ @Override
+ public void updatePeerIds(ServerConfigurationPayload serverConfig){
+
+ Set<String> currentPeers = new HashSet<>(this.getPeerIds());
+ for(ServerInfo server: serverConfig.getServerConfig()) {
+ if(!getId().equals(server.getId())) {
+ VotingState votingState = server.isVoting() ? VotingState.VOTING: VotingState.NON_VOTING;
+ if(!currentPeers.contains(server.getId())) {
+ this.addToPeers(server.getId(), null, votingState);
+ } else {
+ this.getPeerInfo(server.getId()).setVotingState(votingState);
+ currentPeers.remove(server.getId());
+ }
+ }
+ }
+
+ for(String peerIdToRemove: currentPeers) {
+ this.removePeer(peerIdToRemove);
+ }
+ }
+
@Override public ConfigParams getConfigParams() {
return configParams;
}
context.getTermInformation().getVotedFor());
}
} else {
- dataRecoveredWithPersistenceDisabled = true;
+ boolean isServerConfigPayload = false;
+ if(message instanceof ReplicatedLogEntry){
+ ReplicatedLogEntry repLogEntry = (ReplicatedLogEntry)message;
+ if(isServerConfigurationPayload(repLogEntry)){
+ isServerConfigPayload = true;
+ context.updatePeerIds((ServerConfigurationPayload)repLogEntry.getData());
+ }
+ }
+
+ if(!isServerConfigPayload){
+ dataRecoveredWithPersistenceDisabled = true;
+ }
}
return recoveryComplete;
logEntry.getIndex(), logEntry.size());
}
+ if(isServerConfigurationPayload(logEntry)){
+ context.updatePeerIds((ServerConfigurationPayload)logEntry.getData());
+ }
replicatedLog().append(logEntry);
}
initRecoveryTimer();
int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
- if(currentRecoveryBatchCount == 0) {
- cohort.startLogRecoveryBatch(batchSize);
- }
+ if(!isServerConfigurationPayload(logEntry)){
+ if(currentRecoveryBatchCount == 0) {
+ cohort.startLogRecoveryBatch(batchSize);
+ }
- cohort.appendRecoveredLogEntry(logEntry.getData());
+ cohort.appendRecoveredLogEntry(logEntry.getData());
- if(++currentRecoveryBatchCount >= batchSize) {
- endCurrentLogRecoveryBatch();
+ if(++currentRecoveryBatchCount >= batchSize) {
+ endCurrentLogRecoveryBatch();
+ }
}
}
"journal-size = {}", replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
replicatedLog().getSnapshotTerm(), replicatedLog().size());
}
+
+ private boolean isServerConfigurationPayload(ReplicatedLogEntry repLogEntry){
+ return (repLogEntry.getData() instanceof ServerConfigurationPayload);
+ }
}
import akka.actor.ActorRef;
import akka.actor.Cancellable;
-import java.util.HashSet;
import java.util.Random;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.SerializationUtils;
-import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
-import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
-import org.opendaylight.controller.cluster.raft.VotingState;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
protected String getId(){
return context.getId();
}
-
- public void applyServerConfiguration(ServerConfigurationPayload serverConfig) {
- Set<String> currentPeers = new HashSet<>(context.getPeerIds());
- for(ServerInfo server: serverConfig.getServerConfig()) {
- if(!getId().equals(server.getId())) {
- VotingState votingState = server.isVoting() ? VotingState.VOTING: VotingState.NON_VOTING;
- if(!currentPeers.contains(server.getId())) {
- context.addToPeers(server.getId(), null, votingState);
- } else {
- context.getPeerInfo(server.getId()).setVotingState(votingState);
- currentPeers.remove(server.getId());
- }
- }
- }
-
- for(String peerIdToRemove: currentPeers) {
- context.removePeer(peerIdToRemove);
- }
- }
}
context.getReplicatedLog().appendAndPersist(entry);
if(entry.getData() instanceof ServerConfigurationPayload) {
- applyServerConfiguration((ServerConfigurationPayload)entry.getData());
+ context.updatePeerIds((ServerConfigurationPayload)entry.getData());
}
}
import akka.persistence.SnapshotSelectionCriteria;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import org.hamcrest.Description;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private RaftActorContext context;
private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
}
+
+ @Test
+ public void testUpdatePeerIds() {
+
+ String leader = "Leader";
+ String follower1 = "follower1";
+ String follower2 = "follower2";
+ String follower3 = "follower3";
+
+ Map<String, String> peerAddresses = new HashMap<>();
+
+ peerAddresses.put(leader, null);
+ peerAddresses.put(follower1, null);
+ peerAddresses.put(follower2, null);
+
+ context.addToPeers(leader,null,VotingState.VOTING);
+ context.addToPeers(follower1,null,VotingState.VOTING);
+ context.addToPeers(follower2,null,VotingState.VOTING);
+
+ assertEquals("Size", 3, context.getPeers().size());
+
+ //add new Server
+ ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
+ new ServerInfo(leader, true),
+ new ServerInfo(follower1, true),
+ new ServerInfo(follower2, true),
+ new ServerInfo(follower3, true)));
+
+ MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
+ 1, obj);
+
+ sendMessageToSupport(logEntry);
+ //verify size and names
+ assertEquals("Size", 4, context.getPeers().size());
+ assertEquals("New follower matched", true , context.getPeerIds().contains(follower3));
+
+ //remove existing follower1
+ obj = new ServerConfigurationPayload(Arrays.asList(
+ new ServerInfo("Leader", true),
+ new ServerInfo("follower2", true),
+ new ServerInfo("follower3", true)));
+
+ logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1, obj);
+
+ sendMessageToSupport(logEntry);
+ //verify size and names
+ assertEquals("Size", 3, context.getPeers().size());
+ assertEquals("Removed follower matched", false, context.getPeerIds().contains(follower1));
+ }
}