Add getPeerIds to RaftActorContext 18/28718/6
authorTom Pantelis <tpanteli@brocade.com>
Fri, 23 Oct 2015 02:50:20 +0000 (22:50 -0400)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 26 Oct 2015 14:32:13 +0000 (14:32 +0000)
For upcoming to work to add voting status to the peer info in
RaftActorContext, I added a getPeerIds method to replace calls to
getPeerAddresses as virtually all callers really just want the IDs or want
to check the size. getPeerAddresses will (likely) be removed altogether -
this is a preliminary patch.

Change-Id: I2b6f2c36dfec14ccd4bbfef35e67ed86cf3e3e45
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
14 files changed:
opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index aa2b26d..3ce8364 100644 (file)
@@ -79,10 +79,10 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort,
                 if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
                     final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
                     LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
-                        getRaftActorContext().getPeerAddresses().keySet(), followers);
+                        getRaftActorContext().getPeerIds(), followers);
                 } else {
                     LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
-                        getRaftActorContext().getPeerAddresses().keySet());
+                        getRaftActorContext().getPeerIds());
                 }
 
 
index c5d81c1..2dd3165 100644 (file)
@@ -19,7 +19,6 @@ import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
 import java.io.Serializable;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -289,7 +288,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 .snapshotIndex(replicatedLog().getSnapshotIndex())
                 .snapshotTerm(replicatedLog().getSnapshotTerm())
                 .votedFor(context.getTermInformation().getVotedFor())
-                .peerAddresses(new HashMap<>(context.getPeerAddresses()));
+                .peerAddresses(context.getPeerAddresses());
 
         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
         if (lastLogEntry != null) {
index c418982..0c9d698 100644 (file)
@@ -14,6 +14,7 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
+import java.util.Collection;
 import java.util.Map;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
@@ -103,7 +104,7 @@ public interface RaftActorContext {
     Logger getLogger();
 
     /**
-     * @return a mapping of peerId's to their addresses
+     * @return a copy of the mapping of peerId's to their addresses
      *
      */
     Map<String, String> getPeerAddresses();
@@ -118,6 +119,11 @@ public interface RaftActorContext {
      */
     String getPeerAddress(String peerId);
 
+    /**
+     * @return the list of peer IDs.
+     */
+    Collection<String> getPeerIds();
+
     /**
      * Add to actor peers
      *
index f0236a1..cebf7b3 100644 (file)
@@ -13,10 +13,15 @@ import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.UntypedActorContext;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Maps;
+
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.Map;
+
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.slf4j.Logger;
@@ -121,15 +126,18 @@ public class RaftActorContextImpl implements RaftActorContext {
         return lastApplied;
     }
 
-    @Override public void setLastApplied(long lastApplied) {
+    @Override
+    public void setLastApplied(long lastApplied) {
         this.lastApplied = lastApplied;
     }
 
-    @Override public void setReplicatedLog(ReplicatedLog replicatedLog) {
+    @Override
+    public void setReplicatedLog(ReplicatedLog replicatedLog) {
         this.replicatedLog = replicatedLog;
     }
 
-    @Override public ReplicatedLog getReplicatedLog() {
+    @Override
+    public ReplicatedLog getReplicatedLog() {
         return replicatedLog;
     }
 
@@ -141,8 +149,14 @@ public class RaftActorContextImpl implements RaftActorContext {
         return this.LOG;
     }
 
-    @Override public Map<String, String> getPeerAddresses() {
-        return peerAddresses;
+    @Override
+    public Map<String, String> getPeerAddresses() {
+        return new HashMap<String, String>(peerAddresses);
+    }
+
+    @Override
+    public Collection<String> getPeerIds() {
+        return peerAddresses.keySet();
     }
 
     @Override public String getPeerAddress(String peerId) {
@@ -203,7 +217,7 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     @Override
     public boolean hasFollowers() {
-        return getPeerAddresses().keySet().size() > 0;
+        return getPeerIds().size() > 0;
     }
 
     @Override
index a00c024..7c8888b 100644 (file)
@@ -172,7 +172,7 @@ class RaftActorServerConfigurationSupport {
         }
 
         protected void persistNewServerConfiguration(RaftActor raftActor, ServerOperationContext<?> operationContext){
-            List <String> newConfig = new ArrayList<String>(raftContext.getPeerAddresses().keySet());
+            List <String> newConfig = new ArrayList<String>(raftContext.getPeerIds());
             newConfig.add(raftContext.getId());
 
             LOG.debug("{}: New server configuration : {}", raftContext.getId(), newConfig);
index 8e0d2f8..26d8c0a 100644 (file)
@@ -108,7 +108,7 @@ public class SnapshotManager implements SnapshotState {
     }
 
     private boolean hasFollowers(){
-        return context.getPeerAddresses().keySet().size() > 0;
+        return context.hasFollowers();
     }
 
     private String persistenceId(){
index d4612db..b80a1ba 100644 (file)
@@ -97,7 +97,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         setLeaderPayloadVersion(context.getPayloadVersion());
 
-        for (String followerId : context.getPeerAddresses().keySet()) {
+        for (String followerId : context.getPeerIds()) {
             FollowerLogInformation followerLogInformation =
                 new FollowerLogInformationImpl(followerId, -1, context);
 
index b20671f..8cb011f 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
 import akka.actor.Cancellable;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
@@ -494,7 +493,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     }
 
     public void applyServerConfiguration(ServerConfigurationPayload serverConfig) {
-        Map<String, String> currentPeers = new HashMap<>(context.getPeerAddresses());
+        Map<String, String> currentPeers = context.getPeerAddresses();
         for(String peerId: serverConfig.getNewServerConfig()) {
             if(!getId().equals(peerId)) {
                 if(!currentPeers.containsKey(peerId)) {
index a59a020..8baf0d8 100644 (file)
@@ -10,7 +10,7 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
-import java.util.Set;
+import java.util.Collection;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
@@ -44,12 +44,12 @@ public class Candidate extends AbstractRaftActorBehavior {
 
     private final int votesRequired;
 
-    private final Set<String> peers;
+    private final Collection<String> peers;
 
     public Candidate(RaftActorContext context) {
         super(context, RaftState.Candidate);
 
-        peers = context.getPeerAddresses().keySet();
+        peers = context.getPeerIds();
 
         if(LOG.isDebugEnabled()) {
             LOG.debug("{}: Election: Candidate has following peers: {}", logName(), peers);
@@ -59,7 +59,7 @@ public class Candidate extends AbstractRaftActorBehavior {
 
         startNewTerm();
 
-        if(context.getPeerAddresses().isEmpty()){
+        if(peers.isEmpty()){
             actor().tell(ELECTION_TIMEOUT, actor());
         } else {
             scheduleElection(electionDuration());
index 8f41147..c695282 100644 (file)
@@ -50,7 +50,7 @@ public class Follower extends AbstractRaftActorBehavior {
         initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD);
 
         if(context.getRaftPolicy().automaticElectionsEnabled()) {
-            if (context.getPeerAddresses().isEmpty()) {
+            if (context.getPeerIds().isEmpty()) {
                 actor().tell(ELECTION_TIMEOUT, actor());
             } else {
                 scheduleElection(electionDuration());
index 1938dab..2a1a94c 100644 (file)
@@ -17,6 +17,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.protobuf.GeneratedMessage;
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
@@ -166,6 +167,11 @@ public class MockRaftActorContext implements RaftActorContext {
         return peerAddresses;
     }
 
+    @Override
+    public Collection<String> getPeerIds() {
+        return peerAddresses.keySet();
+    }
+
     @Override public String getPeerAddress(String peerId) {
         return peerAddresses.get(peerId);
     }
@@ -224,7 +230,7 @@ public class MockRaftActorContext implements RaftActorContext {
 
     @Override
     public boolean hasFollowers() {
-        return getPeerAddresses().keySet().size() > 0;
+        return getPeerIds().size() > 0;
     }
 
     @Override
index 2528c8a..1b717f1 100644 (file)
@@ -161,11 +161,9 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         // Verify new server config was applied in both followers
 
-        assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID),
-                followerActorContext.getPeerAddresses().keySet());
+        assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), followerActorContext.getPeerIds());
 
-        assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID),
-                newFollowerActorContext.getPeerAddresses().keySet());
+        assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds());
 
         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
         expectFirstMatching(followerActor, ApplyState.class);
@@ -219,8 +217,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         // Verify new server config was applied in the new follower
 
-        assertEquals("New follower peers", Sets.newHashSet(LEADER_ID),
-                newFollowerActorContext.getPeerAddresses().keySet());
+        assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
     }
 
     @Test
@@ -259,8 +256,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         // Verify new server config was applied in the new follower
 
-        assertEquals("New follower peers", Sets.newHashSet(LEADER_ID),
-                newFollowerActorContext.getPeerAddresses().keySet());
+        assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
 
         MessageCollectorActor.assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
 
@@ -336,7 +332,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         MessageCollectorActor.expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
 
         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
-               newFollowerActorContext.getPeerAddresses().keySet());
+               newFollowerActorContext.getPeerIds());
     }
 
     @Test
@@ -362,7 +358,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
 
-        assertEquals("Leader peers size", 0, leaderActorContext.getPeerAddresses().keySet().size());
+        assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
         assertEquals("Leader followers size", 0,
                 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
     }
index f1e7d3f..ef1a926 100644 (file)
@@ -25,9 +25,7 @@ import akka.actor.ActorRef;
 import akka.japi.Procedure;
 import akka.persistence.SnapshotSelectionCriteria;
 import akka.testkit.TestActorRef;
-import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
-import java.util.HashMap;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -75,7 +73,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
     public void setUp(){
         MockitoAnnotations.initMocks(this);
 
-        doReturn(new HashMap<>()).when(mockRaftActorContext).getPeerAddresses();
+        doReturn(false).when(mockRaftActorContext).hasFollowers();
         doReturn(mockConfigParams).when(mockRaftActorContext).getConfigParams();
         doReturn(10L).when(mockConfigParams).getSnapshotBatchCount();
         doReturn(70).when(mockConfigParams).getSnapshotDataThresholdPercentage();
@@ -231,7 +229,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         doReturn(7L).when(mockReplicatedLog).getSnapshotIndex();
         doReturn(1L).when(mockReplicatedLog).getSnapshotTerm();
 
-        doReturn(ImmutableMap.builder().put("follower-1", "").build()).when(mockRaftActorContext).getPeerAddresses();
+        doReturn(true).when(mockRaftActorContext).hasFollowers();
 
         doReturn(8L).when(mockRaftActorContext).getLastApplied();
 
index 99606e7..2833018 100644 (file)
@@ -426,8 +426,8 @@ public class ShardTest extends AbstractShardTest {
                             newDatastoreContext(), SCHEMA_CONTEXT);
                 }
 
-                Map<String, String> getPeerAddresses() {
-                    return getRaftActorContext().getPeerAddresses();
+                String getPeerAddress(String id) {
+                    return getRaftActorContext().getPeerAddress(id);
                 }
 
                 @Override
@@ -448,15 +448,14 @@ public class ShardTest extends AbstractShardTest {
                         }
                     })), "testPeerAddressResolved");
 
-            //waitUntilLeader(shard);
             assertEquals("Recovery complete", true,
                 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
 
             final String address = "akka://foobar";
             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
 
-            assertEquals("getPeerAddresses", address,
-                ((TestShard) shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
+            assertEquals("getPeerAddress", address,
+                ((TestShard) shard.underlyingActor()).getPeerAddress(shardID.toString()));
 
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};