Merge "Make idle timeout configurable in ssh proxy server"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index e68628dbf5c1fc992afb30ae986fff8ed8f6eef1..88f818f0faedf76f0349ce1f7294dee37b9d79d1 100644 (file)
@@ -24,6 +24,9 @@ import akka.persistence.RecoveryCompleted;
 import akka.persistence.RecoveryFailure;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
@@ -90,22 +93,29 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final Collection<String> knownModules = new HashSet<>(128);
 
+    private final DataPersistenceProvider dataPersistenceProvider;
+
     /**
      * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
      *             configuration or operational
      */
-    private ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
+    protected ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
             DatastoreContext datastoreContext) {
 
         this.type = Preconditions.checkNotNull(type, "type should not be null");
         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
         this.datastoreContext = datastoreContext;
+        this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
 
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
 
-        //createLocalShards(null);
+        createLocalShards();
+    }
+
+    protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
+        return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider();
     }
 
     public static Props props(final String type,
@@ -123,8 +133,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     @Override
     public void handleCommand(Object message) throws Exception {
         if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
-            findPrimary(
-                FindPrimary.fromSerializable(message));
+            findPrimary(FindPrimary.fromSerializable(message));
         } else if(message instanceof FindLocalShard){
             findLocalShard((FindLocalShard) message);
         } else if (message instanceof UpdateSchemaContext) {
@@ -160,49 +169,76 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         markShardAsInitialized(shardId.getShardName());
     }
 
-    @VisibleForTesting protected void markShardAsInitialized(String shardName) {
+    private void markShardAsInitialized(String shardName) {
         LOG.debug("Initializing shard [{}]", shardName);
         ShardInformation shardInformation = localShards.get(shardName);
         if (shardInformation != null) {
-            shardInformation.setShardInitialized(true);
+            shardInformation.setActorInitialized();
         }
     }
 
-    @Override protected void handleRecover(Object message) throws Exception {
+    @Override
+    protected void handleRecover(Object message) throws Exception {
+        if(dataPersistenceProvider.isRecoveryApplicable()) {
+            if (message instanceof SchemaContextModules) {
+                SchemaContextModules msg = (SchemaContextModules) message;
+                knownModules.clear();
+                knownModules.addAll(msg.getModules());
+            } else if (message instanceof RecoveryFailure) {
+                RecoveryFailure failure = (RecoveryFailure) message;
+                LOG.error(failure.cause(), "Recovery failed");
+            } else if (message instanceof RecoveryCompleted) {
+                LOG.info("Recovery complete : {}", persistenceId());
+
+                // Delete all the messages from the akka journal except the last one
+                deleteMessages(lastSequenceNr() - 1);
+            }
+        } else {
+            if (message instanceof RecoveryCompleted) {
+                LOG.info("Recovery complete : {}", persistenceId());
 
-        if(message instanceof SchemaContextModules){
-            SchemaContextModules msg = (SchemaContextModules) message;
-            knownModules.clear();
-            knownModules.addAll(msg.getModules());
-        } else if(message instanceof RecoveryFailure){
-            RecoveryFailure failure = (RecoveryFailure) message;
-            LOG.error(failure.cause(), "Recovery failed");
-        } else if(message instanceof RecoveryCompleted){
-            LOG.info("Recovery complete : {}", persistenceId());
-
-            // Delete all the messages from the akka journal except the last one
-            deleteMessages(lastSequenceNr() - 1);
+                // Delete all the messages from the akka journal
+                deleteMessages(lastSequenceNr());
+            }
         }
     }
 
     private void findLocalShard(FindLocalShard message) {
-        ShardInformation shardInformation = localShards.get(message.getShardName());
+        final ShardInformation shardInformation = localShards.get(message.getShardName());
 
         if(shardInformation == null){
             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
             return;
         }
 
-        sendResponse(shardInformation, new LocalShardFound(shardInformation.getActor()));
+        sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier<Object>() {
+            @Override
+            public Object get() {
+                return new LocalShardFound(shardInformation.getActor());
+            }
+        });
     }
 
-    private void sendResponse(ShardInformation shardInformation, Object message) {
+    private void sendResponse(ShardInformation shardInformation, boolean waitUntilInitialized,
+            final Supplier<Object> messageSupplier) {
         if (!shardInformation.isShardInitialized()) {
-            getSender().tell(new ActorNotInitialized(), getSelf());
+            if(waitUntilInitialized) {
+                final ActorRef sender = getSender();
+                final ActorRef self = self();
+                shardInformation.addRunnableOnInitialized(new Runnable() {
+                    @Override
+                    public void run() {
+                        sender.tell(messageSupplier.get(), self);
+                    }
+                });
+            } else {
+                getSender().tell(new ActorNotInitialized(), getSelf());
+            }
+
             return;
         }
 
-        getSender().tell(message, getSelf());
+        getSender().tell(messageSupplier.get(), getSelf());
     }
 
     private void memberRemoved(ClusterEvent.MemberRemoved message) {
@@ -244,14 +280,17 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             knownModules.clear();
             knownModules.addAll(newModules);
 
-            persist(new SchemaContextModules(newModules), new Procedure<SchemaContextModules>() {
+            dataPersistenceProvider.persist(new SchemaContextModules(newModules), new Procedure<SchemaContextModules>() {
 
-                @Override public void apply(SchemaContextModules param) throws Exception {
+                @Override
+                public void apply(SchemaContextModules param) throws Exception {
                     LOG.info("Sending new SchemaContext to Shards");
-                    if (localShards.size() == 0) {
-                        createLocalShards(schemaContext);
-                    } else {
-                        for (ShardInformation info : localShards.values()) {
+                    for (ShardInformation info : localShards.values()) {
+                        if (info.getActor() == null) {
+                            info.setActor(getContext().actorOf(Shard.props(info.getShardId(),
+                                            info.getPeerAddresses(), datastoreContext, schemaContext),
+                                    info.getShardId().toString()));
+                        } else {
                             info.getActor().tell(message, getSelf());
                         }
                     }
@@ -265,14 +304,18 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void findPrimary(FindPrimary message) {
-        final ActorRef sender = getSender();
         String shardName = message.getShardName();
 
         // First see if the there is a local replica for the shard
-        ShardInformation info = localShards.get(shardName);
+        final ShardInformation info = localShards.get(shardName);
         if (info != null) {
-            ActorPath shardPath = info.getActorPath();
-            sendResponse(info, new PrimaryFound(shardPath.toString()).toSerializable());
+            sendResponse(info, message.isWaitUntilInitialized(), new Supplier<Object>() {
+                @Override
+                public Object get() {
+                    return new PrimaryFound(info.getActorPath().toString()).toSerializable();
+                }
+            });
+
             return;
         }
 
@@ -331,7 +374,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      * runs
      *
      */
-    private void createLocalShards(SchemaContext schemaContext) {
+    private void createLocalShards() {
         String memberName = this.cluster.getCurrentMemberName();
         List<String> memberShardNames =
             this.configuration.getMemberShardNames(memberName);
@@ -340,11 +383,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         for(String shardName : memberShardNames){
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
             Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
-            ActorRef actor = getContext()
-                .actorOf(Shard.props(shardId, peerAddresses, datastoreContext, schemaContext),
-                    shardId.toString());
             localShardActorNames.add(shardId.toString());
-            localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
+            localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses));
         }
 
         mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
@@ -398,64 +438,98 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     }
 
-    @Override public String persistenceId() {
+    @Override
+    public String persistenceId() {
         return "shard-manager-" + type;
     }
 
-    @VisibleForTesting public Collection<String> getKnownModules() {
+    @VisibleForTesting
+    Collection<String> getKnownModules() {
         return knownModules;
     }
 
+    @VisibleForTesting
+    DataPersistenceProvider getDataPersistenceProvider() {
+        return dataPersistenceProvider;
+    }
+
     private class ShardInformation {
+        private final ShardIdentifier shardId;
         private final String shardName;
-        private final ActorRef actor;
-        private final ActorPath actorPath;
+        private ActorRef actor;
+        private ActorPath actorPath;
         private final Map<ShardIdentifier, String> peerAddresses;
-        private boolean shardInitialized = false; //flag that determines if the actor is ready for business
 
-        private ShardInformation(String shardName, ActorRef actor,
-            Map<ShardIdentifier, String> peerAddresses) {
+        // flag that determines if the actor is ready for business
+        private boolean actorInitialized = false;
+
+        private final List<Runnable> runnablesOnInitialized = Lists.newArrayList();
+
+        private ShardInformation(String shardName, ShardIdentifier shardId,
+                Map<ShardIdentifier, String> peerAddresses) {
             this.shardName = shardName;
-            this.actor = actor;
-            this.actorPath = actor.path();
+            this.shardId = shardId;
             this.peerAddresses = peerAddresses;
         }
 
-        public String getShardName() {
+        String getShardName() {
             return shardName;
         }
 
-        public ActorRef getActor(){
+        ActorRef getActor(){
             return actor;
         }
 
-        public ActorPath getActorPath() {
+        ActorPath getActorPath() {
             return actorPath;
         }
 
-        public void updatePeerAddress(ShardIdentifier peerId, String peerAddress){
+        void setActor(ActorRef actor) {
+            this.actor = actor;
+            this.actorPath = actor.path();
+        }
+
+        ShardIdentifier getShardId() {
+            return shardId;
+        }
+
+        Map<ShardIdentifier, String> getPeerAddresses() {
+            return peerAddresses;
+        }
+
+        void updatePeerAddress(ShardIdentifier peerId, String peerAddress){
             LOG.info("updatePeerAddress for peer {} with address {}", peerId,
                 peerAddress);
             if(peerAddresses.containsKey(peerId)){
                 peerAddresses.put(peerId, peerAddress);
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug(
-                        "Sending PeerAddressResolved for peer {} with address {} to {}",
-                        peerId, peerAddress, actor.path());
-                }
-                actor
-                    .tell(new PeerAddressResolved(peerId, peerAddress),
-                        getSelf());
 
+                if(actor != null) {
+                    if(LOG.isDebugEnabled()) {
+                        LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
+                                peerId, peerAddress, actor.path());
+                    }
+
+                    actor.tell(new PeerAddressResolved(peerId, peerAddress), getSelf());
+                }
             }
         }
 
-        public boolean isShardInitialized() {
-            return shardInitialized;
+        boolean isShardInitialized() {
+            return getActor() != null && actorInitialized;
+        }
+
+        void setActorInitialized() {
+            this.actorInitialized = true;
+
+            for(Runnable runnable: runnablesOnInitialized) {
+                runnable.run();
+            }
+
+            runnablesOnInitialized.clear();
         }
 
-        public void setShardInitialized(boolean shardInitialized) {
-            this.shardInitialized = shardInitialized;
+        void addRunnableOnInitialized(Runnable runnable) {
+            runnablesOnInitialized.add(runnable);
         }
     }
 
@@ -482,6 +556,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     static class SchemaContextModules implements Serializable {
+        private static final long serialVersionUID = 1L;
         private final Set<String> modules;
 
         SchemaContextModules(Set<String> modules){