Changed key actors to use bounded mailbox
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index 6162a0327ca6ab229be75b4f8b8c2c994dcb253a..186f2cff41351a97f9d1852396a149047bd345cc 100644 (file)
@@ -17,7 +17,9 @@ import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
 import akka.japi.Creator;
 import akka.japi.Function;
+
 import com.google.common.base.Preconditions;
+
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
@@ -30,6 +32,8 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import scala.concurrent.duration.Duration;
 
 import java.util.ArrayList;
@@ -68,15 +72,19 @@ public class ShardManager extends AbstractUntypedActor {
 
     private ShardManagerInfoMBean mBean;
 
+    private final ShardContext shardContext;
+
     /**
      * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
      *             configuration or operational
      */
-    private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) {
+    private ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
+            ShardContext shardContext) {
 
         this.type = Preconditions.checkNotNull(type, "type should not be null");
         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
+        this.shardContext = shardContext;
 
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
@@ -88,22 +96,16 @@ public class ShardManager extends AbstractUntypedActor {
 
     public static Props props(final String type,
         final ClusterWrapper cluster,
-        final Configuration configuration) {
+        final Configuration configuration,
+        final ShardContext shardContext) {
 
         Preconditions.checkNotNull(type, "type should not be null");
         Preconditions.checkNotNull(cluster, "cluster should not be null");
         Preconditions.checkNotNull(configuration, "configuration should not be null");
 
-        return Props.create(new Creator<ShardManager>() {
-
-            @Override
-            public ShardManager create() throws Exception {
-                return new ShardManager(type, cluster, configuration);
-            }
-        });
+        return Props.create(new ShardManagerCreator(type, cluster, configuration, shardContext));
     }
 
-
     @Override
     public void handleReceive(Object message) throws Exception {
         if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
@@ -243,8 +245,9 @@ public class ShardManager extends AbstractUntypedActor {
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
             Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
             ActorRef actor = getContext()
-                .actorOf(Shard.props(shardId, peerAddresses),
-                    shardId.toString());
+                .actorOf(Shard.props(shardId, peerAddresses, shardContext).
+                    withMailbox(ActorContext.MAILBOX), shardId.toString());
+
             localShardActorNames.add(shardId.toString());
             localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
         }
@@ -283,10 +286,17 @@ public class ShardManager extends AbstractUntypedActor {
 
     @Override
     public SupervisorStrategy supervisorStrategy() {
+
         return new OneForOneStrategy(10, Duration.create("1 minute"),
             new Function<Throwable, SupervisorStrategy.Directive>() {
                 @Override
                 public SupervisorStrategy.Directive apply(Throwable t) {
+                    StringBuilder sb = new StringBuilder();
+                    for(StackTraceElement element : t.getStackTrace()) {
+                       sb.append("\n\tat ")
+                         .append(element.toString());
+                    }
+                    LOG.warning("Supervisor Strategy of resume applied {}",sb.toString());
                     return SupervisorStrategy.resume();
                 }
             }
@@ -337,6 +347,28 @@ public class ShardManager extends AbstractUntypedActor {
             }
         }
     }
+
+    private static class ShardManagerCreator implements Creator<ShardManager> {
+        private static final long serialVersionUID = 1L;
+
+        final String type;
+        final ClusterWrapper cluster;
+        final Configuration configuration;
+        final ShardContext shardContext;
+
+        ShardManagerCreator(String type, ClusterWrapper cluster,
+                Configuration configuration, ShardContext shardContext) {
+            this.type = type;
+            this.cluster = cluster;
+            this.configuration = configuration;
+            this.shardContext = shardContext;
+        }
+
+        @Override
+        public ShardManager create() throws Exception {
+            return new ShardManager(type, cluster, configuration, shardContext);
+        }
+    }
 }