Remove peer address cache in ShardInformation
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index 99d66d515f5b77e7afc36b570a23491878c59df0..b48215d3617c92937e0d73fb6cbacf56c3c50c96 100644 (file)
@@ -39,11 +39,11 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
@@ -55,6 +55,8 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardF
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
+import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
+import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
@@ -87,11 +89,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
 
-    // Stores a mapping between a member name and the address of the member
-    // Member names look like "member-1", "member-2" etc and are as specified
-    // in configuration
-    private final Map<String, Address> memberNameToAddress = new HashMap<>();
-
     // Stores a mapping between a shard name and it's corresponding information
     // Shard names look like inventory, topology etc and are as specified in
     // configuration
@@ -101,8 +98,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     // A data store could be of type config/operational
     private final String type;
 
-    private final String shardManagerIdentifierString;
-
     private final ClusterWrapper cluster;
 
     private final Configuration configuration;
@@ -117,6 +112,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
 
+    private final ShardPeerAddressResolver peerAddressResolver;
+
     private SchemaContext schemaContext;
 
     /**
@@ -129,12 +126,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
         this.datastoreContext = datastoreContext;
         this.type = datastoreContext.getDataStoreType();
-        this.shardManagerIdentifierString = ShardManagerIdentifier.builder().type(type).build().toString();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
         this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
         this.primaryShardInfoCache = primaryShardInfoCache;
 
+        peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
+        this.datastoreContext = DatastoreContext.newBuilderFrom(datastoreContext).shardPeerAddressResolver(
+                peerAddressResolver).build();
+
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
 
@@ -142,11 +142,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     public static Props props(
-        final ClusterWrapper cluster,
-        final Configuration configuration,
-        final DatastoreContext datastoreContext,
-        final CountDownLatch waitTillReadyCountdownLatch,
-        final PrimaryShardInfoFutureCache primaryShardInfoCache) {
+            final ClusterWrapper cluster,
+            final Configuration configuration,
+            final DatastoreContext datastoreContext,
+            final CountDownLatch waitTillReadyCountdownLatch,
+            final PrimaryShardInfoFutureCache primaryShardInfoCache) {
 
         Preconditions.checkNotNull(cluster, "cluster should not be null");
         Preconditions.checkNotNull(configuration, "configuration should not be null");
@@ -176,6 +176,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onActorInitialized(message);
         } else if (message instanceof ClusterEvent.MemberUp){
             memberUp((ClusterEvent.MemberUp) message);
+        } else if (message instanceof ClusterEvent.MemberExited){
+            memberExited((ClusterEvent.MemberExited) message);
         } else if(message instanceof ClusterEvent.MemberRemoved) {
             memberRemoved((ClusterEvent.MemberRemoved) message);
         } else if(message instanceof ClusterEvent.UnreachableMember) {
@@ -205,24 +207,32 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private void onCreateShard(CreateShard createShard) {
         Object reply;
         try {
-            if(localShards.containsKey(createShard.getShardName())) {
+            ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
+            if(localShards.containsKey(moduleShardConfig.getShardName())) {
                 throw new IllegalStateException(String.format("Shard with name %s already exists",
-                        createShard.getShardName()));
+                        moduleShardConfig.getShardName()));
             }
 
-            ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), createShard.getShardName());
-            Map<String, String> peerAddresses = getPeerAddresses(createShard.getShardName(), createShard.getMemberNames());
+            configuration.addModuleShardConfiguration(moduleShardConfig);
 
-            LOG.debug("onCreateShard: shardId: {}, peerAddresses: {}", shardId, peerAddresses);
+            ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), moduleShardConfig.getShardName());
+            Map<String, String> peerAddresses = getPeerAddresses(moduleShardConfig.getShardName()/*,
+                    moduleShardConfig.getShardMemberNames()*/);
+
+            LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
+                    moduleShardConfig.getShardMemberNames(), peerAddresses);
 
             DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
             if(shardDatastoreContext == null) {
                 shardDatastoreContext = datastoreContext;
+            } else {
+                shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
+                        peerAddressResolver).build();
             }
 
-            ShardInformation info = new ShardInformation(createShard.getShardName(), shardId, peerAddresses,
-                    shardDatastoreContext, createShard.getShardPropsCreator());
-            localShards.put(createShard.getShardName(), info);
+            ShardInformation info = new ShardInformation(moduleShardConfig.getShardName(), shardId, peerAddresses,
+                    shardDatastoreContext, createShard.getShardPropsCreator(), peerAddressResolver);
+            localShards.put(info.getShardName(), info);
 
             mBean.addLocalShard(shardId.toString());
 
@@ -464,7 +474,24 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
-        memberNameToAddress.remove(message.member().roles().head());
+        peerAddressResolver.removePeerAddress(memberName);
+
+        for(ShardInformation info : localShards.values()){
+            info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
+        }
+    }
+
+    private void memberExited(ClusterEvent.MemberExited message) {
+        String memberName = message.member().roles().head();
+
+        LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
+                message.member().address());
+
+        peerAddressResolver.removePeerAddress(memberName);
+
+        for(ShardInformation info : localShards.values()){
+            info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
+        }
     }
 
     private void memberUp(ClusterEvent.MemberUp message) {
@@ -473,21 +500,29 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
-        memberNameToAddress.put(memberName, message.member().address());
+        addPeerAddress(memberName, message.member().address());
+
+        checkReady();
+    }
+
+    private void addPeerAddress(String memberName, Address address) {
+        peerAddressResolver.addPeerAddress(memberName, address);
 
         for(ShardInformation info : localShards.values()){
             String shardName = info.getShardName();
-            info.updatePeerAddress(getShardIdentifier(memberName, shardName).toString(),
-                getShardActorPath(shardName, memberName), getSelf());
-        }
+            String peerId = getShardIdentifier(memberName, shardName).toString();
+            info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
 
-        checkReady();
+            info.peerUp(memberName, peerId, getSelf());
+        }
     }
 
     private void memberReachable(ClusterEvent.ReachableMember message) {
         String memberName = message.member().roles().head();
         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
 
+        addPeerAddress(memberName, message.member().address());
+
         markMemberAvailable(memberName);
     }
 
@@ -507,6 +542,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
                 primaryShardInfoCache.remove(info.getShardName());
             }
+
+            info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
         }
     }
 
@@ -517,11 +554,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 LOG.debug("Marking Leader {} as available.", leaderId);
                 info.setLeaderAvailable(true);
             }
+
+            info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
         }
     }
 
     private void onDatastoreContext(DatastoreContext context) {
-        datastoreContext = context;
+        datastoreContext = DatastoreContext.newBuilderFrom(context).shardPeerAddressResolver(
+                peerAddressResolver).build();
         for (ShardInformation info : localShards.values()) {
             if (info.getActor() != null) {
                 info.getActor().tell(datastoreContext, getSelf());
@@ -571,7 +611,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     @VisibleForTesting
     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
         return getContext().actorOf(info.newProps(schemaContext)
-                        .withDispatcher(shardDispatcherPath), info.getShardId().toString());
+                .withDispatcher(shardDispatcherPath), info.getShardId().toString());
     }
 
     private void findPrimary(FindPrimary message) {
@@ -591,28 +631,24 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                             new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
                                 new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
 
-                    if(LOG.isDebugEnabled()) {
-                        LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
-                    }
+                            if(LOG.isDebugEnabled()) {
+                                LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
+                            }
 
-                    return found;
+                            return found;
                 }
             });
 
             return;
         }
 
-        for(Map.Entry<String, Address> entry: memberNameToAddress.entrySet()) {
-            if(!cluster.getCurrentMemberName().equals(entry.getKey())) {
-                String path = getShardManagerActorPathBuilder(entry.getValue()).toString();
+        for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
+            LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
+                    shardName, address);
 
-                LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
-                        shardName, path);
-
-                getContext().actorSelection(path).forward(new RemoteFindPrimary(shardName,
-                        message.isWaitUntilReady()), getContext());
-                return;
-            }
+            getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
+                    message.isWaitUntilReady()), getContext());
+            return;
         }
 
         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
@@ -621,23 +657,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 String.format("No primary shard found for %s.", shardName)), getSelf());
     }
 
-    private StringBuilder getShardManagerActorPathBuilder(Address address) {
-        StringBuilder builder = new StringBuilder();
-        builder.append(address.toString()).append("/user/").append(shardManagerIdentifierString);
-        return builder;
-    }
-
-    private String getShardActorPath(String shardName, String memberName) {
-        Address address = memberNameToAddress.get(memberName);
-        if(address != null) {
-            StringBuilder builder = getShardManagerActorPathBuilder(address);
-            builder.append("/")
-                .append(getShardIdentifier(memberName, shardName));
-            return builder.toString();
-        }
-        return null;
-    }
-
     /**
      * Construct the name of the shard actor given the name of the member on
      * which the shard resides and the name of the shard
@@ -647,7 +666,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      * @return
      */
     private ShardIdentifier getShardIdentifier(String memberName, String shardName){
-        return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(type).build();
+        return peerAddressResolver.getShardIdentifier(memberName, shardName);
     }
 
     /**
@@ -657,8 +676,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      */
     private void createLocalShards() {
         String memberName = this.cluster.getCurrentMemberName();
-        List<String> memberShardNames =
-            this.configuration.getMemberShardNames(memberName);
+        Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
 
         ShardPropsCreator shardPropsCreator = new DefaultShardPropsCreator();
         List<String> localShardActorNames = new ArrayList<>();
@@ -667,11 +685,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             Map<String, String> peerAddresses = getPeerAddresses(shardName);
             localShardActorNames.add(shardId.toString());
             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, datastoreContext,
-                    shardPropsCreator));
+                    shardPropsCreator, peerAddressResolver));
         }
 
         mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
-                    datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
+                datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
 
         mBean.setShardManager(this);
     }
@@ -680,14 +698,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      * Given the name of the shard find the addresses of all it's peers
      *
      * @param shardName
-     * @return
      */
     private Map<String, String> getPeerAddresses(String shardName) {
-        return getPeerAddresses(shardName, configuration.getMembersFromShardName(shardName));
-    }
-
-    private Map<String, String> getPeerAddresses(String shardName, Collection<String> members) {
-
+        Collection<String> members = configuration.getMembersFromShardName(shardName);
         Map<String, String> peerAddresses = new HashMap<>();
 
         String currentMemberName = this.cluster.getCurrentMemberName();
@@ -695,8 +708,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         for(String memberName : members) {
             if(!currentMemberName.equals(memberName)) {
                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
-                String path = getShardActorPath(shardName, currentMemberName);
-                peerAddresses.put(shardId.toString(), path);
+                String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
+                peerAddresses.put(shardId.toString(), address);
             }
         }
         return peerAddresses;
@@ -706,14 +719,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     public SupervisorStrategy supervisorStrategy() {
 
         return new OneForOneStrategy(10, Duration.create("1 minute"),
-            new Function<Throwable, SupervisorStrategy.Directive>() {
-                @Override
-                public SupervisorStrategy.Directive apply(Throwable t) {
-                    LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
-                    return SupervisorStrategy.resume();
-                }
+                new Function<Throwable, SupervisorStrategy.Directive>() {
+            @Override
+            public SupervisorStrategy.Directive apply(Throwable t) {
+                LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
+                return SupervisorStrategy.resume();
             }
-        );
+        }
+                );
 
     }
 
@@ -733,7 +746,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private final String shardName;
         private ActorRef actor;
         private ActorPath actorPath;
-        private final Map<String, String> peerAddresses;
+        private final Map<String, String> initialPeerAddresses;
         private Optional<DataTree> localShardDataTree;
         private boolean leaderAvailable = false;
 
@@ -749,19 +762,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         private final DatastoreContext datastoreContext;
         private final ShardPropsCreator shardPropsCreator;
+        private final ShardPeerAddressResolver addressResolver;
 
         private ShardInformation(String shardName, ShardIdentifier shardId,
-                Map<String, String> peerAddresses, DatastoreContext datastoreContext,
-                ShardPropsCreator shardPropsCreator) {
+                Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
+                ShardPropsCreator shardPropsCreator, ShardPeerAddressResolver addressResolver) {
             this.shardName = shardName;
             this.shardId = shardId;
-            this.peerAddresses = peerAddresses;
+            this.initialPeerAddresses = initialPeerAddresses;
             this.datastoreContext = datastoreContext;
             this.shardPropsCreator = shardPropsCreator;
+            this.addressResolver = addressResolver;
         }
 
         Props newProps(SchemaContext schemaContext) {
-            return shardPropsCreator.newProps(shardId, peerAddresses, datastoreContext, schemaContext);
+            return shardPropsCreator.newProps(shardId, initialPeerAddresses, datastoreContext, schemaContext);
         }
 
         String getShardName() {
@@ -793,26 +808,30 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return localShardDataTree;
         }
 
-        Map<String, String> getPeerAddresses() {
-            return peerAddresses;
-        }
-
         void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
-            LOG.info("updatePeerAddress for peer {} with address {}", peerId,
-                peerAddress);
-            if(peerAddresses.containsKey(peerId)){
-                peerAddresses.put(peerId, peerAddress);
-
-                if(actor != null) {
-                    if(LOG.isDebugEnabled()) {
-                        LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
-                                peerId, peerAddress, actor.path());
-                    }
+            LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
 
-                    actor.tell(new PeerAddressResolved(peerId.toString(), peerAddress), sender);
+            if(actor != null) {
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
+                            peerId, peerAddress, actor.path());
                 }
 
-                notifyOnShardInitializedCallbacks();
+                actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
+            }
+
+            notifyOnShardInitializedCallbacks();
+        }
+
+        void peerDown(String memberName, String peerId, ActorRef sender) {
+            if(actor != null) {
+                actor.tell(new PeerDown(memberName, peerId), sender);
+            }
+        }
+
+        void peerUp(String memberName, String peerId, ActorRef sender) {
+            if(actor != null) {
+                actor.tell(new PeerUp(memberName, peerId), sender);
             }
         }
 
@@ -822,7 +841,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         boolean isShardReadyWithLeaderId() {
             return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
-                    (isLeader() || peerAddresses.get(leaderId) != null);
+                    (isLeader() || addressResolver.resolve(leaderId) != null);
         }
 
         boolean isShardInitialized() {
@@ -837,7 +856,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             if(isLeader()) {
                 return Serialization.serializedActorPath(getActor());
             } else {
-                return peerAddresses.get(leaderId);
+                return addressResolver.resolve(leaderId);
             }
         }