Remove peer address cache in ShardInformation 60/27760/4
authorTom Pantelis <tpanteli@brocade.com>
Thu, 1 Oct 2015 13:27:40 +0000 (09:27 -0400)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 9 Oct 2015 14:45:16 +0000 (14:45 +0000)
The ShardManager caches the peer addresses in the ShardInformation and
uses it mainly to suppress PeerUp, PeerDown and PeerAddressResolved
messages to the shard for peers that don't have replicas for the shard.

This is fine with static config but With the upcoming work to dynamically
add replicas, the shard will take ownership of persisting its peers so
the ShardManager will not know about dynamic peers.

I changed the semantics of the peer addresses to initial peer addresses.
They are now only used to pass to the Shard on creation. As a result,
PeerUp, PeerDown and PeerAddressResolved messages are now always sent to
the Shard for all peers. The Shard/RaftActor decide ll whether or not to
process the peer message. I changed RaftActorContextImpl#setPeerAddress
to ignore a peerId it doesn't know about instead of throwing an ex.

The other usages of the peerAddresses were to lookup the leader address.
This is now done dynamically via the ShardPeerAddressResolver.

Change-Id: Ida9738916a4a85d23198e7c095d5c73f17e2aa6c
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardPeerAddressResolver.java

index 74f02b5ef5146ed7d527f83552c298276a6d6485..f0236a1abfa9cf0c12a9e6384f76ab4dc2df1333 100644 (file)
@@ -8,7 +8,6 @@
 
 package org.opendaylight.controller.cluster.raft;
 
-import static com.google.common.base.Preconditions.checkState;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
@@ -176,11 +175,12 @@ public class RaftActorContextImpl implements RaftActorContext {
         return null;
     }
 
-    @Override public void setPeerAddress(String peerId, String peerAddress) {
-        LOG.info("Peer address for peer {} set to {}", peerId, peerAddress);
-        checkState(peerAddresses.containsKey(peerId), peerId + " is unknown");
-
-        peerAddresses.put(peerId, peerAddress);
+    @Override
+    public void setPeerAddress(String peerId, String peerAddress) {
+        if(peerAddresses.containsKey(peerId)) {
+            LOG.info("Peer address for peer {} set to {}", peerId, peerAddress);
+            peerAddresses.put(peerId, peerAddress);
+        }
     }
 
     @Override
index 26fdf8f25d34434477831f0fdc4489bf2794742a..5275a303d37d7a54d73286056f8db625a8f6c5dc 100644 (file)
@@ -65,4 +65,19 @@ public class RaftActorContextImplTest extends AbstractActorTest {
         assertEquals("getPeerAddress", "peerAddress1", context.getPeerAddress("peer1"));
         verify(mockResolver, never()).resolve(anyString());
     }
+
+    @Test
+    public void testSetPeerAddress() {
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+        RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
+                "test", new ElectionTermImpl(new NonPersistentDataProvider(), "test", log), -1, -1,
+                Maps.newHashMap(ImmutableMap.<String, String>of("peer1", "peerAddress1")), configParams,
+                new NonPersistentDataProvider(), log);
+
+        context.setPeerAddress("peer1", "peerAddress1_1");
+        assertEquals("getPeerAddress", "peerAddress1_1", context.getPeerAddress("peer1"));
+
+        context.setPeerAddress("peer2", "peerAddress2");
+        assertEquals("getPeerAddress", null, context.getPeerAddress("peer2"));
+    }
 }
index 24d808cd91bf76839bcb9bdbb08e61e49a17db9b..b48215d3617c92937e0d73fb6cbacf56c3c50c96 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorPath;
 import akka.actor.ActorRef;
+import akka.actor.Address;
 import akka.actor.Cancellable;
 import akka.actor.OneForOneStrategy;
 import akka.actor.Props;
@@ -230,7 +231,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             }
 
             ShardInformation info = new ShardInformation(moduleShardConfig.getShardName(), shardId, peerAddresses,
-                    shardDatastoreContext, createShard.getShardPropsCreator());
+                    shardDatastoreContext, createShard.getShardPropsCreator(), peerAddressResolver);
             localShards.put(info.getShardName(), info);
 
             mBean.addLocalShard(shardId.toString());
@@ -499,7 +500,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
-        peerAddressResolver.addPeerAddress(memberName, message.member().address());
+        addPeerAddress(memberName, message.member().address());
+
+        checkReady();
+    }
+
+    private void addPeerAddress(String memberName, Address address) {
+        peerAddressResolver.addPeerAddress(memberName, address);
 
         for(ShardInformation info : localShards.values()){
             String shardName = info.getShardName();
@@ -508,14 +515,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
             info.peerUp(memberName, peerId, getSelf());
         }
-
-        checkReady();
     }
 
     private void memberReachable(ClusterEvent.ReachableMember message) {
         String memberName = message.member().roles().head();
         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
 
+        addPeerAddress(memberName, message.member().address());
+
         markMemberAvailable(memberName);
     }
 
@@ -678,7 +685,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             Map<String, String> peerAddresses = getPeerAddresses(shardName);
             localShardActorNames.add(shardId.toString());
             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, datastoreContext,
-                    shardPropsCreator));
+                    shardPropsCreator, peerAddressResolver));
         }
 
         mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
@@ -739,7 +746,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private final String shardName;
         private ActorRef actor;
         private ActorPath actorPath;
-        private final Map<String, String> peerAddresses;
+        private final Map<String, String> initialPeerAddresses;
         private Optional<DataTree> localShardDataTree;
         private boolean leaderAvailable = false;
 
@@ -755,19 +762,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         private final DatastoreContext datastoreContext;
         private final ShardPropsCreator shardPropsCreator;
+        private final ShardPeerAddressResolver addressResolver;
 
         private ShardInformation(String shardName, ShardIdentifier shardId,
-                Map<String, String> peerAddresses, DatastoreContext datastoreContext,
-                ShardPropsCreator shardPropsCreator) {
+                Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
+                ShardPropsCreator shardPropsCreator, ShardPeerAddressResolver addressResolver) {
             this.shardName = shardName;
             this.shardId = shardId;
-            this.peerAddresses = peerAddresses;
+            this.initialPeerAddresses = initialPeerAddresses;
             this.datastoreContext = datastoreContext;
             this.shardPropsCreator = shardPropsCreator;
+            this.addressResolver = addressResolver;
         }
 
         Props newProps(SchemaContext schemaContext) {
-            return shardPropsCreator.newProps(shardId, peerAddresses, datastoreContext, schemaContext);
+            return shardPropsCreator.newProps(shardId, initialPeerAddresses, datastoreContext, schemaContext);
         }
 
         String getShardName() {
@@ -799,37 +808,29 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return localShardDataTree;
         }
 
-        Map<String, String> getPeerAddresses() {
-            return peerAddresses;
-        }
-
         void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
-            LOG.info("updatePeerAddress for peer {} with address {}", peerId,
-                    peerAddress);
-            if(peerAddresses.containsKey(peerId)){
-                peerAddresses.put(peerId, peerAddress);
-
-                if(actor != null) {
-                    if(LOG.isDebugEnabled()) {
-                        LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
-                                peerId, peerAddress, actor.path());
-                    }
+            LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
 
-                    actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
+            if(actor != null) {
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
+                            peerId, peerAddress, actor.path());
                 }
 
-                notifyOnShardInitializedCallbacks();
+                actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
             }
+
+            notifyOnShardInitializedCallbacks();
         }
 
         void peerDown(String memberName, String peerId, ActorRef sender) {
-            if(peerAddresses.containsKey(peerId) && actor != null) {
+            if(actor != null) {
                 actor.tell(new PeerDown(memberName, peerId), sender);
             }
         }
 
         void peerUp(String memberName, String peerId, ActorRef sender) {
-            if(peerAddresses.containsKey(peerId) && actor != null) {
+            if(actor != null) {
                 actor.tell(new PeerUp(memberName, peerId), sender);
             }
         }
@@ -840,7 +841,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         boolean isShardReadyWithLeaderId() {
             return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
-                    (isLeader() || peerAddresses.get(leaderId) != null);
+                    (isLeader() || addressResolver.resolve(leaderId) != null);
         }
 
         boolean isShardInitialized() {
@@ -855,7 +856,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             if(isLeader()) {
                 return Serialization.serializedActorPath(getActor());
             } else {
-                return peerAddresses.get(leaderId);
+                return addressResolver.resolve(leaderId);
             }
         }
 
index 12fabbb6d2d0dda93dbfd6f9edfef7c19f510b0a..464fc7f53a5aeaf0443ce5afe79b047c53042e8e 100644 (file)
@@ -80,6 +80,10 @@ class ShardPeerAddressResolver implements PeerAddressResolver {
 
     @Override
     public String resolve(String peerId) {
+        if(peerId == null) {
+            return null;
+        }
+
         ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(peerId).build();
         return getShardActorAddress(shardId.getShardName(), shardId.getMemberName());
     }