Remove peer address cache in ShardInformation 60/27760/4
authorTom Pantelis <tpanteli@brocade.com>
Thu, 1 Oct 2015 13:27:40 +0000 (09:27 -0400)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 9 Oct 2015 14:45:16 +0000 (14:45 +0000)
The ShardManager caches the peer addresses in the ShardInformation and
uses it mainly to suppress PeerUp, PeerDown and PeerAddressResolved
messages to the shard for peers that don't have replicas for the shard.

This is fine with static config but With the upcoming work to dynamically
add replicas, the shard will take ownership of persisting its peers so
the ShardManager will not know about dynamic peers.

I changed the semantics of the peer addresses to initial peer addresses.
They are now only used to pass to the Shard on creation. As a result,
PeerUp, PeerDown and PeerAddressResolved messages are now always sent to
the Shard for all peers. The Shard/RaftActor decide ll whether or not to
process the peer message. I changed RaftActorContextImpl#setPeerAddress
to ignore a peerId it doesn't know about instead of throwing an ex.

The other usages of the peerAddresses were to lookup the leader address.
This is now done dynamically via the ShardPeerAddressResolver.

Change-Id: Ida9738916a4a85d23198e7c095d5c73f17e2aa6c
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardPeerAddressResolver.java

index 74f02b5..f0236a1 100644 (file)
@@ -8,7 +8,6 @@
 
 package org.opendaylight.controller.cluster.raft;
 
-import static com.google.common.base.Preconditions.checkState;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
@@ -176,11 +175,12 @@ public class RaftActorContextImpl implements RaftActorContext {
         return null;
     }
 
-    @Override public void setPeerAddress(String peerId, String peerAddress) {
-        LOG.info("Peer address for peer {} set to {}", peerId, peerAddress);
-        checkState(peerAddresses.containsKey(peerId), peerId + " is unknown");
-
-        peerAddresses.put(peerId, peerAddress);
+    @Override
+    public void setPeerAddress(String peerId, String peerAddress) {
+        if(peerAddresses.containsKey(peerId)) {
+            LOG.info("Peer address for peer {} set to {}", peerId, peerAddress);
+            peerAddresses.put(peerId, peerAddress);
+        }
     }
 
     @Override
index 26fdf8f..5275a30 100644 (file)
@@ -65,4 +65,19 @@ public class RaftActorContextImplTest extends AbstractActorTest {
         assertEquals("getPeerAddress", "peerAddress1", context.getPeerAddress("peer1"));
         verify(mockResolver, never()).resolve(anyString());
     }
+
+    @Test
+    public void testSetPeerAddress() {
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+        RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
+                "test", new ElectionTermImpl(new NonPersistentDataProvider(), "test", log), -1, -1,
+                Maps.newHashMap(ImmutableMap.<String, String>of("peer1", "peerAddress1")), configParams,
+                new NonPersistentDataProvider(), log);
+
+        context.setPeerAddress("peer1", "peerAddress1_1");
+        assertEquals("getPeerAddress", "peerAddress1_1", context.getPeerAddress("peer1"));
+
+        context.setPeerAddress("peer2", "peerAddress2");
+        assertEquals("getPeerAddress", null, context.getPeerAddress("peer2"));
+    }
 }
index 24d808c..b48215d 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorPath;
 import akka.actor.ActorRef;
+import akka.actor.Address;
 import akka.actor.Cancellable;
 import akka.actor.OneForOneStrategy;
 import akka.actor.Props;
@@ -230,7 +231,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             }
 
             ShardInformation info = new ShardInformation(moduleShardConfig.getShardName(), shardId, peerAddresses,
-                    shardDatastoreContext, createShard.getShardPropsCreator());
+                    shardDatastoreContext, createShard.getShardPropsCreator(), peerAddressResolver);
             localShards.put(info.getShardName(), info);
 
             mBean.addLocalShard(shardId.toString());
@@ -499,7 +500,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
-        peerAddressResolver.addPeerAddress(memberName, message.member().address());
+        addPeerAddress(memberName, message.member().address());
+
+        checkReady();
+    }
+
+    private void addPeerAddress(String memberName, Address address) {
+        peerAddressResolver.addPeerAddress(memberName, address);
 
         for(ShardInformation info : localShards.values()){
             String shardName = info.getShardName();
@@ -508,14 +515,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
             info.peerUp(memberName, peerId, getSelf());
         }
-
-        checkReady();
     }
 
     private void memberReachable(ClusterEvent.ReachableMember message) {
         String memberName = message.member().roles().head();
         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
 
+        addPeerAddress(memberName, message.member().address());
+
         markMemberAvailable(memberName);
     }
 
@@ -678,7 +685,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             Map<String, String> peerAddresses = getPeerAddresses(shardName);
             localShardActorNames.add(shardId.toString());
             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, datastoreContext,
-                    shardPropsCreator));
+                    shardPropsCreator, peerAddressResolver));
         }
 
         mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
@@ -739,7 +746,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private final String shardName;
         private ActorRef actor;
         private ActorPath actorPath;
-        private final Map<String, String> peerAddresses;
+        private final Map<String, String> initialPeerAddresses;
         private Optional<DataTree> localShardDataTree;
         private boolean leaderAvailable = false;
 
@@ -755,19 +762,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         private final DatastoreContext datastoreContext;
         private final ShardPropsCreator shardPropsCreator;
+        private final ShardPeerAddressResolver addressResolver;
 
         private ShardInformation(String shardName, ShardIdentifier shardId,
-                Map<String, String> peerAddresses, DatastoreContext datastoreContext,
-                ShardPropsCreator shardPropsCreator) {
+                Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
+                ShardPropsCreator shardPropsCreator, ShardPeerAddressResolver addressResolver) {
             this.shardName = shardName;
             this.shardId = shardId;
-            this.peerAddresses = peerAddresses;
+            this.initialPeerAddresses = initialPeerAddresses;
             this.datastoreContext = datastoreContext;
             this.shardPropsCreator = shardPropsCreator;
+            this.addressResolver = addressResolver;
         }
 
         Props newProps(SchemaContext schemaContext) {
-            return shardPropsCreator.newProps(shardId, peerAddresses, datastoreContext, schemaContext);
+            return shardPropsCreator.newProps(shardId, initialPeerAddresses, datastoreContext, schemaContext);
         }
 
         String getShardName() {
@@ -799,37 +808,29 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return localShardDataTree;
         }
 
-        Map<String, String> getPeerAddresses() {
-            return peerAddresses;
-        }
-
         void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
-            LOG.info("updatePeerAddress for peer {} with address {}", peerId,
-                    peerAddress);
-            if(peerAddresses.containsKey(peerId)){
-                peerAddresses.put(peerId, peerAddress);
-
-                if(actor != null) {
-                    if(LOG.isDebugEnabled()) {
-                        LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
-                                peerId, peerAddress, actor.path());
-                    }
+            LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
 
-                    actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
+            if(actor != null) {
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
+                            peerId, peerAddress, actor.path());
                 }
 
-                notifyOnShardInitializedCallbacks();
+                actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
             }
+
+            notifyOnShardInitializedCallbacks();
         }
 
         void peerDown(String memberName, String peerId, ActorRef sender) {
-            if(peerAddresses.containsKey(peerId) && actor != null) {
+            if(actor != null) {
                 actor.tell(new PeerDown(memberName, peerId), sender);
             }
         }
 
         void peerUp(String memberName, String peerId, ActorRef sender) {
-            if(peerAddresses.containsKey(peerId) && actor != null) {
+            if(actor != null) {
                 actor.tell(new PeerUp(memberName, peerId), sender);
             }
         }
@@ -840,7 +841,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         boolean isShardReadyWithLeaderId() {
             return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
-                    (isLeader() || peerAddresses.get(leaderId) != null);
+                    (isLeader() || addressResolver.resolve(leaderId) != null);
         }
 
         boolean isShardInitialized() {
@@ -855,7 +856,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             if(isLeader()) {
                 return Serialization.serializedActorPath(getActor());
             } else {
-                return peerAddresses.get(leaderId);
+                return addressResolver.resolve(leaderId);
             }
         }
 
index 12fabbb..464fc7f 100644 (file)
@@ -80,6 +80,10 @@ class ShardPeerAddressResolver implements PeerAddressResolver {
 
     @Override
     public String resolve(String peerId) {
+        if(peerId == null) {
+            return null;
+        }
+
         ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(peerId).build();
         return getShardActorAddress(shardId.getShardName(), shardId.getMemberName());
     }