import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
import akka.dispatch.OnComplete;
+import com.google.common.annotations.VisibleForTesting;
import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
import scala.concurrent.Future;
/**
doRegistration(shard, path, scope);
}
}
- }, actorContext.getActorSystem().dispatcher());
+ }, actorContext.getClientDispatcher());
}
private void doRegistration(ActorRef shard, final YangInstanceIdentifier path,
reply.getListenerRegistrationPath()));
}
}
- }, actorContext.getActorSystem().dispatcher());
+ }, actorContext.getClientDispatcher());
}
@Override
private Future<Void> buildCohortList() {
Future<Iterable<ActorSelection>> combinedFutures = Futures.sequence(cohortFutures,
- actorContext.getActorSystem().dispatcher());
+ actorContext.getClientDispatcher());
return combinedFutures.transform(new AbstractFunction1<Iterable<ActorSelection>, Void>() {
@Override
}
return null;
}
- }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
+ }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
}
@Override
finishCanCommit(returnFuture);
}
}
- }, actorContext.getActorSystem().dispatcher());
+ }, actorContext.getClientDispatcher());
return returnFuture;
}
}
returnFuture.set(Boolean.valueOf(result));
}
- }, actorContext.getActorSystem().dispatcher());
+ }, actorContext.getClientDispatcher());
}
private Future<Iterable<Object>> invokeCohorts(Object message) {
futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout()));
}
- return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
+ return Futures.sequence(futureList, actorContext.getClientDispatcher());
}
@Override
propagateException, returnFuture, callback);
}
}
- }, actorContext.getActorSystem().dispatcher());
+ }, actorContext.getClientDispatcher());
}
return returnFuture;
callback.success();
}
}
- }, actorContext.getActorSystem().dispatcher());
+ }, actorContext.getClientDispatcher());
}
@VisibleForTesting
}
private Future<Object> completeOperation(Future<Object> operationFuture){
- operationFuture.onComplete(this.operationCompleter, actorContext.getActorSystem().dispatcher());
+ operationFuture.onComplete(this.operationCompleter, actorContext.getClientDispatcher());
return operationFuture;
}
futureList.add(replyFuture);
Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
- actorContext.getActorSystem().dispatcher());
+ actorContext.getClientDispatcher());
// Transform the combined Future into a Future that returns the cohort actor path from
// the ReadyTransactionReply. That's the end result of the ready operation.
serializedReadyReply.getClass()));
}
}
- }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
+ }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
}
@Override
Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
Lists.newArrayList(recordedOperationFutures),
- actorContext.getActorSystem().dispatcher());
+ actorContext.getClientDispatcher());
OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
@Override
}
};
- combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+ combinedFutures.onComplete(onComplete, actorContext.getClientDispatcher());
}
}
Future<Object> readFuture = executeOperationAsync(new ReadData(path));
- readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+ readFuture.onComplete(onComplete, actorContext.getClientDispatcher());
}
@Override
Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
Lists.newArrayList(recordedOperationFutures),
- actorContext.getActorSystem().dispatcher());
+ actorContext.getClientDispatcher());
OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
@Override
public void onComplete(Throwable failure, Iterable<Object> notUsed)
}
};
- combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+ combinedFutures.onComplete(onComplete, actorContext.getClientDispatcher());
}
}
Future<Object> future = executeOperationAsync(new DataExists(path));
- future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+ future.onComplete(onComplete, actorContext.getClientDispatcher());
}
}
newTxFutureCallback.setPrimaryShard(primaryShard);
}
}
- }, actorContext.getActorSystem().dispatcher());
+ }, actorContext.getClientDispatcher());
}
return txFutureCallback;
TransactionProxy.this.transactionType.ordinal(),
getTransactionChainId()).toSerializable());
- createTxFuture.onComplete(this, actorContext.getActorSystem().dispatcher());
+ createTxFuture.onComplete(this, actorContext.getClientDispatcher());
}
@Override
public void run() {
tryCreateTransaction();
}
- }, actorContext.getActorSystem().dispatcher());
+ }, actorContext.getClientDispatcher());
return;
}
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
private final int transactionOutstandingOperationLimit;
private final Timeout transactionCommitOperationTimeout;
+ private final Dispatchers dispatchers;
private volatile SchemaContext schemaContext;
this.configuration = configuration;
this.datastoreContext = datastoreContext;
this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
+ this.dispatchers = new Dispatchers(actorSystem.dispatchers());
operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
operationTimeout = new Timeout(operationDuration);
transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
jmxReporter.start();
+
}
public DatastoreContext getDatastoreContext() {
throw new UnknownMessageException(String.format(
"FindPrimary returned unkown response: %s", response));
}
- }, FIND_PRIMARY_FAILURE_TRANSFORMER, getActorSystem().dispatcher());
+ }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
}
/**
throw new UnknownMessageException(String.format(
"FindLocalShard returned unkown response: %s", response));
}
- }, getActorSystem().dispatcher());
+ }, getClientDispatcher());
}
private String findPrimaryPathOrNull(String shardName) {
return transactionCommitOperationTimeout;
}
+ /**
+ * Returns the akka dispatcher on which ask Futures triggered by client code
+ * on the datastore should be completed.
+ *
+ * @return the client dispatcher ExecutionContext
+ */
+ public ExecutionContext getClientDispatcher() {
+ return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
+ }
}
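For reference, a minimal sketch (not part of this patch) of how a client-side completion callback is registered on the new client dispatcher instead of the ActorSystem's default dispatcher. The class and method names here are illustrative only; executeOperationAsync, getTransactionCommitOperationTimeout and getClientDispatcher are the ActorContext methods used in the hunks above:

    // Illustrative sketch: completing an ask Future on the client dispatcher
    // rather than on actorContext.getActorSystem().dispatcher().
    import akka.actor.ActorSelection;
    import akka.dispatch.OnComplete;
    import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
    import scala.concurrent.Future;

    public class ClientDispatcherUsageExample {
        void registerCompletion(final ActorContext actorContext, ActorSelection cohort, Object message) {
            Future<Object> replyFuture = actorContext.executeOperationAsync(
                    cohort, message, actorContext.getTransactionCommitOperationTimeout());

            replyFuture.onComplete(new OnComplete<Object>() {
                @Override
                public void onComplete(Throwable failure, Object reply) {
                    // Handle failure or reply here; with this change the callback runs on
                    // the dedicated client-dispatcher when one is configured, instead of
                    // competing with actor message processing on the default dispatcher.
                }
            }, actorContext.getClientDispatcher());
        }
    }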
doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class));
ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(
MoreExecutors.sameThreadExecutor());
- doReturn(executor).when(mockActorSystem).dispatcher();
+
ActorContext actorContext = mock(ActorContext.class);
+ doReturn(executor).when(actorContext).getClientDispatcher();
+
String shardName = "shard-1";
final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
shardName, actorContext, mockListener);
shardName, actorContext, mockListener);
doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext();
+ doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
doReturn(getSystem()).when(actorContext).getActorSystem();
doReturn(getSystem().actorSelection(getRef().path())).
when(actorContext).actorSelection(getRef().path());
MockitoAnnotations.initMocks(this);
doReturn(getSystem()).when(actorContext).getActorSystem();
+ doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
doReturn(datastoreContext).when(actorContext).getDatastoreContext();
doReturn(100).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
DatastoreContext dataStoreContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).build();
doReturn(getSystem()).when(mockActorContext).getActorSystem();
+ doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
doReturn(memberName).when(mockActorContext).getCurrentMemberName();
doReturn(schemaContext).when(mockActorContext).getSchemaContext();
doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
package org.opendaylight.controller.cluster.datastore.utils;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.Creator;
import akka.testkit.JavaTestKit;
import com.google.common.base.Optional;
+import com.typesafe.config.ConfigFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.time.StopWatch;
import org.junit.Test;
assertTrue("did not take as much time as expected", watch.getTime() > 1000);
}
+
+ @Test
+ public void testClientDispatcherIsGlobalDispatcher(){
+
+ DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+ doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+ doReturn("config").when(mockDataStoreContext).getDataStoreType();
+
+ ActorContext actorContext =
+ new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+ mock(Configuration.class), mockDataStoreContext);
+
+ assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
+
+ }
+
+ @Test
+ public void testClientDispatcherIsNotGlobalDispatcher(){
+
+ DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+ doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+ doReturn("config").when(mockDataStoreContext).getDataStoreType();
+
+ ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
+
+ ActorContext actorContext =
+ new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
+ mock(Configuration.class), mockDataStoreContext);
+
+ assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
+
+ actorSystem.shutdown();
+
+ }
+
}
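The two tests above exercise the fallback behavior of the Dispatchers helper that ActorContext now wraps. The helper itself is not shown in this patch; a rough sketch of the assumed lookup logic, with the field and method bodies being assumptions based on how the helper is used here, might look like this:

    // Sketch (assumed, not shown in this patch): resolve a named dispatcher from the
    // ActorSystem configuration, falling back to the default global dispatcher when
    // no matching block (e.g. "client-dispatcher") is configured.
    import scala.concurrent.ExecutionContext;

    public class Dispatchers {
        public enum DispatcherType { Client, Transaction, Shard, Notification }

        private final akka.dispatch.Dispatchers dispatchers;

        public Dispatchers(akka.dispatch.Dispatchers dispatchers) {
            this.dispatchers = dispatchers;
        }

        public ExecutionContext getDispatcher(DispatcherType type) {
            // Assumed naming convention matching the test config below: "client-dispatcher",
            // "transaction-dispatcher", "shard-dispatcher", "notification-dispatcher".
            String name = type.name().toLowerCase() + "-dispatcher";
            if (dispatchers.hasDispatcher(name)) {
                return dispatchers.lookup(name);
            }
            // No custom dispatcher configured: use the default global dispatcher, which is
            // what testClientDispatcherIsGlobalDispatcher expects.
            return dispatchers.defaultGlobalDispatcher();
        }
    }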
--- /dev/null
+akka {
+ persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+ persistence.journal.plugin = "in-memory-journal"
+
+ loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"]
+
+ actor {
+ serializers {
+ java = "akka.serialization.JavaSerializer"
+ proto = "akka.remote.serialization.ProtobufSerializer"
+ }
+
+ serialization-bindings {
+ "org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification" = java
+ "com.google.protobuf.Message" = proto
+
+ }
+ }
+}
+
+in-memory-journal {
+ class = "org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal"
+}
+
+in-memory-snapshot-store {
+ # Class name of the plugin.
+ class = "org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore"
+ # Dispatcher for the plugin actor.
+ plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+}
+
+bounded-mailbox {
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
+ mailbox-capacity = 1000
+ mailbox-push-timeout-time = 100ms
+}
+
+client-dispatcher {
+ # Dispatcher is the name of the event-based dispatcher
+ type = Dispatcher
+ # What kind of ExecutorService to use
+ executor = "fork-join-executor"
+ # Configuration for the fork join pool
+ fork-join-executor {
+ # Min number of threads to cap factor-based parallelism number to
+ parallelism-min = 2
+ # Parallelism (threads) ... ceil(available processors * factor)
+ parallelism-factor = 2.0
+ # Max number of threads to cap factor-based parallelism number to
+ parallelism-max = 10
+ }
+ # Throughput defines the maximum number of messages to be
+ # processed per actor before the thread jumps to the next actor.
+ # Set to 1 for as fair as possible.
+ throughput = 100
+}
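+
+# Illustrative note (not part of the dispatcher definition above): for a
+# fork-join-executor the effective pool size is ceil(available processors *
+# parallelism-factor), clamped to [parallelism-min, parallelism-max]; e.g. on an
+# 8-core host with the settings above, ceil(8 * 2.0) = 16, capped at 10 threads.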
+
+transaction-dispatcher {
+ # Dispatcher is the name of the event-based dispatcher
+ type = Dispatcher
+ # What kind of ExecutorService to use
+ executor = "fork-join-executor"
+ # Configuration for the fork join pool
+ fork-join-executor {
+ # Min number of threads to cap factor-based parallelism number to
+ parallelism-min = 2
+ # Parallelism (threads) ... ceil(available processors * factor)
+ parallelism-factor = 2.0
+ # Max number of threads to cap factor-based parallelism number to
+ parallelism-max = 10
+ }
+ # Throughput defines the maximum number of messages to be
+ # processed per actor before the thread jumps to the next actor.
+ # Set to 1 for as fair as possible.
+ throughput = 100
+}
+
+shard-dispatcher {
+ # Dispatcher is the name of the event-based dispatcher
+ type = Dispatcher
+ # What kind of ExecutorService to use
+ executor = "fork-join-executor"
+ # Configuration for the fork join pool
+ fork-join-executor {
+ # Min number of threads to cap factor-based parallelism number to
+ parallelism-min = 2
+ # Parallelism (threads) ... ceil(available processors * factor)
+ parallelism-factor = 2.0
+ # Max number of threads to cap factor-based parallelism number to
+ parallelism-max = 10
+ }
+ # Throughput defines the maximum number of messages to be
+ # processed per actor before the thread jumps to the next actor.
+ # Set to 1 for as fair as possible.
+ throughput = 100
+}
+
+notification-dispatcher {
+ # Dispatcher is the name of the event-based dispatcher
+ type = Dispatcher
+ # What kind of ExecutorService to use
+ executor = "fork-join-executor"
+ # Configuration for the fork join pool
+ fork-join-executor {
+ # Min number of threads to cap factor-based parallelism number to
+ parallelism-min = 2
+ # Parallelism (threads) ... ceil(available processors * factor)
+ parallelism-factor = 2.0
+ # Max number of threads to cap factor-based parallelism number to
+ parallelism-max = 10
+ }
+ # Throughput defines the maximum number of messages to be
+ # processed per actor before the thread jumps to the next actor.
+ # Set to 1 for as fair as possible.
+ throughput = 100
+}
\ No newline at end of file