Merge "BUG 2676 : Use custom client-dispatcher when configured"
authorTom Pantelis <tpanteli@brocade.com>
Tue, 24 Feb 2015 00:46:52 +0000 (00:46 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 24 Feb 2015 00:46:53 +0000 (00:46 +0000)
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application-with-custom-dispatchers.conf [new file with mode: 0644]

index 06f3afc57cb19d13dfd75448ce59dcf1a1e6bf39..3b30107e048e9fb513c80d03ed26fc3fd14eade7 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
 import akka.dispatch.OnComplete;
+import com.google.common.annotations.VisibleForTesting;
 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
@@ -25,7 +26,6 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
 import scala.concurrent.Future;
 
 /**
@@ -109,7 +109,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
                     doRegistration(shard, path, scope);
                 }
             }
-        }, actorContext.getActorSystem().dispatcher());
+        }, actorContext.getClientDispatcher());
     }
 
     private void doRegistration(ActorRef shard, final YangInstanceIdentifier path,
@@ -131,7 +131,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
                             reply.getListenerRegistrationPath()));
                 }
             }
-        }, actorContext.getActorSystem().dispatcher());
+        }, actorContext.getClientDispatcher());
     }
 
     @Override
index c51ea80726e54d9cf656e193ca8521d242206c76..4445b14e2edc1b48b5c92012e8a659275357f433 100644 (file)
@@ -71,7 +71,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
     private Future<Void> buildCohortList() {
 
         Future<Iterable<ActorSelection>> combinedFutures = Futures.sequence(cohortFutures,
-                actorContext.getActorSystem().dispatcher());
+                actorContext.getClientDispatcher());
 
         return combinedFutures.transform(new AbstractFunction1<Iterable<ActorSelection>, Void>() {
             @Override
@@ -83,7 +83,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                 }
                 return null;
             }
-        }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
+        }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
     }
 
     @Override
@@ -111,7 +111,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                     finishCanCommit(returnFuture);
                 }
             }
-        }, actorContext.getActorSystem().dispatcher());
+        }, actorContext.getClientDispatcher());
 
         return returnFuture;
     }
@@ -158,7 +158,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                 }
                 returnFuture.set(Boolean.valueOf(result));
             }
-        }, actorContext.getActorSystem().dispatcher());
+        }, actorContext.getClientDispatcher());
     }
 
     private Future<Iterable<Object>> invokeCohorts(Object message) {
@@ -170,7 +170,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
             futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout()));
         }
 
-        return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
+        return Futures.sequence(futureList, actorContext.getClientDispatcher());
     }
 
     @Override
@@ -239,7 +239,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                                 propagateException, returnFuture, callback);
                     }
                 }
-            }, actorContext.getActorSystem().dispatcher());
+            }, actorContext.getClientDispatcher());
         }
 
         return returnFuture;
@@ -304,7 +304,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                     callback.success();
                 }
             }
-        }, actorContext.getActorSystem().dispatcher());
+        }, actorContext.getClientDispatcher());
     }
 
     @VisibleForTesting
index 530a36cff657304005ebd4b43a5bb1e46449951b..03d1b3a6d736541dca754e49fa6f35ea67b7ed2b 100644 (file)
@@ -60,7 +60,7 @@ final class TransactionContextImpl extends AbstractTransactionContext {
     }
 
     private Future<Object> completeOperation(Future<Object> operationFuture){
-        operationFuture.onComplete(this.operationCompleter, actorContext.getActorSystem().dispatcher());
+        operationFuture.onComplete(this.operationCompleter, actorContext.getClientDispatcher());
         return operationFuture;
     }
 
@@ -105,7 +105,7 @@ final class TransactionContextImpl extends AbstractTransactionContext {
         futureList.add(replyFuture);
 
         Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
-                actorContext.getActorSystem().dispatcher());
+                actorContext.getClientDispatcher());
 
         // Transform the combined Future into a Future that returns the cohort actor path from
         // the ReadyTransactionReply. That's the end result of the ready operation.
@@ -152,7 +152,7 @@ final class TransactionContextImpl extends AbstractTransactionContext {
                             serializedReadyReply.getClass()));
                 }
             }
-        }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
+        }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
     }
 
     @Override
@@ -198,7 +198,7 @@ final class TransactionContextImpl extends AbstractTransactionContext {
 
             Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
                     Lists.newArrayList(recordedOperationFutures),
-                    actorContext.getActorSystem().dispatcher());
+                    actorContext.getClientDispatcher());
 
             OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
                 @Override
@@ -216,7 +216,7 @@ final class TransactionContextImpl extends AbstractTransactionContext {
                 }
             };
 
-            combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+            combinedFutures.onComplete(onComplete, actorContext.getClientDispatcher());
         }
 
     }
@@ -255,7 +255,7 @@ final class TransactionContextImpl extends AbstractTransactionContext {
 
         Future<Object> readFuture = executeOperationAsync(new ReadData(path));
 
-        readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+        readFuture.onComplete(onComplete, actorContext.getClientDispatcher());
     }
 
     @Override
@@ -280,7 +280,7 @@ final class TransactionContextImpl extends AbstractTransactionContext {
 
             Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
                     Lists.newArrayList(recordedOperationFutures),
-                    actorContext.getActorSystem().dispatcher());
+                    actorContext.getClientDispatcher());
             OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
                 @Override
                 public void onComplete(Throwable failure, Iterable<Object> notUsed)
@@ -297,7 +297,7 @@ final class TransactionContextImpl extends AbstractTransactionContext {
                 }
             };
 
-            combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+            combinedFutures.onComplete(onComplete, actorContext.getClientDispatcher());
         }
     }
 
@@ -332,6 +332,6 @@ final class TransactionContextImpl extends AbstractTransactionContext {
 
         Future<Object> future = executeOperationAsync(new DataExists(path));
 
-        future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+        future.onComplete(onComplete, actorContext.getClientDispatcher());
     }
 }
index 5bc53442aeff04aa43c299a49890b3ecfe70b974..d63ec8010dc714adc71c951bf7fd0a65937a4ce7 100644 (file)
@@ -484,7 +484,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                         newTxFutureCallback.setPrimaryShard(primaryShard);
                     }
                 }
-            }, actorContext.getActorSystem().dispatcher());
+            }, actorContext.getClientDispatcher());
         }
 
         return txFutureCallback;
@@ -601,7 +601,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                             TransactionProxy.this.transactionType.ordinal(),
                             getTransactionChainId()).toSerializable());
 
-            createTxFuture.onComplete(this, actorContext.getActorSystem().dispatcher());
+            createTxFuture.onComplete(this, actorContext.getClientDispatcher());
         }
 
         @Override
@@ -621,7 +621,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                                 public void run() {
                                     tryCreateTransaction();
                                 }
-                            }, actorContext.getActorSystem().dispatcher());
+                            }, actorContext.getClientDispatcher());
                     return;
                 }
             }
index cb06c898fd1940743c19b1763f2b173ec3dacbfc..507d2389b1547a0b7f861e881516fd04d62cad35 100644 (file)
@@ -47,6 +47,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
@@ -93,6 +94,7 @@ public class ActorContext {
     private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
     private final int transactionOutstandingOperationLimit;
     private final Timeout transactionCommitOperationTimeout;
+    private final Dispatchers dispatchers;
 
     private volatile SchemaContext schemaContext;
 
@@ -111,6 +113,7 @@ public class ActorContext {
         this.configuration = configuration;
         this.datastoreContext = datastoreContext;
         this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
+        this.dispatchers = new Dispatchers(actorSystem.dispatchers());
 
         operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
         operationTimeout = new Timeout(operationDuration);
@@ -127,6 +130,7 @@ public class ActorContext {
 
         transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
         jmxReporter.start();
+
     }
 
     public DatastoreContext getDatastoreContext() {
@@ -200,7 +204,7 @@ public class ActorContext {
                 throw new UnknownMessageException(String.format(
                         "FindPrimary returned unkown response: %s", response));
             }
-        }, FIND_PRIMARY_FAILURE_TRANSFORMER, getActorSystem().dispatcher());
+        }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
     }
 
     /**
@@ -251,7 +255,7 @@ public class ActorContext {
                 throw new UnknownMessageException(String.format(
                         "FindLocalShard returned unkown response: %s", response));
             }
-        }, getActorSystem().dispatcher());
+        }, getClientDispatcher());
     }
 
     private String findPrimaryPathOrNull(String shardName) {
@@ -514,5 +518,13 @@ public class ActorContext {
         return transactionCommitOperationTimeout;
     }
 
+    /**
+     * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
+     * code on the datastore
+     * @return
+     */
+    public ExecutionContext getClientDispatcher() {
+        return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
+    }
 
 }
index 58aec30a8470035bfd7111c5b4c5badebf26b401..8ad2e899aab5b5fa8ad7bf3e4a74c3ca66210964 100644 (file)
@@ -193,10 +193,12 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest {
             doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class));
             ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(
                     MoreExecutors.sameThreadExecutor());
-            doReturn(executor).when(mockActorSystem).dispatcher();
+
 
             ActorContext actorContext = mock(ActorContext.class);
 
+            doReturn(executor).when(actorContext).getClientDispatcher();
+
             String shardName = "shard-1";
             final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
                     shardName, actorContext, mockListener);
@@ -227,6 +229,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest {
                     shardName, actorContext, mockListener);
 
             doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext();
+            doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
             doReturn(getSystem()).when(actorContext).getActorSystem();
             doReturn(getSystem().actorSelection(getRef().path())).
                     when(actorContext).actorSelection(getRef().path());
index b013515f2595950cba669ac08dfdb1a8eefe37fa..0a2a0d1bc0595a8932f2e2ee3ab27c6b8f422855 100644 (file)
@@ -66,6 +66,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         MockitoAnnotations.initMocks(this);
 
         doReturn(getSystem()).when(actorContext).getActorSystem();
+        doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
         doReturn(datastoreContext).when(actorContext).getDatastoreContext();
         doReturn(100).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
         doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
index 7ce41a4db1304bd84cbdf2b7e02ce0fd4363e828..fa2f9187d6059f1585a1475531556d8563db3c5a 100644 (file)
@@ -129,6 +129,7 @@ public class TransactionProxyTest {
         DatastoreContext dataStoreContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).build();
 
         doReturn(getSystem()).when(mockActorContext).getActorSystem();
+        doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
index eae46da2eee53bd4b2cf5ee7d2cb823e0111b6be..3c6a0cef5c605fbb23e3c9501676f67d95805e9d 100644 (file)
@@ -1,17 +1,20 @@
 package org.opendaylight.controller.cluster.datastore.utils;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
+import com.typesafe.config.ConfigFactory;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang.time.StopWatch;
 import org.junit.Test;
@@ -299,4 +302,41 @@ public class ActorContextTest extends AbstractActorTest{
 
         assertTrue("did not take as much time as expected", watch.getTime() > 1000);
     }
+
+    @Test
+    public void testClientDispatcherIsGlobalDispatcher(){
+
+        DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+        doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+        doReturn("config").when(mockDataStoreContext).getDataStoreType();
+
+        ActorContext actorContext =
+                new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+                        mock(Configuration.class), mockDataStoreContext);
+
+        assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
+
+    }
+
+    @Test
+    public void testClientDispatcherIsNotGlobalDispatcher(){
+
+        DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+        doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+        doReturn("config").when(mockDataStoreContext).getDataStoreType();
+
+        ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
+
+        ActorContext actorContext =
+                new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
+                        mock(Configuration.class), mockDataStoreContext);
+
+        assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
+
+        actorSystem.shutdown();
+
+    }
+
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application-with-custom-dispatchers.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application-with-custom-dispatchers.conf
new file mode 100644 (file)
index 0000000..32c55a6
--- /dev/null
@@ -0,0 +1,116 @@
+akka {
+    persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+    persistence.journal.plugin = "in-memory-journal"
+
+    loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"]
+
+    actor {
+         serializers {
+                  java = "akka.serialization.JavaSerializer"
+                  proto = "akka.remote.serialization.ProtobufSerializer"
+         }
+
+        serialization-bindings {
+            "org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification" = java
+            "com.google.protobuf.Message" = proto
+
+        }
+    }
+}
+
+in-memory-journal {
+    class = "org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal"
+}
+
+in-memory-snapshot-store {
+  # Class name of the plugin.
+  class = "org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore"
+  # Dispatcher for the plugin actor.
+  plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+}
+
+bounded-mailbox {
+  mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
+  mailbox-capacity = 1000
+  mailbox-push-timeout-time = 100ms
+}
+
+client-dispatcher {
+  # Dispatcher is the name of the event-based dispatcher
+  type = Dispatcher
+  # What kind of ExecutionService to use
+  executor = "fork-join-executor"
+  # Configuration for the fork join pool
+  fork-join-executor {
+    # Min number of threads to cap factor-based parallelism number to
+    parallelism-min = 2
+    # Parallelism (threads) ... ceil(available processors * factor)
+    parallelism-factor = 2.0
+    # Max number of threads to cap factor-based parallelism number to
+    parallelism-max = 10
+  }
+  # Throughput defines the maximum number of messages to be
+  # processed per actor before the thread jumps to the next actor.
+  # Set to 1 for as fair as possible.
+  throughput = 100
+}
+
+transaction-dispatcher {
+  # Dispatcher is the name of the event-based dispatcher
+  type = Dispatcher
+  # What kind of ExecutionService to use
+  executor = "fork-join-executor"
+  # Configuration for the fork join pool
+  fork-join-executor {
+    # Min number of threads to cap factor-based parallelism number to
+    parallelism-min = 2
+    # Parallelism (threads) ... ceil(available processors * factor)
+    parallelism-factor = 2.0
+    # Max number of threads to cap factor-based parallelism number to
+    parallelism-max = 10
+  }
+  # Throughput defines the maximum number of messages to be
+  # processed per actor before the thread jumps to the next actor.
+  # Set to 1 for as fair as possible.
+  throughput = 100
+}
+
+shard-dispatcher {
+  # Dispatcher is the name of the event-based dispatcher
+  type = Dispatcher
+  # What kind of ExecutionService to use
+  executor = "fork-join-executor"
+  # Configuration for the fork join pool
+  fork-join-executor {
+    # Min number of threads to cap factor-based parallelism number to
+    parallelism-min = 2
+    # Parallelism (threads) ... ceil(available processors * factor)
+    parallelism-factor = 2.0
+    # Max number of threads to cap factor-based parallelism number to
+    parallelism-max = 10
+  }
+  # Throughput defines the maximum number of messages to be
+  # processed per actor before the thread jumps to the next actor.
+  # Set to 1 for as fair as possible.
+  throughput = 100
+}
+
+notification-dispatcher {
+  # Dispatcher is the name of the event-based dispatcher
+  type = Dispatcher
+  # What kind of ExecutionService to use
+  executor = "fork-join-executor"
+  # Configuration for the fork join pool
+  fork-join-executor {
+    # Min number of threads to cap factor-based parallelism number to
+    parallelism-min = 2
+    # Parallelism (threads) ... ceil(available processors * factor)
+    parallelism-factor = 2.0
+    # Max number of threads to cap factor-based parallelism number to
+    parallelism-max = 10
+  }
+  # Throughput defines the maximum number of messages to be
+  # processed per actor before the thread jumps to the next actor.
+  # Set to 1 for as fair as possible.
+  throughput = 100
+}
\ No newline at end of file