summary |
shortlog |
log |
commit | commitdiff |
review |
tree
raw |
patch |
inline | side by side (from parent 1:
c8accd2)
For upcoming to work to add voting status to the peer info in
RaftActorContext, I added a getPeerIds method to replace calls to
getPeerAddresses as virtually all callers really just want the IDs or want
to check the size. getPeerAddresses will (likely) be removed altogether -
this is a preliminary patch.
Change-Id: I2b6f2c36dfec14ccd4bbfef35e67ed86cf3e3e45
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
14 files changed:
if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
- getRaftActorContext().getPeerAddresses().keySet(), followers);
+ getRaftActorContext().getPeerIds(), followers);
} else {
LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
} else {
LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
- getRaftActorContext().getPeerAddresses().keySet());
+ getRaftActorContext().getPeerIds());
import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.Collection;
import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
.snapshotIndex(replicatedLog().getSnapshotIndex())
.snapshotTerm(replicatedLog().getSnapshotTerm())
.votedFor(context.getTermInformation().getVotedFor())
.snapshotIndex(replicatedLog().getSnapshotIndex())
.snapshotTerm(replicatedLog().getSnapshotTerm())
.votedFor(context.getTermInformation().getVotedFor())
- .peerAddresses(new HashMap<>(context.getPeerAddresses()));
+ .peerAddresses(context.getPeerAddresses());
ReplicatedLogEntry lastLogEntry = getLastLogEntry();
if (lastLogEntry != null) {
ReplicatedLogEntry lastLogEntry = getLastLogEntry();
if (lastLogEntry != null) {
import akka.actor.Props;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import akka.actor.Props;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
+import java.util.Collection;
import java.util.Map;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import java.util.Map;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
- * @return a mapping of peerId's to their addresses
+ * @return a copy of the mapping of peerId's to their addresses
*
*/
Map<String, String> getPeerAddresses();
*
*/
Map<String, String> getPeerAddresses();
*/
String getPeerAddress(String peerId);
*/
String getPeerAddress(String peerId);
+ /**
+ * @return the list of peer IDs.
+ */
+ Collection<String> getPeerIds();
+
/**
* Add to actor peers
*
/**
* Add to actor peers
*
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActorContext;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActorContext;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.collect.Maps;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.collect.Maps;
+
+import java.util.Collection;
+import java.util.HashMap;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.slf4j.Logger;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.slf4j.Logger;
- @Override public void setLastApplied(long lastApplied) {
+ @Override
+ public void setLastApplied(long lastApplied) {
this.lastApplied = lastApplied;
}
this.lastApplied = lastApplied;
}
- @Override public void setReplicatedLog(ReplicatedLog replicatedLog) {
+ @Override
+ public void setReplicatedLog(ReplicatedLog replicatedLog) {
this.replicatedLog = replicatedLog;
}
this.replicatedLog = replicatedLog;
}
- @Override public ReplicatedLog getReplicatedLog() {
+ @Override
+ public ReplicatedLog getReplicatedLog() {
- @Override public Map<String, String> getPeerAddresses() {
- return peerAddresses;
+ @Override
+ public Map<String, String> getPeerAddresses() {
+ return new HashMap<String, String>(peerAddresses);
+ }
+
+ @Override
+ public Collection<String> getPeerIds() {
+ return peerAddresses.keySet();
}
@Override public String getPeerAddress(String peerId) {
}
@Override public String getPeerAddress(String peerId) {
@Override
public boolean hasFollowers() {
@Override
public boolean hasFollowers() {
- return getPeerAddresses().keySet().size() > 0;
+ return getPeerIds().size() > 0;
}
protected void persistNewServerConfiguration(RaftActor raftActor, ServerOperationContext<?> operationContext){
}
protected void persistNewServerConfiguration(RaftActor raftActor, ServerOperationContext<?> operationContext){
- List <String> newConfig = new ArrayList<String>(raftContext.getPeerAddresses().keySet());
+ List <String> newConfig = new ArrayList<String>(raftContext.getPeerIds());
newConfig.add(raftContext.getId());
LOG.debug("{}: New server configuration : {}", raftContext.getId(), newConfig);
newConfig.add(raftContext.getId());
LOG.debug("{}: New server configuration : {}", raftContext.getId(), newConfig);
}
private boolean hasFollowers(){
}
private boolean hasFollowers(){
- return context.getPeerAddresses().keySet().size() > 0;
+ return context.hasFollowers();
}
private String persistenceId(){
}
private String persistenceId(){
setLeaderPayloadVersion(context.getPayloadVersion());
setLeaderPayloadVersion(context.getPayloadVersion());
- for (String followerId : context.getPeerAddresses().keySet()) {
+ for (String followerId : context.getPeerIds()) {
FollowerLogInformation followerLogInformation =
new FollowerLogInformationImpl(followerId, -1, context);
FollowerLogInformation followerLogInformation =
new FollowerLogInformationImpl(followerId, -1, context);
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
-import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
}
public void applyServerConfiguration(ServerConfigurationPayload serverConfig) {
}
public void applyServerConfiguration(ServerConfigurationPayload serverConfig) {
- Map<String, String> currentPeers = new HashMap<>(context.getPeerAddresses());
+ Map<String, String> currentPeers = context.getPeerAddresses();
for(String peerId: serverConfig.getNewServerConfig()) {
if(!getId().equals(peerId)) {
if(!currentPeers.containsKey(peerId)) {
for(String peerId: serverConfig.getNewServerConfig()) {
if(!getId().equals(peerId)) {
if(!currentPeers.containsKey(peerId)) {
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
+import java.util.Collection;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
private final int votesRequired;
private final int votesRequired;
- private final Set<String> peers;
+ private final Collection<String> peers;
public Candidate(RaftActorContext context) {
super(context, RaftState.Candidate);
public Candidate(RaftActorContext context) {
super(context, RaftState.Candidate);
- peers = context.getPeerAddresses().keySet();
+ peers = context.getPeerIds();
if(LOG.isDebugEnabled()) {
LOG.debug("{}: Election: Candidate has following peers: {}", logName(), peers);
if(LOG.isDebugEnabled()) {
LOG.debug("{}: Election: Candidate has following peers: {}", logName(), peers);
- if(context.getPeerAddresses().isEmpty()){
actor().tell(ELECTION_TIMEOUT, actor());
} else {
scheduleElection(electionDuration());
actor().tell(ELECTION_TIMEOUT, actor());
} else {
scheduleElection(electionDuration());
initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD);
if(context.getRaftPolicy().automaticElectionsEnabled()) {
initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD);
if(context.getRaftPolicy().automaticElectionsEnabled()) {
- if (context.getPeerAddresses().isEmpty()) {
+ if (context.getPeerIds().isEmpty()) {
actor().tell(ELECTION_TIMEOUT, actor());
} else {
scheduleElection(electionDuration());
actor().tell(ELECTION_TIMEOUT, actor());
} else {
scheduleElection(electionDuration());
import com.google.common.base.Supplier;
import com.google.protobuf.GeneratedMessage;
import java.io.Serializable;
import com.google.common.base.Supplier;
import com.google.protobuf.GeneratedMessage;
import java.io.Serializable;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import java.util.HashMap;
import java.util.Map;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+ @Override
+ public Collection<String> getPeerIds() {
+ return peerAddresses.keySet();
+ }
+
@Override public String getPeerAddress(String peerId) {
return peerAddresses.get(peerId);
}
@Override public String getPeerAddress(String peerId) {
return peerAddresses.get(peerId);
}
@Override
public boolean hasFollowers() {
@Override
public boolean hasFollowers() {
- return getPeerAddresses().keySet().size() > 0;
+ return getPeerIds().size() > 0;
// Verify new server config was applied in both followers
// Verify new server config was applied in both followers
- assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID),
- followerActorContext.getPeerAddresses().keySet());
+ assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
- assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID),
- newFollowerActorContext.getPeerAddresses().keySet());
+ assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
expectFirstMatching(followerActor, ApplyState.class);
expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
expectFirstMatching(followerActor, ApplyState.class);
// Verify new server config was applied in the new follower
// Verify new server config was applied in the new follower
- assertEquals("New follower peers", Sets.newHashSet(LEADER_ID),
- newFollowerActorContext.getPeerAddresses().keySet());
+ assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
// Verify new server config was applied in the new follower
// Verify new server config was applied in the new follower
- assertEquals("New follower peers", Sets.newHashSet(LEADER_ID),
- newFollowerActorContext.getPeerAddresses().keySet());
+ assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
MessageCollectorActor.assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
MessageCollectorActor.assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
MessageCollectorActor.expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
MessageCollectorActor.expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
- newFollowerActorContext.getPeerAddresses().keySet());
+ newFollowerActorContext.getPeerIds());
AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
- assertEquals("Leader peers size", 0, leaderActorContext.getPeerAddresses().keySet().size());
+ assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
assertEquals("Leader followers size", 0,
((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
}
assertEquals("Leader followers size", 0,
((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
}
import akka.japi.Procedure;
import akka.persistence.SnapshotSelectionCriteria;
import akka.testkit.TestActorRef;
import akka.japi.Procedure;
import akka.persistence.SnapshotSelectionCriteria;
import akka.testkit.TestActorRef;
-import com.google.common.collect.ImmutableMap;
-import java.util.HashMap;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public void setUp(){
MockitoAnnotations.initMocks(this);
public void setUp(){
MockitoAnnotations.initMocks(this);
- doReturn(new HashMap<>()).when(mockRaftActorContext).getPeerAddresses();
+ doReturn(false).when(mockRaftActorContext).hasFollowers();
doReturn(mockConfigParams).when(mockRaftActorContext).getConfigParams();
doReturn(10L).when(mockConfigParams).getSnapshotBatchCount();
doReturn(70).when(mockConfigParams).getSnapshotDataThresholdPercentage();
doReturn(mockConfigParams).when(mockRaftActorContext).getConfigParams();
doReturn(10L).when(mockConfigParams).getSnapshotBatchCount();
doReturn(70).when(mockConfigParams).getSnapshotDataThresholdPercentage();
doReturn(7L).when(mockReplicatedLog).getSnapshotIndex();
doReturn(1L).when(mockReplicatedLog).getSnapshotTerm();
doReturn(7L).when(mockReplicatedLog).getSnapshotIndex();
doReturn(1L).when(mockReplicatedLog).getSnapshotTerm();
- doReturn(ImmutableMap.builder().put("follower-1", "").build()).when(mockRaftActorContext).getPeerAddresses();
+ doReturn(true).when(mockRaftActorContext).hasFollowers();
doReturn(8L).when(mockRaftActorContext).getLastApplied();
doReturn(8L).when(mockRaftActorContext).getLastApplied();
newDatastoreContext(), SCHEMA_CONTEXT);
}
newDatastoreContext(), SCHEMA_CONTEXT);
}
- Map<String, String> getPeerAddresses() {
- return getRaftActorContext().getPeerAddresses();
+ String getPeerAddress(String id) {
+ return getRaftActorContext().getPeerAddress(id);
}
})), "testPeerAddressResolved");
}
})), "testPeerAddressResolved");
- //waitUntilLeader(shard);
assertEquals("Recovery complete", true,
Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
final String address = "akka://foobar";
shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
assertEquals("Recovery complete", true,
Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
final String address = "akka://foobar";
shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
- assertEquals("getPeerAddresses", address,
- ((TestShard) shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
+ assertEquals("getPeerAddress", address,
+ ((TestShard) shard.underlyingActor()).getPeerAddress(shardID.toString()));
shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};