From: Moiz Raja Date: Sat, 21 Feb 2015 16:43:33 +0000 (-0800) Subject: BUG 2676 : Use custom client-dispatcher when configured X-Git-Tag: release/lithium~494^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=a366d9bb224bc8e317dfbd74335250cf604d150b BUG 2676 : Use custom client-dispatcher when configured Use a client dispatcher for all code which originates at the client end and requires a dispatcher for completion of a Future that was created by an ask Change-Id: If2c1fa064e368152539ad21622985cc044f305d3 Signed-off-by: Moiz Raja --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java index 06f3afc57c..3b30107e04 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java @@ -12,6 +12,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.PoisonPill; import akka.dispatch.OnComplete; +import com.google.common.annotations.VisibleForTesting; import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException; import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; @@ -25,7 +26,6 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; import scala.concurrent.Future; /** @@ -109,7 +109,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration doRegistration(shard, path, scope); } } - }, actorContext.getActorSystem().dispatcher()); + }, actorContext.getClientDispatcher()); } private void doRegistration(ActorRef shard, final YangInstanceIdentifier path, @@ -131,7 +131,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration reply.getListenerRegistrationPath())); } } - }, actorContext.getActorSystem().dispatcher()); + }, actorContext.getClientDispatcher()); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index c51ea80726..4445b14e2e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -71,7 +71,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho private Future buildCohortList() { Future> combinedFutures = Futures.sequence(cohortFutures, - actorContext.getActorSystem().dispatcher()); + actorContext.getClientDispatcher()); return combinedFutures.transform(new AbstractFunction1, Void>() { @Override @@ -83,7 +83,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho } return null; } - }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher()); + }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher()); } @Override @@ -111,7 +111,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho finishCanCommit(returnFuture); } } - }, actorContext.getActorSystem().dispatcher()); + }, actorContext.getClientDispatcher()); return returnFuture; } @@ -158,7 +158,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho } returnFuture.set(Boolean.valueOf(result)); } - }, actorContext.getActorSystem().dispatcher()); + }, actorContext.getClientDispatcher()); } private Future> invokeCohorts(Object message) { @@ -170,7 +170,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout())); } - return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher()); + return Futures.sequence(futureList, actorContext.getClientDispatcher()); } @Override @@ -239,7 +239,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho propagateException, returnFuture, callback); } } - }, actorContext.getActorSystem().dispatcher()); + }, actorContext.getClientDispatcher()); } return returnFuture; @@ -304,7 +304,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho callback.success(); } } - }, actorContext.getActorSystem().dispatcher()); + }, actorContext.getClientDispatcher()); } @VisibleForTesting diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java index 530a36cff6..03d1b3a6d7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java @@ -60,7 +60,7 @@ final class TransactionContextImpl extends AbstractTransactionContext { } private Future completeOperation(Future operationFuture){ - operationFuture.onComplete(this.operationCompleter, actorContext.getActorSystem().dispatcher()); + operationFuture.onComplete(this.operationCompleter, actorContext.getClientDispatcher()); return operationFuture; } @@ -105,7 +105,7 @@ final class TransactionContextImpl extends AbstractTransactionContext { futureList.add(replyFuture); Future> combinedFutures = akka.dispatch.Futures.sequence(futureList, - actorContext.getActorSystem().dispatcher()); + actorContext.getClientDispatcher()); // Transform the combined Future into a Future that returns the cohort actor path from // the ReadyTransactionReply. That's the end result of the ready operation. @@ -152,7 +152,7 @@ final class TransactionContextImpl extends AbstractTransactionContext { serializedReadyReply.getClass())); } } - }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher()); + }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher()); } @Override @@ -198,7 +198,7 @@ final class TransactionContextImpl extends AbstractTransactionContext { Future> combinedFutures = akka.dispatch.Futures.sequence( Lists.newArrayList(recordedOperationFutures), - actorContext.getActorSystem().dispatcher()); + actorContext.getClientDispatcher()); OnComplete> onComplete = new OnComplete>() { @Override @@ -216,7 +216,7 @@ final class TransactionContextImpl extends AbstractTransactionContext { } }; - combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); + combinedFutures.onComplete(onComplete, actorContext.getClientDispatcher()); } } @@ -255,7 +255,7 @@ final class TransactionContextImpl extends AbstractTransactionContext { Future readFuture = executeOperationAsync(new ReadData(path)); - readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); + readFuture.onComplete(onComplete, actorContext.getClientDispatcher()); } @Override @@ -280,7 +280,7 @@ final class TransactionContextImpl extends AbstractTransactionContext { Future> combinedFutures = akka.dispatch.Futures.sequence( Lists.newArrayList(recordedOperationFutures), - actorContext.getActorSystem().dispatcher()); + actorContext.getClientDispatcher()); OnComplete> onComplete = new OnComplete>() { @Override public void onComplete(Throwable failure, Iterable notUsed) @@ -297,7 +297,7 @@ final class TransactionContextImpl extends AbstractTransactionContext { } }; - combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); + combinedFutures.onComplete(onComplete, actorContext.getClientDispatcher()); } } @@ -332,6 +332,6 @@ final class TransactionContextImpl extends AbstractTransactionContext { Future future = executeOperationAsync(new DataExists(path)); - future.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); + future.onComplete(onComplete, actorContext.getClientDispatcher()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 5bc53442ae..d63ec8010d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -484,7 +484,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { newTxFutureCallback.setPrimaryShard(primaryShard); } } - }, actorContext.getActorSystem().dispatcher()); + }, actorContext.getClientDispatcher()); } return txFutureCallback; @@ -601,7 +601,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { TransactionProxy.this.transactionType.ordinal(), getTransactionChainId()).toSerializable()); - createTxFuture.onComplete(this, actorContext.getActorSystem().dispatcher()); + createTxFuture.onComplete(this, actorContext.getClientDispatcher()); } @Override @@ -621,7 +621,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public void run() { tryCreateTransaction(); } - }, actorContext.getActorSystem().dispatcher()); + }, actorContext.getClientDispatcher()); return; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index cb06c898fd..507d2389b1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -47,6 +47,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Await; +import scala.concurrent.ExecutionContext; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -93,6 +94,7 @@ public class ActorContext { private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build(); private final int transactionOutstandingOperationLimit; private final Timeout transactionCommitOperationTimeout; + private final Dispatchers dispatchers; private volatile SchemaContext schemaContext; @@ -111,6 +113,7 @@ public class ActorContext { this.configuration = configuration; this.datastoreContext = datastoreContext; this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit()); + this.dispatchers = new Dispatchers(actorSystem.dispatchers()); operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS); operationTimeout = new Timeout(operationDuration); @@ -127,6 +130,7 @@ public class ActorContext { transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity(); jmxReporter.start(); + } public DatastoreContext getDatastoreContext() { @@ -200,7 +204,7 @@ public class ActorContext { throw new UnknownMessageException(String.format( "FindPrimary returned unkown response: %s", response)); } - }, FIND_PRIMARY_FAILURE_TRANSFORMER, getActorSystem().dispatcher()); + }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher()); } /** @@ -251,7 +255,7 @@ public class ActorContext { throw new UnknownMessageException(String.format( "FindLocalShard returned unkown response: %s", response)); } - }, getActorSystem().dispatcher()); + }, getClientDispatcher()); } private String findPrimaryPathOrNull(String shardName) { @@ -514,5 +518,13 @@ public class ActorContext { return transactionCommitOperationTimeout; } + /** + * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client + * code on the datastore + * @return + */ + public ExecutionContext getClientDispatcher() { + return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java index 58aec30a84..8ad2e899aa 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java @@ -193,10 +193,12 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest { doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class)); ExecutionContextExecutor executor = ExecutionContexts.fromExecutor( MoreExecutors.sameThreadExecutor()); - doReturn(executor).when(mockActorSystem).dispatcher(); + ActorContext actorContext = mock(ActorContext.class); + doReturn(executor).when(actorContext).getClientDispatcher(); + String shardName = "shard-1"; final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy( shardName, actorContext, mockListener); @@ -227,6 +229,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest { shardName, actorContext, mockListener); doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext(); + doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher(); doReturn(getSystem()).when(actorContext).getActorSystem(); doReturn(getSystem().actorSelection(getRef().path())). when(actorContext).actorSelection(getRef().path()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java index b013515f25..0a2a0d1bc0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java @@ -66,6 +66,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { MockitoAnnotations.initMocks(this); doReturn(getSystem()).when(actorContext).getActorSystem(); + doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher(); doReturn(datastoreContext).when(actorContext).getDatastoreContext(); doReturn(100).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds(); doReturn(commitTimer).when(actorContext).getOperationTimer("commit"); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 7ce41a4db1..fa2f9187d6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -129,6 +129,7 @@ public class TransactionProxyTest { DatastoreContext dataStoreContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).build(); doReturn(getSystem()).when(mockActorContext).getActorSystem(); + doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher(); doReturn(memberName).when(mockActorContext).getCurrentMemberName(); doReturn(schemaContext).when(mockActorContext).getSchemaContext(); doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index eae46da2ee..3c6a0cef5c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -1,17 +1,20 @@ package org.opendaylight.controller.cluster.datastore.utils; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import akka.actor.ActorRef; import akka.actor.ActorSelection; +import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.Props; import akka.actor.UntypedActor; import akka.japi.Creator; import akka.testkit.JavaTestKit; import com.google.common.base.Optional; +import com.typesafe.config.ConfigFactory; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.time.StopWatch; import org.junit.Test; @@ -299,4 +302,41 @@ public class ActorContextTest extends AbstractActorTest{ assertTrue("did not take as much time as expected", watch.getTime() > 1000); } + + @Test + public void testClientDispatcherIsGlobalDispatcher(){ + + DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); + + doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); + doReturn("config").when(mockDataStoreContext).getDataStoreType(); + + ActorContext actorContext = + new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), + mock(Configuration.class), mockDataStoreContext); + + assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher()); + + } + + @Test + public void testClientDispatcherIsNotGlobalDispatcher(){ + + DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); + + doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); + doReturn("config").when(mockDataStoreContext).getDataStoreType(); + + ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf")); + + ActorContext actorContext = + new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class), + mock(Configuration.class), mockDataStoreContext); + + assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher()); + + actorSystem.shutdown(); + + } + } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application-with-custom-dispatchers.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application-with-custom-dispatchers.conf new file mode 100644 index 0000000000..32c55a65f6 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application-with-custom-dispatchers.conf @@ -0,0 +1,116 @@ +akka { + persistence.snapshot-store.plugin = "in-memory-snapshot-store" + persistence.journal.plugin = "in-memory-journal" + + loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"] + + actor { + serializers { + java = "akka.serialization.JavaSerializer" + proto = "akka.remote.serialization.ProtobufSerializer" + } + + serialization-bindings { + "org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification" = java + "com.google.protobuf.Message" = proto + + } + } +} + +in-memory-journal { + class = "org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal" +} + +in-memory-snapshot-store { + # Class name of the plugin. + class = "org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore" + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" +} + +bounded-mailbox { + mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" + mailbox-capacity = 1000 + mailbox-push-timeout-time = 100ms +} + +client-dispatcher { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + # What kind of ExecutionService to use + executor = "fork-join-executor" + # Configuration for the fork join pool + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 2 + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 2.0 + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 10 + } + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. + throughput = 100 +} + +transaction-dispatcher { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + # What kind of ExecutionService to use + executor = "fork-join-executor" + # Configuration for the fork join pool + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 2 + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 2.0 + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 10 + } + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. + throughput = 100 +} + +shard-dispatcher { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + # What kind of ExecutionService to use + executor = "fork-join-executor" + # Configuration for the fork join pool + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 2 + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 2.0 + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 10 + } + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. + throughput = 100 +} + +notification-dispatcher { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + # What kind of ExecutionService to use + executor = "fork-join-executor" + # Configuration for the fork join pool + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 2 + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 2.0 + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 10 + } + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. + throughput = 100 +} \ No newline at end of file