<properties>
<mdsal.version>1.2.0-SNAPSHOT</mdsal.version>
<yangtools.version>0.7.0-SNAPSHOT</yangtools.version>
+ <configfile.directory>etc/opendaylight/karaf</configfile.directory>
</properties>
<dependencyManagement>
<dependencies>
<repository>mvn:org.opendaylight.yangtools/features-yangtools/${symbol_dollar}{yangtools.version}/xml/features</repository>
<repository>mvn:org.opendaylight.controller/features-mdsal/${symbol_dollar}{mdsal.version}/xml/features</repository>
<repository>mvn:org.opendaylight.controller/features-restconf/${symbol_dollar}{mdsal.version}/xml/features</repository>
- <feature name='odl-${artifactId}-api' version='${symbol_dollar}{project.version}' description='OpenDaylight :: ${artifactId} :: api '>
+ <feature name='odl-${artifactId}-api' version='${symbol_dollar}{project.version}' description='OpenDaylight :: ${artifactId} :: api'>
<feature version='${symbol_dollar}{yangtools.version}'>odl-yangtools-models</feature>
<bundle>mvn:${groupId}/${artifactId}-api/${symbol_dollar}{project.version}</bundle>
</feature>
- <feature name='odl-${artifactId}-impl' version='${symbol_dollar}{project.version}' description='OpenDaylight :: ${artifactId} :: impl '>
+ <feature name='odl-${artifactId}' version='${symbol_dollar}{project.version}' description='OpenDaylight :: ${artifactId}'>
<feature version='${symbol_dollar}{mdsal.version}'>odl-mdsal-broker</feature>
<feature version='${symbol_dollar}{project.version}'>odl-${artifactId}-api</feature>
<bundle>mvn:${groupId}/${artifactId}-impl/${symbol_dollar}{project.version}</bundle>
- <configfile finalname="${artifactId}-impl-default-config.xml">mvn:${groupId}/${artifactId}-impl/${symbol_dollar}{project.version}/xml/config</configfile>
+ <configfile finalname="${configfile.directory}/${artifactId}.xml">mvn:${groupId}/${artifactId}-impl/${symbol_dollar}{project.version}/xml/config</configfile>
</feature>
- <feature name='odl-${artifactId}-impl-rest' version='${symbol_dollar}{project.version}' description='OpenDaylight :: ${artifactId} :: impl :: REST '>
- <feature version="${symbol_dollar}{project.version}">odl-${artifactId}-impl</feature>
+ <feature name='odl-${artifactId}-rest' version='${symbol_dollar}{project.version}' description='OpenDaylight :: ${artifactId} :: REST'>
+ <feature version="${symbol_dollar}{project.version}">odl-${artifactId}</feature>
<feature version="${symbol_dollar}{mdsal.version}">odl-restconf</feature>
</feature>
- <feature name='odl-${artifactId}-impl-ui' version='${symbol_dollar}{project.version}' description='OpenDaylight :: ${artifactId} :: impl :: UI'>
- <feature version="${symbol_dollar}{project.version}">odl-${artifactId}-impl-rest</feature>
+ <feature name='odl-${artifactId}-ui' version='${symbol_dollar}{project.version}' description='OpenDaylight :: ${artifactId} :: UI'>
+ <feature version="${symbol_dollar}{project.version}">odl-${artifactId}-rest</feature>
<feature version="${symbol_dollar}{mdsal.version}">odl-mdsal-apidocs</feature>
<feature version="${symbol_dollar}{mdsal.version}">odl-mdsal-xsql</feature>
</feature>
<maven>3.1.1</maven>
</prerequisites>
<properties>
- <karaf.localFeature>odl-${artifactId}-impl-ui</karaf.localFeature>
+ <karaf.localFeature>odl-${artifactId}-ui</karaf.localFeature>
</properties>
<dependencyManagement>
<dependencies>
<scope>runtime</scope>
</dependency>
</dependencies>
+ <!-- DO NOT install or deploy the karaf artifact -->
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-install-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
</plugin>
</plugins>
</build>
+
+ <scm>
+ <connection>scm:git:ssh://git.opendaylight.org:29418/${artifactId}.git</connection>
+ <developerConnection>scm:git:ssh://git.opendaylight.org:29418/${artifactId}.git</developerConnection>
+ <tag>HEAD</tag>
+ <url>https://wiki.opendaylight.org/view/${artifactId}:Main</url>
+ </scm>
</project>
*/
package org.opendaylight.controller.cluster.raft;
+import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@Override
public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm) {
+ Preconditions.checkArgument(snapshotCapturedIndex >= snapshotIndex,
+ "snapshotCapturedIndex must be greater than or equal to snapshotIndex");
+
snapshottedJournal = new ArrayList<>(journal.size());
- snapshottedJournal.addAll(journal.subList(0, (int)(snapshotCapturedIndex - snapshotIndex)));
+ List<ReplicatedLogEntry> snapshotJournalEntries = journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
+
+ snapshottedJournal.addAll(snapshotJournalEntries);
clear(0, (int) (snapshotCapturedIndex - snapshotIndex));
previousSnapshotIndex = snapshotIndex;
context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
captureSnapshot.getLastAppliedTerm());
- } else {
+ getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+ } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
// clear the log based on replicatedToAllIndex
context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
captureSnapshot.getReplicatedToAllTerm());
+
+ getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+ } else {
+ // The replicatedToAllIndex was not found in the log
+ // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
+ // In this scenario we may need to save the snapshot to the akka persistence
+ // snapshot for recovery but we do not need to do the replicated log trimming.
+ context.getReplicatedLog().snapshotPreCommit(replicatedLog.getSnapshotIndex(),
+ replicatedLog.getSnapshotTerm());
}
- getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+
LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
"and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
}
@Override
+ // FIXME : A lot of tests try to manipulate the replicated log by setting it using this method
+ // This is OK to do if the underlyingActor is not RafActor or a derived class. If not then you should not
+ // used this way to manipulate the log because the RaftActor actually has a field replicatedLog
+ // which it creates internally and sets on the RaftActorContext
+ // The only right way to manipulate the replicated log therefore is to get it from either the RaftActor
+ // or the RaftActorContext and modify the entries in there instead of trying to replace it by using this setter
+ // Simple assertion that will fail if you do so
+ // ReplicatedLog log = new ReplicatedLogImpl();
+ // raftActor.underlyingActor().getRaftActorContext().setReplicatedLog(log);
+ // assertEquals(log, raftActor.underlyingActor().getReplicatedLog())
public void setReplicatedLog(ReplicatedLog replicatedLog) {
this.replicatedLog = replicatedLog;
}
};
}
+
+ private static class NonPersistentProvider implements DataPersistenceProvider {
+ @Override
+ public boolean isRecoveryApplicable() {
+ return false;
+ }
+
+ @Override
+ public <T> void persist(T o, Procedure<T> procedure) {
+ try {
+ procedure.apply(o);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void saveSnapshot(Object o) {
+
+ }
+
+ @Override
+ public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+
+ }
+
+ @Override
+ public void deleteMessages(long sequenceNumber) {
+
+ }
+ }
+
+ @Test
+ public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ String persistenceId = factory.generateActorId("leader-");
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setSnapshotBatchCount(5);
+
+ DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
+
+ Map<String, String> peerAddresses = new HashMap<>();
+
+ TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
+ MockRaftActor.props(persistenceId, peerAddresses,
+ Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+
+ MockRaftActor leaderActor = mockActorRef.underlyingActor();
+ leaderActor.getRaftActorContext().setCommitIndex(3);
+ leaderActor.getRaftActorContext().setLastApplied(3);
+ leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
+
+ leaderActor.waitForInitializeBehaviorComplete();
+ for(int i=0;i< 4;i++) {
+ leaderActor.getReplicatedLog()
+ .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
+ new MockRaftActorContext.MockPayload("A")));
+ }
+
+ Leader leader = new Leader(leaderActor.getRaftActorContext());
+ leaderActor.setCurrentBehavior(leader);
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+ // Persist another entry (this will cause a CaptureSnapshot to be triggered
+ leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
+
+ // Now send a CaptureSnapshotReply
+ mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
+
+ // Trimming log in this scenario is a no-op
+ assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
+ assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+ assertEquals(-1, leader.getReplicatedToAllIndex());
+
+ }};
+ }
+
+ @Test
+ public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ String persistenceId = factory.generateActorId("leader-");
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setSnapshotBatchCount(5);
+
+ DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
+
+ Map<String, String> peerAddresses = new HashMap<>();
+
+ TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
+ MockRaftActor.props(persistenceId, peerAddresses,
+ Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+
+ MockRaftActor leaderActor = mockActorRef.underlyingActor();
+ leaderActor.getRaftActorContext().setCommitIndex(3);
+ leaderActor.getRaftActorContext().setLastApplied(3);
+ leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
+ leaderActor.getReplicatedLog().setSnapshotIndex(3);
+
+ leaderActor.waitForInitializeBehaviorComplete();
+ Leader leader = new Leader(leaderActor.getRaftActorContext());
+ leaderActor.setCurrentBehavior(leader);
+ leader.setReplicatedToAllIndex(3);
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+ // Persist another entry (this will cause a CaptureSnapshot to be triggered
+ leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
+
+ // Now send a CaptureSnapshotReply
+ mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
+
+ // Trimming log in this scenario is a no-op
+ assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
+ assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+ assertEquals(3, leader.getReplicatedToAllIndex());
+
+ }};
+ }
+
private ByteString fromObject(Object snapshot) throws Exception {
ByteArrayOutputStream b = null;
ObjectOutputStream o = null;
import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
import akka.dispatch.OnComplete;
+import com.google.common.annotations.VisibleForTesting;
import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
import scala.concurrent.Future;
/**
public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) {
dataChangeListenerActor = actorContext.getActorSystem().actorOf(
- DataChangeListener.props(listener));
+ DataChangeListener.props(listener).withDispatcher(actorContext.getNotificationDispatcherPath()));
Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
findFuture.onComplete(new OnComplete<ActorRef>() {
doRegistration(shard, path, scope);
}
}
- }, actorContext.getActorSystem().dispatcher());
+ }, actorContext.getClientDispatcher());
}
private void doRegistration(ActorRef shard, final YangInstanceIdentifier path,
reply.getListenerRegistrationPath()));
}
}
- }, actorContext.getActorSystem().dispatcher());
+ }, actorContext.getClientDispatcher());
}
@Override
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
LOG.info("Creating ShardManager : {}", shardManagerId);
+ String shardDispatcher =
+ new Dispatchers(actorSystem.dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
+
actorContext = new ActorContext(actorSystem, actorSystem.actorOf(
ShardManager.props(cluster, configuration, datastoreContext)
- .withMailbox(ActorContext.MAILBOX), shardManagerId ),
+ .withDispatcher(shardDispatcher).withMailbox(ActorContext.MAILBOX), shardManagerId ),
cluster, configuration, datastoreContext);
}
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
private final Optional<ActorRef> roleChangeNotifier;
+ private final MessageTracker appendEntriesReplyTracker;
+
/**
* Coordinates persistence recovery on startup.
*/
private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
+ private final String txnDispatcherPath;
+
protected Shard(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
super(name.toString(), mapPeerAddresses(peerAddresses),
this.name = name;
this.datastoreContext = datastoreContext;
this.schemaContext = schemaContext;
- this.dataPersistenceProvider = (datastoreContext.isPersistent()) ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
+ this.dataPersistenceProvider = (datastoreContext.isPersistent())
+ ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
+ this.txnDispatcherPath = new Dispatchers(context().system().dispatchers())
+ .getDispatcherPath(Dispatchers.DispatcherType.Transaction);
+
LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
// create a notifier actor for each cluster member
roleChangeNotifier = createRoleChangeNotifier(name.toString());
+
+ appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
+ getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
}
private static Map<String, String> mapPeerAddresses(
onRecoveryComplete();
} else {
super.onReceiveRecover(message);
+ if(LOG.isTraceEnabled()) {
+ appendEntriesReplyTracker.begin();
+ }
}
}
@Override
public void onReceiveCommand(final Object message) throws Exception {
- if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
- handleCreateTransaction(message);
- } else if(message instanceof ForwardedReadyTransaction) {
- handleForwardedReadyTransaction((ForwardedReadyTransaction)message);
- } else if(message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
- handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
- } else if(message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
- handleCommitTransaction(CommitTransaction.fromSerializable(message));
- } else if(message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
- handleAbortTransaction(AbortTransaction.fromSerializable(message));
- } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)){
- closeTransactionChain(CloseTransactionChain.fromSerializable(message));
- } else if (message instanceof RegisterChangeListener) {
- registerChangeListener((RegisterChangeListener) message);
- } else if (message instanceof UpdateSchemaContext) {
- updateSchemaContext((UpdateSchemaContext) message);
- } else if (message instanceof PeerAddressResolved) {
- PeerAddressResolved resolved = (PeerAddressResolved) message;
- setPeerAddress(resolved.getPeerId().toString(),
- resolved.getPeerAddress());
- } else if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
- handleTransactionCommitTimeoutCheck();
- } else {
- super.onReceiveCommand(message);
+
+ MessageTracker.Context context = appendEntriesReplyTracker.received(message);
+
+ if(context.error().isPresent()){
+ LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
+ context.error());
+ }
+
+ try {
+ if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
+ handleCreateTransaction(message);
+ } else if (message instanceof ForwardedReadyTransaction) {
+ handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
+ } else if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
+ handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
+ } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
+ handleCommitTransaction(CommitTransaction.fromSerializable(message));
+ } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
+ handleAbortTransaction(AbortTransaction.fromSerializable(message));
+ } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)) {
+ closeTransactionChain(CloseTransactionChain.fromSerializable(message));
+ } else if (message instanceof RegisterChangeListener) {
+ registerChangeListener((RegisterChangeListener) message);
+ } else if (message instanceof UpdateSchemaContext) {
+ updateSchemaContext((UpdateSchemaContext) message);
+ } else if (message instanceof PeerAddressResolved) {
+ PeerAddressResolved resolved = (PeerAddressResolved) message;
+ setPeerAddress(resolved.getPeerId().toString(),
+ resolved.getPeerAddress());
+ } else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
+ handleTransactionCommitTimeoutCheck();
+ } else {
+ super.onReceiveCommand(message);
+ }
+ } finally {
+ context.done();
}
}
shardMBean.incrementReadOnlyTransactionCount();
- return getContext().actorOf(
- ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(),
- schemaContext,datastoreContext, shardMBean,
- transactionId.getRemoteTransactionId(), clientVersion),
- transactionId.toString());
+ return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion);
} else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
shardMBean.incrementReadWriteTransactionCount();
- return getContext().actorOf(
- ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(),
- schemaContext, datastoreContext, shardMBean,
- transactionId.getRemoteTransactionId(), clientVersion),
- transactionId.toString());
-
+ return createShardTransaction(factory.newReadWriteTransaction(), transactionId, clientVersion);
} else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
shardMBean.incrementWriteOnlyTransactionCount();
- return getContext().actorOf(
- ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(),
- schemaContext, datastoreContext, shardMBean,
- transactionId.getRemoteTransactionId(), clientVersion),
- transactionId.toString());
+ return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion);
} else {
throw new IllegalArgumentException(
"Shard="+name + ":CreateTransaction message has unidentified transaction type="
}
}
+ private ActorRef createShardTransaction(DOMStoreTransaction transaction, ShardTransactionIdentifier transactionId,
+ short clientVersion){
+ return getContext().actorOf(
+ ShardTransaction.props(transaction, getSelf(),
+ schemaContext, datastoreContext, shardMBean,
+ transactionId.getRemoteTransactionId(), clientVersion)
+ .withDispatcher(txnDispatcherPath),
+ transactionId.toString());
+
+ }
+
private void createTransaction(CreateTransaction createTransaction) {
try {
ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
private final Configuration configuration;
+ private final String shardDispatcherPath;
+
private ShardManagerInfoMBean mBean;
private final DatastoreContext datastoreContext;
this.datastoreContext = datastoreContext;
this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
this.type = datastoreContext.getDataStoreType();
+ this.shardDispatcherPath =
+ new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
// Subscribe this actor to cluster member events
cluster.subscribeToMemberEvents(getSelf());
for (ShardInformation info : localShards.values()) {
if (info.getActor() == null) {
info.setActor(getContext().actorOf(Shard.props(info.getShardId(),
- info.getPeerAddresses(), datastoreContext, schemaContext),
- info.getShardId().toString()));
+ info.getPeerAddresses(), datastoreContext, schemaContext)
+ .withDispatcher(shardDispatcherPath), info.getShardId().toString()));
} else {
info.getActor().tell(message, getSelf());
}
private Future<Void> buildCohortList() {
Future<Iterable<ActorSelection>> combinedFutures = Futures.sequence(cohortFutures,
- actorContext.getActorSystem().dispatcher());
+ actorContext.getClientDispatcher());
return combinedFutures.transform(new AbstractFunction1<Iterable<ActorSelection>, Void>() {
@Override
}
return null;
}
- }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
+ }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
}
@Override
finishCanCommit(returnFuture);
}
}
- }, actorContext.getActorSystem().dispatcher());
+ }, actorContext.getClientDispatcher());
return returnFuture;
}
}
returnFuture.set(Boolean.valueOf(result));
}
- }, actorContext.getActorSystem().dispatcher());
+ }, actorContext.getClientDispatcher());
}
private Future<Iterable<Object>> invokeCohorts(Object message) {
futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout()));
}
- return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
+ return Futures.sequence(futureList, actorContext.getClientDispatcher());
}
@Override
propagateException, returnFuture, callback);
}
}
- }, actorContext.getActorSystem().dispatcher());
+ }, actorContext.getClientDispatcher());
}
return returnFuture;
callback.success();
}
}
- }, actorContext.getActorSystem().dispatcher());
+ }, actorContext.getClientDispatcher());
}
@VisibleForTesting
}
private Future<Object> completeOperation(Future<Object> operationFuture){
- operationFuture.onComplete(this.operationCompleter, actorContext.getActorSystem().dispatcher());
+ operationFuture.onComplete(this.operationCompleter, actorContext.getClientDispatcher());
return operationFuture;
}
futureList.add(replyFuture);
Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
- actorContext.getActorSystem().dispatcher());
+ actorContext.getClientDispatcher());
// Transform the combined Future into a Future that returns the cohort actor path from
// the ReadyTransactionReply. That's the end result of the ready operation.
serializedReadyReply.getClass()));
}
}
- }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
+ }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
}
@Override
Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
Lists.newArrayList(recordedOperationFutures),
- actorContext.getActorSystem().dispatcher());
+ actorContext.getClientDispatcher());
OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
@Override
}
};
- combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+ combinedFutures.onComplete(onComplete, actorContext.getClientDispatcher());
}
}
Future<Object> readFuture = executeOperationAsync(new ReadData(path));
- readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+ readFuture.onComplete(onComplete, actorContext.getClientDispatcher());
}
@Override
Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
Lists.newArrayList(recordedOperationFutures),
- actorContext.getActorSystem().dispatcher());
+ actorContext.getClientDispatcher());
OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
@Override
public void onComplete(Throwable failure, Iterable<Object> notUsed)
}
};
- combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+ combinedFutures.onComplete(onComplete, actorContext.getClientDispatcher());
}
}
Future<Object> future = executeOperationAsync(new DataExists(path));
- future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+ future.onComplete(onComplete, actorContext.getClientDispatcher());
}
}
newTxFutureCallback.setPrimaryShard(primaryShard);
}
}
- }, actorContext.getActorSystem().dispatcher());
+ }, actorContext.getClientDispatcher());
}
return txFutureCallback;
TransactionProxy.this.transactionType.ordinal(),
getTransactionChainId()).toSerializable());
- createTxFuture.onComplete(this, actorContext.getActorSystem().dispatcher());
+ createTxFuture.onComplete(this, actorContext.getClientDispatcher());
}
@Override
public void run() {
tryCreateTransaction();
}
- }, actorContext.getActorSystem().dispatcher());
+ }, actorContext.getClientDispatcher());
return;
}
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
private final int transactionOutstandingOperationLimit;
private final Timeout transactionCommitOperationTimeout;
+ private final Dispatchers dispatchers;
private volatile SchemaContext schemaContext;
this.configuration = configuration;
this.datastoreContext = datastoreContext;
this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
+ this.dispatchers = new Dispatchers(actorSystem.dispatchers());
operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
operationTimeout = new Timeout(operationDuration);
transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
jmxReporter.start();
+
}
public DatastoreContext getDatastoreContext() {
throw new UnknownMessageException(String.format(
"FindPrimary returned unkown response: %s", response));
}
- }, FIND_PRIMARY_FAILURE_TRANSFORMER, getActorSystem().dispatcher());
+ }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
}
/**
throw new UnknownMessageException(String.format(
"FindLocalShard returned unkown response: %s", response));
}
- }, getActorSystem().dispatcher());
+ }, getClientDispatcher());
}
private String findPrimaryPathOrNull(String shardName) {
return transactionCommitOperationTimeout;
}
+    /**
+     * Returns the akka dispatcher that is meant to be used when processing ask Futures which were
+     * triggered by client code on the datastore.
+     *
+     * @return the configured client dispatcher, or the actor system's default global
+     *         dispatcher when no client dispatcher is configured
+     */
+    public ExecutionContext getClientDispatcher() {
+        return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
+    }
+
+    /**
+     * Returns the configuration path of the dispatcher used for data-change notifications,
+     * or the default dispatcher path when no notification dispatcher is configured.
+     */
+    public String getNotificationDispatcherPath(){
+        return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
+    }
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import com.google.common.base.Preconditions;
+import scala.concurrent.ExecutionContext;
+
+/**
+ * Resolves the akka dispatchers used by the datastore. Each {@link DispatcherType} maps to a
+ * dispatcher path in the actor system configuration; when that path is not configured, the
+ * actor system's default dispatcher (or default dispatcher path) is returned instead.
+ */
+public class Dispatchers {
+    public static final String DEFAULT_DISPATCHER_PATH = "akka.actor.default-dispatcher";
+    public static final String CLIENT_DISPATCHER_PATH = "client-dispatcher";
+    public static final String TXN_DISPATCHER_PATH = "txn-dispatcher";
+    public static final String SHARD_DISPATCHER_PATH = "shard-dispatcher";
+    public static final String NOTIFICATION_DISPATCHER_PATH = "notification-dispatcher";
+
+    private final akka.dispatch.Dispatchers dispatchers;
+
+    // Logical dispatcher roles, each bound to its configured akka dispatcher path.
+    public static enum DispatcherType {
+        Client(CLIENT_DISPATCHER_PATH),
+        Transaction(TXN_DISPATCHER_PATH),
+        Shard(SHARD_DISPATCHER_PATH),
+        Notification(NOTIFICATION_DISPATCHER_PATH);
+
+        private final String path;
+        private DispatcherType(String path){
+            this.path = path;
+        }
+        // Returns this type's path if the actor system has it configured, else the default path.
+        private String path(akka.dispatch.Dispatchers dispatchers){
+            if(dispatchers.hasDispatcher(path)){
+                return path;
+            }
+            return DEFAULT_DISPATCHER_PATH;
+        }
+
+        // Returns this type's dispatcher if configured, else the default global dispatcher.
+        private ExecutionContext dispatcher(akka.dispatch.Dispatchers dispatchers){
+            if(dispatchers.hasDispatcher(path)){
+                return dispatchers.lookup(path);
+            }
+            return dispatchers.defaultGlobalDispatcher();
+        }
+    }
+
+    public Dispatchers(akka.dispatch.Dispatchers dispatchers){
+        Preconditions.checkNotNull(dispatchers, "dispatchers should not be null");
+        this.dispatchers = dispatchers;
+    }
+
+    /**
+     * Returns the ExecutionContext for the given dispatcher type, falling back to the actor
+     * system's default global dispatcher when no such dispatcher is configured.
+     */
+    public ExecutionContext getDispatcher(DispatcherType dispatcherType){
+        return dispatcherType.dispatcher(this.dispatchers);
+    }
+
+    /**
+     * Returns the configuration path for the given dispatcher type, falling back to
+     * {@link #DEFAULT_DISPATCHER_PATH} when no such dispatcher is configured.
+     */
+    public String getDispatcherPath(DispatcherType dispatcherType){
+        return dispatcherType.path(this.dispatchers);
+    }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * MessageTracker is a diagnostic utility class to be used for figuring out why a certain message which was
+ * expected to arrive in a given time interval does not arrive. It attempts to keep track of all the messages that
+ * received between the arrival of two instances of the same message and the amount of time it took to process each
+ * of those messages.
+ * <br/>
+ * Usage of the API is as follows,
+ * <pre>
+ *
+ * // Track the Foo class, Here we expect to see a message of type Foo come in every 10 millis
+ * MessageTracker tracker = new MessageTracker(Foo.class, 10);
+ *
+ * // Begin the tracking process. If this is not called then calling received and done on the resultant Context
+ * // will do nothing
+ * tracker.begin();
+ *
+ * .....
+ *
+ * MessageTracker.Context context = tracker.received(message);
+ *
+ * if(context.error().isPresent()){
+ * LOG.error("{}", context.error().get());
+ * }
+ *
+ * // Some custom processing
+ * process(message);
+ *
+ * context.done();
+ *
+ * </pre>
+ */
+public class MessageTracker {
+
+    private static final Context NO_OP_CONTEXT = new NoOpContext();
+
+    // Class of the message whose inter-arrival time is tracked. Parameterized as Class<?>
+    // (was a raw Class) - backward compatible for all callers.
+    private final Class<?> expectedMessageClass;
+
+    // Maximum expected time, in milliseconds, between two instances of the expected message.
+    private final long expectedArrivalInterval;
+
+    private final List<MessageProcessingTime> messagesSinceLastExpectedMessage = new LinkedList<>();
+
+    private Stopwatch expectedMessageWatch;
+
+    private boolean enabled = false;
+
+    private Object lastExpectedMessage;
+
+    private Object currentMessage;
+
+    // Reusable context handed out for every non-error received() call.
+    private final CurrentMessageContext currentMessageContext = new CurrentMessageContext();
+
+    /**
+     *
+     * @param expectedMessageClass The class of the message to track
+     * @param expectedArrivalIntervalInMillis The expected arrival interval between two instances of the expected
+     *                                        message
+     */
+    public MessageTracker(Class<?> expectedMessageClass, long expectedArrivalIntervalInMillis){
+        this.expectedMessageClass = expectedMessageClass;
+        this.expectedArrivalInterval = expectedArrivalIntervalInMillis;
+    }
+
+    /**
+     * Starts tracking. Calling begin() again while already enabled is a no-op and, in
+     * particular, does not restart the arrival stopwatch.
+     */
+    public void begin(){
+        if(enabled) {
+            return;
+        }
+        enabled = true;
+        expectedMessageWatch = Stopwatch.createStarted();
+    }
+
+    /**
+     * Records the arrival of a message. If the message is an instance of the expected class
+     * and arrived later than the expected interval, an error-bearing Context is returned;
+     * otherwise the reusable current-message Context is returned.
+     *
+     * @throws IllegalStateException if done() was not called on the previously returned
+     *         (non-error) Context while tracking is enabled
+     */
+    public Context received(Object message){
+        if(!enabled) {
+            return NO_OP_CONTEXT;
+        }
+        this.currentMessage = message;
+        if(expectedMessageClass.isInstance(message)){
+            // NOTE(review): expectedMessageWatch is started in begin() and never restarted,
+            // so this measures time since begin()/tracking start, not since the previous
+            // expected message - confirm that is the intended semantics.
+            long actualElapsedTime = expectedMessageWatch.elapsed(TimeUnit.MILLISECONDS);
+            if(actualElapsedTime > expectedArrivalInterval){
+                // On a failed expectation the tracker state (lastExpectedMessage, accumulated
+                // message list) is left untouched so the error can report a snapshot of it.
+                return new ErrorContext(message, Optional.of(new FailedExpectation(lastExpectedMessage, message,
+                        ImmutableList.copyOf(messagesSinceLastExpectedMessage), expectedArrivalInterval,
+                        actualElapsedTime)));
+            }
+            this.lastExpectedMessage = message;
+            this.messagesSinceLastExpectedMessage.clear();
+        }
+
+        currentMessageContext.reset();
+        return currentMessageContext;
+    }
+
+    // Records how long a non-expected message took to process.
+    private void processed(Object message, long messageElapseTimeInNanos){
+        if(!enabled) {
+            return;
+        }
+        if(!expectedMessageClass.isInstance(message)){
+            this.messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(), messageElapseTimeInNanos));
+        }
+    }
+
+    /** Returns an immutable snapshot of the messages seen since the last expected message. */
+    public List<MessageProcessingTime> getMessagesSinceLastExpectedMessage(){
+        return ImmutableList.copyOf(this.messagesSinceLastExpectedMessage);
+    }
+
+    /** Immutable record of a message's class and how long it took to process. */
+    public static class MessageProcessingTime {
+        private final Class<?> messageClass;
+        private final long elapsedTimeInNanos;
+
+        MessageProcessingTime(Class<?> messageClass, long elapsedTimeInNanos){
+            this.messageClass = messageClass;
+            this.elapsedTimeInNanos = elapsedTimeInNanos;
+        }
+
+        @Override
+        public String toString() {
+            return "MessageProcessingTime{" +
+                    "messageClass=" + messageClass.getSimpleName() +
+                    ", elapsedTimeInMillis=" + TimeUnit.NANOSECONDS.toMillis(elapsedTimeInNanos) +
+                    '}';
+        }
+
+        public Class<?> getMessageClass() {
+            return messageClass;
+        }
+
+        public long getElapsedTimeInNanos() {
+            return elapsedTimeInNanos;
+        }
+    }
+
+    /** Diagnostic information describing a failed arrival-time expectation. */
+    public interface Error {
+        Object getLastExpectedMessage();
+        Object getCurrentExpectedMessage();
+        List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage();
+    }
+
+    private class FailedExpectation implements Error {
+
+        private final Object lastExpectedMessage;
+        private final Object currentExpectedMessage;
+        private final List<MessageProcessingTime> messagesSinceLastExpectedMessage;
+        private final long expectedTimeInMillis;
+        private final long actualTimeInMillis;
+
+        public FailedExpectation(Object lastExpectedMessage, Object message,
+                List<MessageProcessingTime> messagesSinceLastExpectedMessage,
+                long expectedTimeInMillis, long actualTimeInMillis) {
+            this.lastExpectedMessage = lastExpectedMessage;
+            this.currentExpectedMessage = message;
+            this.messagesSinceLastExpectedMessage = messagesSinceLastExpectedMessage;
+            this.expectedTimeInMillis = expectedTimeInMillis;
+            this.actualTimeInMillis = actualTimeInMillis;
+        }
+
+        @Override
+        public Object getLastExpectedMessage() {
+            return lastExpectedMessage;
+        }
+
+        @Override
+        public Object getCurrentExpectedMessage() {
+            return currentExpectedMessage;
+        }
+
+        @Override
+        public List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage() {
+            return messagesSinceLastExpectedMessage;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            builder.append("\n> Last Expected Message = " + lastExpectedMessage);
+            builder.append("\n> Current Expected Message = " + currentExpectedMessage);
+            builder.append("\n> Expected time in between messages = " + expectedTimeInMillis);
+            builder.append("\n> Actual time in between messages = " + actualTimeInMillis);
+            for (MessageProcessingTime time : messagesSinceLastExpectedMessage) {
+                builder.append("\n\t> ").append(time.toString());
+            }
+            return builder.toString();
+        }
+
+    }
+
+    /** Handle returned by received(); done() must be called after processing the message. */
+    public interface Context {
+        Context done();
+        Optional<? extends Error> error();
+    }
+
+    // Context used when tracking is disabled; does nothing.
+    private static class NoOpContext implements Context {
+
+        @Override
+        public Context done() {
+            return this;
+        }
+
+        @Override
+        public Optional<Error> error() {
+            return Optional.absent();
+        }
+    }
+
+    // Reusable context for the message currently being processed. reset() enforces that
+    // done() was called for the previous message (see testIllegalStateException... test).
+    private class CurrentMessageContext implements Context {
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        boolean done = true;
+
+        public void reset(){
+            Preconditions.checkState(done);
+            done = false;
+            stopwatch.reset().start();
+        }
+
+        @Override
+        public Context done() {
+            processed(currentMessage, stopwatch.elapsed(TimeUnit.NANOSECONDS));
+            done = true;
+            return this;
+        }
+
+        @Override
+        public Optional<? extends Error> error() {
+            return Optional.absent();
+        }
+    }
+
+    // Context returned when an expected message arrived late; carries the Error.
+    private class ErrorContext implements Context {
+        private final Object message;
+        private final Optional<? extends Error> error;
+        private final Stopwatch stopwatch;
+
+        ErrorContext(Object message, Optional<? extends Error> error){
+            this.message = message;
+            this.error = error;
+            this.stopwatch = Stopwatch.createStarted();
+        }
+
+        @Override
+        public Context done(){
+            processed(message, this.stopwatch.elapsed(TimeUnit.NANOSECONDS));
+            this.stopwatch.stop();
+            return this;
+        }
+
+        @Override
+        public Optional<? extends Error> error() {
+            return error;
+        }
+    }
+}
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class));
ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(
MoreExecutors.sameThreadExecutor());
- doReturn(executor).when(mockActorSystem).dispatcher();
+
ActorContext actorContext = mock(ActorContext.class);
+ doReturn(executor).when(actorContext).getClientDispatcher();
+
String shardName = "shard-1";
final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
shardName, actorContext, mockListener);
shardName, actorContext, mockListener);
doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext();
+ doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
doReturn(getSystem()).when(actorContext).getActorSystem();
+ doReturn(Dispatchers.DEFAULT_DISPATCHER_PATH).when(actorContext).getNotificationDispatcherPath();
doReturn(getSystem().actorSelection(getRef().path())).
when(actorContext).actorSelection(getRef().path());
doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
MockitoAnnotations.initMocks(this);
doReturn(getSystem()).when(actorContext).getActorSystem();
+ doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
doReturn(datastoreContext).when(actorContext).getDatastoreContext();
doReturn(100).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
DatastoreContext dataStoreContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).build();
doReturn(getSystem()).when(mockActorContext).getActorSystem();
+ doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
doReturn(memberName).when(mockActorContext).getCurrentMemberName();
doReturn(schemaContext).when(mockActorContext).getSchemaContext();
doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
package org.opendaylight.controller.cluster.datastore.utils;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.Creator;
import akka.testkit.JavaTestKit;
import com.google.common.base.Optional;
+import com.typesafe.config.ConfigFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.time.StopWatch;
import org.junit.Test;
assertTrue("did not take as much time as expected", watch.getTime() > 1000);
}
+
+    // The default test actor system configures no custom client-dispatcher, so the
+    // client dispatcher must fall back to the actor system's default global dispatcher.
+    @Test
+    public void testClientDispatcherIsGlobalDispatcher(){
+
+        DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+        doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+        doReturn("config").when(mockDataStoreContext).getDataStoreType();
+
+        ActorContext actorContext =
+                new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+                        mock(Configuration.class), mockDataStoreContext);
+
+        assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
+
+    }
+
+    // When the actor system's config defines the custom datastore dispatchers (e.g.
+    // client-dispatcher), ActorContext must resolve a non-default dispatcher.
+    @Test
+    public void testClientDispatcherIsNotGlobalDispatcher(){
+
+        DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+        doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+        doReturn("config").when(mockDataStoreContext).getDataStoreType();
+
+        ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
+
+        try {
+            ActorContext actorContext =
+                    new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
+                            mock(Configuration.class), mockDataStoreContext);
+
+            assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
+        } finally {
+            // Always shut the extra actor system down, even if the assertion fails,
+            // so it does not leak threads into subsequent tests.
+            actorSystem.shutdown();
+        }
+    }
+
}
--- /dev/null
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import akka.dispatch.MessageDispatcher;
+import org.junit.Test;
+
+public class DispatchersTest {
+
+    // When no custom dispatchers are configured, every type resolves to the default path.
+    @Test
+    public void testGetDefaultDispatcherPath(){
+        akka.dispatch.Dispatchers mockDispatchers = mock(akka.dispatch.Dispatchers.class);
+        doReturn(false).when(mockDispatchers).hasDispatcher(anyString());
+        Dispatchers dispatchers = new Dispatchers(mockDispatchers);
+
+        for(Dispatchers.DispatcherType type : Dispatchers.DispatcherType.values()) {
+            assertEquals(Dispatchers.DEFAULT_DISPATCHER_PATH,
+                    dispatchers.getDispatcherPath(type));
+        }
+
+    }
+
+    // When no custom dispatchers are configured, every type resolves to the global dispatcher.
+    @Test
+    public void testGetDefaultDispatcher(){
+        akka.dispatch.Dispatchers mockDispatchers = mock(akka.dispatch.Dispatchers.class);
+        MessageDispatcher mockGlobalDispatcher = mock(MessageDispatcher.class);
+        doReturn(false).when(mockDispatchers).hasDispatcher(anyString());
+        doReturn(mockGlobalDispatcher).when(mockDispatchers).defaultGlobalDispatcher();
+        Dispatchers dispatchers = new Dispatchers(mockDispatchers);
+
+        for(Dispatchers.DispatcherType type : Dispatchers.DispatcherType.values()) {
+            assertEquals(mockGlobalDispatcher,
+                    dispatchers.getDispatcher(type));
+        }
+
+    }
+
+    // When the custom dispatchers are configured, each type resolves to its own path.
+    @Test
+    public void testGetDispatcherPath(){
+        akka.dispatch.Dispatchers mockDispatchers = mock(akka.dispatch.Dispatchers.class);
+        doReturn(true).when(mockDispatchers).hasDispatcher(anyString());
+        Dispatchers dispatchers = new Dispatchers(mockDispatchers);
+
+        assertEquals(Dispatchers.CLIENT_DISPATCHER_PATH,
+                dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Client));
+
+        assertEquals(Dispatchers.TXN_DISPATCHER_PATH,
+                dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Transaction));
+
+        assertEquals(Dispatchers.SHARD_DISPATCHER_PATH,
+                dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Shard));
+
+        assertEquals(Dispatchers.NOTIFICATION_DISPATCHER_PATH,
+                dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification));
+
+    }
+
+    // When the custom dispatchers are configured, getDispatcher must return the dispatcher
+    // looked up from the actor system. (The previous version of this test duplicated
+    // testGetDispatcherPath and never actually exercised getDispatcher.)
+    @Test
+    public void testGetDispatcher(){
+        akka.dispatch.Dispatchers mockDispatchers = mock(akka.dispatch.Dispatchers.class);
+        MessageDispatcher mockDispatcher = mock(MessageDispatcher.class);
+        doReturn(true).when(mockDispatchers).hasDispatcher(anyString());
+        doReturn(mockDispatcher).when(mockDispatchers).lookup(anyString());
+        Dispatchers dispatchers = new Dispatchers(mockDispatchers);
+
+        for(Dispatchers.DispatcherType type : Dispatchers.DispatcherType.values()) {
+            assertEquals(mockDispatcher,
+                    dispatchers.getDispatcher(type));
+        }
+
+    }
+}
\ No newline at end of file
--- /dev/null
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessageTrackerTest {
+
+    private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+    // Marker message type used as the "expected" message in all tests.
+    private class Foo {}
+
+    // Without begin(), received()/done() are no-ops and must never report errors,
+    // even when the arrival interval is exceeded.
+    @Test
+    public void testNoTracking(){
+        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+
+        MessageTracker.Context context1 = messageTracker.received(new Foo());
+        context1.done();
+
+        Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+
+        MessageTracker.Context context2 = messageTracker.received(new Foo());
+        context2.done();
+
+    }
+
+    // A Foo arriving later than the 10ms interval must yield an error context with an
+    // empty "messages since last expected" list (nothing else was received in between).
+    @Test
+    public void testFailedExpectationOnTracking(){
+        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+        messageTracker.begin();
+
+        MessageTracker.Context context1 = messageTracker.received(new Foo());
+        context1.done();
+
+        Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+
+        MessageTracker.Context context2 = messageTracker.received(new Foo());
+        Assert.assertEquals(true, context2.error().isPresent());
+        Assert.assertEquals(0, context2.error().get().getMessageProcessingTimesSinceLastExpectedMessage().size());
+
+    }
+
+    // A late Foo must report the intervening messages (String, Long, Integer) in arrival
+    // order, including how long the slow Integer took to process.
+    @Test
+    public void testFailedExpectationOnTrackingWithMessagesInBetween(){
+        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+        messageTracker.begin();
+
+        MessageTracker.Context context1 = messageTracker.received(new Foo());
+        context1.done();
+
+        messageTracker.received("A").done();
+        messageTracker.received(Long.valueOf(10)).done();
+        MessageTracker.Context c = messageTracker.received(Integer.valueOf(100));
+
+        // Simulate slow processing of the Integer message.
+        Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+
+        c.done();
+
+        MessageTracker.Context context2 = messageTracker.received(new Foo());
+
+        Assert.assertEquals(true, context2.error().isPresent());
+
+        MessageTracker.Error error = context2.error().get();
+
+        List<MessageTracker.MessageProcessingTime> messageProcessingTimes =
+                error.getMessageProcessingTimesSinceLastExpectedMessage();
+
+        Assert.assertEquals(3, messageProcessingTimes.size());
+
+        Assert.assertEquals(String.class, messageProcessingTimes.get(0).getMessageClass());
+        Assert.assertEquals(Long.class, messageProcessingTimes.get(1).getMessageClass());
+        Assert.assertEquals(Integer.class, messageProcessingTimes.get(2).getMessageClass());
+        Assert.assertTrue(messageProcessingTimes.get(2).getElapsedTimeInNanos() > TimeUnit.MILLISECONDS.toNanos(10));
+        Assert.assertEquals(Foo.class, error.getLastExpectedMessage().getClass());
+        Assert.assertEquals(Foo.class, error.getCurrentExpectedMessage().getClass());
+
+        LOG.error("An error occurred : {}" , error);
+
+    }
+
+
+    // A Foo arriving within the interval must not produce an error.
+    @Test
+    public void testMetExpectationOnTracking(){
+        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+        messageTracker.begin();
+
+        MessageTracker.Context context1 = messageTracker.received(new Foo());
+        context1.done();
+
+        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
+
+        MessageTracker.Context context2 = messageTracker.received(new Foo());
+        Assert.assertEquals(false, context2.error().isPresent());
+
+    }
+
+    // While tracking, calling received() again without done() on the previous context
+    // must fail fast with an IllegalStateException.
+    @Test
+    public void testIllegalStateExceptionWhenDoneIsNotCalledWhileTracking(){
+        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+        messageTracker.begin();
+
+        messageTracker.received(new Foo());
+
+        try {
+            messageTracker.received(new Foo());
+            fail("Expected an IllegalStateException");
+        } catch (IllegalStateException e){
+            // expected - done() was not called on the previous context
+        }
+    }
+
+    // When tracking is disabled the no-op context imposes no done() discipline.
+    @Test
+    public void testNoIllegalStateExceptionWhenDoneIsNotCalledWhileNotTracking(){
+        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+
+        messageTracker.received(new Foo());
+        messageTracker.received(new Foo());
+    }
+
+    // The interval clock starts at begin(); even the very first Foo can be "late",
+    // in which case the error reports a null last-expected message.
+    @Test
+    public void testDelayInFirstExpectedMessageArrival(){
+
+        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+        messageTracker.begin();
+
+        Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+
+        MessageTracker.Context context = messageTracker.received(new Foo());
+
+        Assert.assertEquals(true, context.error().isPresent());
+
+        MessageTracker.Error error = context.error().get();
+
+        Assert.assertEquals(null, error.getLastExpectedMessage());
+        Assert.assertEquals(Foo.class, error.getCurrentExpectedMessage().getClass());
+
+        String errorString = error.toString();
+        Assert.assertTrue(errorString.contains("Last Expected Message = null"));
+
+        LOG.error("An error occurred : {}", error);
+    }
+
+    // A second begin() must not restart the stopwatch started by the first begin().
+    @Test
+    public void testCallingBeginDoesNotResetWatch(){
+        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+        messageTracker.begin();
+
+        Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+
+        messageTracker.begin();
+
+        MessageTracker.Context context = messageTracker.received(new Foo());
+
+        Assert.assertEquals(true, context.error().isPresent());
+
+    }
+
+    // Non-expected messages accumulate and are visible through the public getter.
+    @Test
+    public void testMessagesSinceLastExpectedMessage(){
+
+        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+        messageTracker.begin();
+
+        MessageTracker.Context context1 = messageTracker.received(Integer.valueOf(45)).done();
+
+        Assert.assertEquals(false, context1.error().isPresent());
+
+        MessageTracker.Context context2 = messageTracker.received(Long.valueOf(45)).done();
+
+        Assert.assertEquals(false, context2.error().isPresent());
+
+        List<MessageTracker.MessageProcessingTime> processingTimeList =
+                messageTracker.getMessagesSinceLastExpectedMessage();
+
+        Assert.assertEquals(2, processingTimeList.size());
+
+        assertEquals(Integer.class, processingTimeList.get(0).getMessageClass());
+        assertEquals(Long.class, processingTimeList.get(1).getMessageClass());
+
+    }
+
+}
\ No newline at end of file
--- /dev/null
+akka {
+ persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+ persistence.journal.plugin = "in-memory-journal"
+
+ loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"]
+
+ actor {
+ serializers {
+ java = "akka.serialization.JavaSerializer"
+ proto = "akka.remote.serialization.ProtobufSerializer"
+ }
+
+ serialization-bindings {
+ "org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification" = java
+ "com.google.protobuf.Message" = proto
+
+ }
+ }
+}
+
+in-memory-journal {
+ class = "org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal"
+}
+
+in-memory-snapshot-store {
+ # Class name of the plugin.
+ class = "org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore"
+ # Dispatcher for the plugin actor.
+ plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+}
+
+bounded-mailbox {
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
+ mailbox-capacity = 1000
+ mailbox-push-timeout-time = 100ms
+}
+
+client-dispatcher {
+ # Dispatcher is the name of the event-based dispatcher
+ type = Dispatcher
+ # What kind of ExecutionService to use
+ executor = "fork-join-executor"
+ # Configuration for the fork join pool
+ fork-join-executor {
+ # Min number of threads to cap factor-based parallelism number to
+ parallelism-min = 2
+ # Parallelism (threads) ... ceil(available processors * factor)
+ parallelism-factor = 2.0
+ # Max number of threads to cap factor-based parallelism number to
+ parallelism-max = 10
+ }
+ # Throughput defines the maximum number of messages to be
+ # processed per actor before the thread jumps to the next actor.
+ # Set to 1 for as fair as possible.
+ throughput = 100
+}
+
+# NOTE: this section name must match Dispatchers.TXN_DISPATCHER_PATH ("txn-dispatcher").
+# It was previously named "transaction-dispatcher", a path the code never looks up, so
+# the Transaction dispatcher type silently fell back to the default dispatcher.
+txn-dispatcher {
+  # Dispatcher is the name of the event-based dispatcher
+  type = Dispatcher
+  # What kind of ExecutionService to use
+  executor = "fork-join-executor"
+  # Configuration for the fork join pool
+  fork-join-executor {
+    # Min number of threads to cap factor-based parallelism number to
+    parallelism-min = 2
+    # Parallelism (threads) ... ceil(available processors * factor)
+    parallelism-factor = 2.0
+    # Max number of threads to cap factor-based parallelism number to
+    parallelism-max = 10
+  }
+  # Throughput defines the maximum number of messages to be
+  # processed per actor before the thread jumps to the next actor.
+  # Set to 1 for as fair as possible.
+  throughput = 100
+}
+
+shard-dispatcher {
+ # Dispatcher is the name of the event-based dispatcher
+ type = Dispatcher
+ # What kind of ExecutionService to use
+ executor = "fork-join-executor"
+ # Configuration for the fork join pool
+ fork-join-executor {
+ # Min number of threads to cap factor-based parallelism number to
+ parallelism-min = 2
+ # Parallelism (threads) ... ceil(available processors * factor)
+ parallelism-factor = 2.0
+ # Max number of threads to cap factor-based parallelism number to
+ parallelism-max = 10
+ }
+ # Throughput defines the maximum number of messages to be
+ # processed per actor before the thread jumps to the next actor.
+ # Set to 1 for as fair as possible.
+ throughput = 100
+}
+
+notification-dispatcher {
+ # Dispatcher is the name of the event-based dispatcher
+ type = Dispatcher
+ # What kind of ExecutionService to use
+ executor = "fork-join-executor"
+ # Configuration for the fork join pool
+ fork-join-executor {
+ # Min number of threads to cap factor-based parallelism number to
+ parallelism-min = 2
+ # Parallelism (threads) ... ceil(available processors * factor)
+ parallelism-factor = 2.0
+ # Max number of threads to cap factor-based parallelism number to
+ parallelism-max = 10
+ }
+ # Throughput defines the maximum number of messages to be
+ # processed per actor before the thread jumps to the next actor.
+ # Set to 1 for as fair as possible.
+ throughput = 100
+}
\ No newline at end of file