From: Tony Tkacik Date: Tue, 24 Feb 2015 08:26:34 +0000 (+0000) Subject: Merge "Startup archetype - move impl under 'impl' namespace to prevent exporting." X-Git-Tag: release/lithium~473^2~10 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=13b8e537dfe1cb63f9ca312894ea94a7c2d37ad6;hp=dd94fde6d8dc06c0bc59d1442a61e26149385c9d Merge "Startup archetype - move impl under 'impl' namespace to prevent exporting." --- diff --git a/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/features/pom.xml b/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/features/pom.xml index 49b43f442e..c5adb28db7 100644 --- a/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/features/pom.xml +++ b/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/features/pom.xml @@ -26,6 +26,7 @@ and is available at http://www.eclipse.org/legal/epl-v10.html INTERNAL 1.2.0-SNAPSHOT 0.7.0-SNAPSHOT + etc/opendaylight/karaf diff --git a/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/features/src/main/features/features.xml b/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/features/src/main/features/features.xml index bbeb2de90e..1facf4c8aa 100644 --- a/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/features/src/main/features/features.xml +++ b/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/features/src/main/features/features.xml @@ -16,22 +16,22 @@ and is available at http://www.eclipse.org/legal/epl-v10.html mvn:org.opendaylight.yangtools/features-yangtools/${symbol_dollar}{yangtools.version}/xml/features mvn:org.opendaylight.controller/features-mdsal/${symbol_dollar}{mdsal.version}/xml/features mvn:org.opendaylight.controller/features-restconf/${symbol_dollar}{mdsal.version}/xml/features - + odl-yangtools-models mvn:${groupId}/${artifactId}-api/${symbol_dollar}{project.version} - + odl-mdsal-broker odl-${artifactId}-api mvn:${groupId}/${artifactId}-impl/${symbol_dollar}{project.version} - mvn:${groupId}/${artifactId}-impl/${symbol_dollar}{project.version}/xml/config + mvn:${groupId}/${artifactId}-impl/${symbol_dollar}{project.version}/xml/config - - odl-${artifactId}-impl + + odl-${artifactId} odl-restconf - - odl-${artifactId}-impl-rest + + odl-${artifactId}-rest odl-mdsal-apidocs odl-mdsal-xsql diff --git a/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/karaf/pom.xml b/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/karaf/pom.xml index 87b955c6b9..486e3d39ba 100644 --- a/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/karaf/pom.xml +++ b/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/karaf/pom.xml @@ -24,7 +24,7 @@ and is available at http://www.eclipse.org/legal/epl-v10.html INTERNAL 3.1.1 - odl-${artifactId}-impl-ui + odl-${artifactId}-ui @@ -54,4 +54,23 @@ and is available at http://www.eclipse.org/legal/epl-v10.html INTERNAL runtime + + + + + org.apache.maven.plugins + maven-deploy-plugin + + true + + + + org.apache.maven.plugins + maven-install-plugin + + true + + + + diff --git a/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/pom.xml b/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/pom.xml index 067d4db4e9..616704ada0 100644 --- a/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/pom.xml +++ b/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/pom.xml @@ -41,4 +41,11 @@ and is available at http://www.eclipse.org/legal/epl-v10.html INTERNAL + + + scm:git:ssh://git.opendaylight.org:29418/${artifactId}.git + scm:git:ssh://git.opendaylight.org:29418/${artifactId}.git + HEAD + https://wiki.opendaylight.org/view/${artifactId}:Main + diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java index e2aa16918e..1aecc89eea 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.raft; +import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -187,9 +188,14 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { @Override public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm) { + Preconditions.checkArgument(snapshotCapturedIndex >= snapshotIndex, + "snapshotCapturedIndex must be greater than or equal to snapshotIndex"); + snapshottedJournal = new ArrayList<>(journal.size()); - snapshottedJournal.addAll(journal.subList(0, (int)(snapshotCapturedIndex - snapshotIndex))); + List snapshotJournalEntries = journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex)); + + snapshottedJournal.addAll(snapshotJournalEntries); clear(0, (int) (snapshotCapturedIndex - snapshotIndex)); previousSnapshotIndex = snapshotIndex; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 3ec8cc5c58..285be39c0b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -677,12 +677,22 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); - } else { + getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex()); + } else if(captureSnapshot.getReplicatedToAllIndex() != -1){ // clear the log based on replicatedToAllIndex context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(), captureSnapshot.getReplicatedToAllTerm()); + + getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex()); + } else { + // The replicatedToAllIndex was not found in the log + // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot. + // In this scenario we may need to save the snapshot to the akka persistence + // snapshot for recovery but we do not need to do the replicated log trimming. + context.getReplicatedLog().snapshotPreCommit(replicatedLog.getSnapshotIndex(), + replicatedLog.getSnapshotTerm()); } - getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex()); + LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " + "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(), diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index 24581d6d2a..297d781251 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -134,6 +134,16 @@ public class MockRaftActorContext implements RaftActorContext { } @Override + // FIXME : A lot of tests try to manipulate the replicated log by setting it using this method + // This is OK to do if the underlyingActor is not RafActor or a derived class. If not then you should not + // used this way to manipulate the log because the RaftActor actually has a field replicatedLog + // which it creates internally and sets on the RaftActorContext + // The only right way to manipulate the replicated log therefore is to get it from either the RaftActor + // or the RaftActorContext and modify the entries in there instead of trying to replace it by using this setter + // Simple assertion that will fail if you do so + // ReplicatedLog log = new ReplicatedLogImpl(); + // raftActor.underlyingActor().getRaftActorContext().setReplicatedLog(log); + // assertEquals(log, raftActor.underlyingActor().getReplicatedLog()) public void setReplicatedLog(ReplicatedLog replicatedLog) { this.replicatedLog = replicatedLog; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 83868b6a2a..56bfc21f23 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -1227,6 +1227,128 @@ public class RaftActorTest extends AbstractActorTest { }; } + + private static class NonPersistentProvider implements DataPersistenceProvider { + @Override + public boolean isRecoveryApplicable() { + return false; + } + + @Override + public void persist(T o, Procedure procedure) { + try { + procedure.apply(o); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public void saveSnapshot(Object o) { + + } + + @Override + public void deleteSnapshots(SnapshotSelectionCriteria criteria) { + + } + + @Override + public void deleteMessages(long sequenceNumber) { + + } + } + + @Test + public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception { + new JavaTestKit(getSystem()) {{ + String persistenceId = factory.generateActorId("leader-"); + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setSnapshotBatchCount(5); + + DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider(); + + Map peerAddresses = new HashMap<>(); + + TestActorRef mockActorRef = factory.createTestActor( + MockRaftActor.props(persistenceId, peerAddresses, + Optional.of(config), dataPersistenceProvider), persistenceId); + + MockRaftActor leaderActor = mockActorRef.underlyingActor(); + leaderActor.getRaftActorContext().setCommitIndex(3); + leaderActor.getRaftActorContext().setLastApplied(3); + leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId); + + leaderActor.waitForInitializeBehaviorComplete(); + for(int i=0;i< 4;i++) { + leaderActor.getReplicatedLog() + .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i, + new MockRaftActorContext.MockPayload("A"))); + } + + Leader leader = new Leader(leaderActor.getRaftActorContext()); + leaderActor.setCurrentBehavior(leader); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + // Persist another entry (this will cause a CaptureSnapshot to be triggered + leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh")); + + // Now send a CaptureSnapshotReply + mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef); + + // Trimming log in this scenario is a no-op + assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex()); + assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertEquals(-1, leader.getReplicatedToAllIndex()); + + }}; + } + + @Test + public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception { + new JavaTestKit(getSystem()) {{ + String persistenceId = factory.generateActorId("leader-"); + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setSnapshotBatchCount(5); + + DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider(); + + Map peerAddresses = new HashMap<>(); + + TestActorRef mockActorRef = factory.createTestActor( + MockRaftActor.props(persistenceId, peerAddresses, + Optional.of(config), dataPersistenceProvider), persistenceId); + + MockRaftActor leaderActor = mockActorRef.underlyingActor(); + leaderActor.getRaftActorContext().setCommitIndex(3); + leaderActor.getRaftActorContext().setLastApplied(3); + leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId); + leaderActor.getReplicatedLog().setSnapshotIndex(3); + + leaderActor.waitForInitializeBehaviorComplete(); + Leader leader = new Leader(leaderActor.getRaftActorContext()); + leaderActor.setCurrentBehavior(leader); + leader.setReplicatedToAllIndex(3); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + // Persist another entry (this will cause a CaptureSnapshot to be triggered + leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh")); + + // Now send a CaptureSnapshotReply + mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef); + + // Trimming log in this scenario is a no-op + assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex()); + assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertEquals(3, leader.getReplicatedToAllIndex()); + + }}; + } + private ByteString fromObject(Object snapshot) throws Exception { ByteArrayOutputStream b = null; ObjectOutputStream o = null; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java index 06f3afc57c..681132e660 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java @@ -12,6 +12,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.PoisonPill; import akka.dispatch.OnComplete; +import com.google.common.annotations.VisibleForTesting; import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException; import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; @@ -25,7 +26,6 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; import scala.concurrent.Future; /** @@ -93,7 +93,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) { dataChangeListenerActor = actorContext.getActorSystem().actorOf( - DataChangeListener.props(listener)); + DataChangeListener.props(listener).withDispatcher(actorContext.getNotificationDispatcherPath())); Future findFuture = actorContext.findLocalShardAsync(shardName); findFuture.onComplete(new OnComplete() { @@ -109,7 +109,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration doRegistration(shard, path, scope); } } - }, actorContext.getActorSystem().dispatcher()); + }, actorContext.getClientDispatcher()); } private void doRegistration(ActorRef shard, final YangInstanceIdentifier path, @@ -131,7 +131,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration reply.getListenerRegistrationPath())); } } - }, actorContext.getActorSystem().dispatcher()); + }, actorContext.getClientDispatcher()); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index 107c959112..afbdbe1fe9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -14,6 +14,7 @@ import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.sal.core.spi.data.DOMStore; @@ -52,9 +53,12 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au LOG.info("Creating ShardManager : {}", shardManagerId); + String shardDispatcher = + new Dispatchers(actorSystem.dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); + actorContext = new ActorContext(actorSystem, actorSystem.actorOf( ShardManager.props(cluster, configuration, datastoreContext) - .withMailbox(ActorContext.MAILBOX), shardManagerId ), + .withDispatcher(shardDispatcher).withMailbox(ActorContext.MAILBOX), shardManagerId ), cluster, configuration, datastoreContext); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 21d74a6e1a..0672023fcb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -60,10 +60,13 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; +import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; +import org.opendaylight.controller.cluster.datastore.utils.MessageTracker; import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; @@ -71,6 +74,7 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; @@ -125,6 +129,8 @@ public class Shard extends RaftActor { private final Optional roleChangeNotifier; + private final MessageTracker appendEntriesReplyTracker; + /** * Coordinates persistence recovery on startup. */ @@ -133,6 +139,8 @@ public class Shard extends RaftActor { private final Map transactionChains = new HashMap<>(); + private final String txnDispatcherPath; + protected Shard(final ShardIdentifier name, final Map peerAddresses, final DatastoreContext datastoreContext, final SchemaContext schemaContext) { super(name.toString(), mapPeerAddresses(peerAddresses), @@ -141,7 +149,11 @@ public class Shard extends RaftActor { this.name = name; this.datastoreContext = datastoreContext; this.schemaContext = schemaContext; - this.dataPersistenceProvider = (datastoreContext.isPersistent()) ? new PersistentDataProvider() : new NonPersistentRaftDataProvider(); + this.dataPersistenceProvider = (datastoreContext.isPersistent()) + ? new PersistentDataProvider() : new NonPersistentRaftDataProvider(); + this.txnDispatcherPath = new Dispatchers(context().system().dispatchers()) + .getDispatcherPath(Dispatchers.DispatcherType.Transaction); + LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent()); @@ -168,6 +180,9 @@ public class Shard extends RaftActor { // create a notifier actor for each cluster member roleChangeNotifier = createRoleChangeNotifier(name.toString()); + + appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class, + getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis()); } private static Map mapPeerAddresses( @@ -224,35 +239,50 @@ public class Shard extends RaftActor { onRecoveryComplete(); } else { super.onReceiveRecover(message); + if(LOG.isTraceEnabled()) { + appendEntriesReplyTracker.begin(); + } } } @Override public void onReceiveCommand(final Object message) throws Exception { - if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) { - handleCreateTransaction(message); - } else if(message instanceof ForwardedReadyTransaction) { - handleForwardedReadyTransaction((ForwardedReadyTransaction)message); - } else if(message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) { - handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message)); - } else if(message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) { - handleCommitTransaction(CommitTransaction.fromSerializable(message)); - } else if(message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) { - handleAbortTransaction(AbortTransaction.fromSerializable(message)); - } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)){ - closeTransactionChain(CloseTransactionChain.fromSerializable(message)); - } else if (message instanceof RegisterChangeListener) { - registerChangeListener((RegisterChangeListener) message); - } else if (message instanceof UpdateSchemaContext) { - updateSchemaContext((UpdateSchemaContext) message); - } else if (message instanceof PeerAddressResolved) { - PeerAddressResolved resolved = (PeerAddressResolved) message; - setPeerAddress(resolved.getPeerId().toString(), - resolved.getPeerAddress()); - } else if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) { - handleTransactionCommitTimeoutCheck(); - } else { - super.onReceiveCommand(message); + + MessageTracker.Context context = appendEntriesReplyTracker.received(message); + + if(context.error().isPresent()){ + LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(), + context.error()); + } + + try { + if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) { + handleCreateTransaction(message); + } else if (message instanceof ForwardedReadyTransaction) { + handleForwardedReadyTransaction((ForwardedReadyTransaction) message); + } else if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) { + handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message)); + } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) { + handleCommitTransaction(CommitTransaction.fromSerializable(message)); + } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) { + handleAbortTransaction(AbortTransaction.fromSerializable(message)); + } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)) { + closeTransactionChain(CloseTransactionChain.fromSerializable(message)); + } else if (message instanceof RegisterChangeListener) { + registerChangeListener((RegisterChangeListener) message); + } else if (message instanceof UpdateSchemaContext) { + updateSchemaContext((UpdateSchemaContext) message); + } else if (message instanceof PeerAddressResolved) { + PeerAddressResolved resolved = (PeerAddressResolved) message; + setPeerAddress(resolved.getPeerId().toString(), + resolved.getPeerAddress()); + } else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) { + handleTransactionCommitTimeoutCheck(); + } else { + super.onReceiveCommand(message); + } + } finally { + context.done(); } } @@ -493,32 +523,19 @@ public class Shard extends RaftActor { shardMBean.incrementReadOnlyTransactionCount(); - return getContext().actorOf( - ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(), - schemaContext,datastoreContext, shardMBean, - transactionId.getRemoteTransactionId(), clientVersion), - transactionId.toString()); + return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion); } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) { shardMBean.incrementReadWriteTransactionCount(); - return getContext().actorOf( - ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(), - schemaContext, datastoreContext, shardMBean, - transactionId.getRemoteTransactionId(), clientVersion), - transactionId.toString()); - + return createShardTransaction(factory.newReadWriteTransaction(), transactionId, clientVersion); } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) { shardMBean.incrementWriteOnlyTransactionCount(); - return getContext().actorOf( - ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(), - schemaContext, datastoreContext, shardMBean, - transactionId.getRemoteTransactionId(), clientVersion), - transactionId.toString()); + return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion); } else { throw new IllegalArgumentException( "Shard="+name + ":CreateTransaction message has unidentified transaction type=" @@ -526,6 +543,17 @@ public class Shard extends RaftActor { } } + private ActorRef createShardTransaction(DOMStoreTransaction transaction, ShardTransactionIdentifier transactionId, + short clientVersion){ + return getContext().actorOf( + ShardTransaction.props(transaction, getSelf(), + schemaContext, datastoreContext, shardMBean, + transactionId.getRemoteTransactionId(), clientVersion) + .withDispatcher(txnDispatcherPath), + transactionId.toString()); + + } + private void createTransaction(CreateTransaction createTransaction) { try { ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(), diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index d52965e055..426a2e0934 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -50,6 +50,7 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; @@ -87,6 +88,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final Configuration configuration; + private final String shardDispatcherPath; + private ShardManagerInfoMBean mBean; private final DatastoreContext datastoreContext; @@ -105,6 +108,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { this.datastoreContext = datastoreContext; this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent()); this.type = datastoreContext.getDataStoreType(); + this.shardDispatcherPath = + new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); // Subscribe this actor to cluster member events cluster.subscribeToMemberEvents(getSelf()); @@ -283,8 +288,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { for (ShardInformation info : localShards.values()) { if (info.getActor() == null) { info.setActor(getContext().actorOf(Shard.props(info.getShardId(), - info.getPeerAddresses(), datastoreContext, schemaContext), - info.getShardId().toString())); + info.getPeerAddresses(), datastoreContext, schemaContext) + .withDispatcher(shardDispatcherPath), info.getShardId().toString())); } else { info.getActor().tell(message, getSelf()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index c51ea80726..4445b14e2e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -71,7 +71,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho private Future buildCohortList() { Future> combinedFutures = Futures.sequence(cohortFutures, - actorContext.getActorSystem().dispatcher()); + actorContext.getClientDispatcher()); return combinedFutures.transform(new AbstractFunction1, Void>() { @Override @@ -83,7 +83,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho } return null; } - }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher()); + }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher()); } @Override @@ -111,7 +111,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho finishCanCommit(returnFuture); } } - }, actorContext.getActorSystem().dispatcher()); + }, actorContext.getClientDispatcher()); return returnFuture; } @@ -158,7 +158,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho } returnFuture.set(Boolean.valueOf(result)); } - }, actorContext.getActorSystem().dispatcher()); + }, actorContext.getClientDispatcher()); } private Future> invokeCohorts(Object message) { @@ -170,7 +170,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout())); } - return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher()); + return Futures.sequence(futureList, actorContext.getClientDispatcher()); } @Override @@ -239,7 +239,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho propagateException, returnFuture, callback); } } - }, actorContext.getActorSystem().dispatcher()); + }, actorContext.getClientDispatcher()); } return returnFuture; @@ -304,7 +304,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho callback.success(); } } - }, actorContext.getActorSystem().dispatcher()); + }, actorContext.getClientDispatcher()); } @VisibleForTesting diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java index 530a36cff6..03d1b3a6d7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java @@ -60,7 +60,7 @@ final class TransactionContextImpl extends AbstractTransactionContext { } private Future completeOperation(Future operationFuture){ - operationFuture.onComplete(this.operationCompleter, actorContext.getActorSystem().dispatcher()); + operationFuture.onComplete(this.operationCompleter, actorContext.getClientDispatcher()); return operationFuture; } @@ -105,7 +105,7 @@ final class TransactionContextImpl extends AbstractTransactionContext { futureList.add(replyFuture); Future> combinedFutures = akka.dispatch.Futures.sequence(futureList, - actorContext.getActorSystem().dispatcher()); + actorContext.getClientDispatcher()); // Transform the combined Future into a Future that returns the cohort actor path from // the ReadyTransactionReply. That's the end result of the ready operation. @@ -152,7 +152,7 @@ final class TransactionContextImpl extends AbstractTransactionContext { serializedReadyReply.getClass())); } } - }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher()); + }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher()); } @Override @@ -198,7 +198,7 @@ final class TransactionContextImpl extends AbstractTransactionContext { Future> combinedFutures = akka.dispatch.Futures.sequence( Lists.newArrayList(recordedOperationFutures), - actorContext.getActorSystem().dispatcher()); + actorContext.getClientDispatcher()); OnComplete> onComplete = new OnComplete>() { @Override @@ -216,7 +216,7 @@ final class TransactionContextImpl extends AbstractTransactionContext { } }; - combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); + combinedFutures.onComplete(onComplete, actorContext.getClientDispatcher()); } } @@ -255,7 +255,7 @@ final class TransactionContextImpl extends AbstractTransactionContext { Future readFuture = executeOperationAsync(new ReadData(path)); - readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); + readFuture.onComplete(onComplete, actorContext.getClientDispatcher()); } @Override @@ -280,7 +280,7 @@ final class TransactionContextImpl extends AbstractTransactionContext { Future> combinedFutures = akka.dispatch.Futures.sequence( Lists.newArrayList(recordedOperationFutures), - actorContext.getActorSystem().dispatcher()); + actorContext.getClientDispatcher()); OnComplete> onComplete = new OnComplete>() { @Override public void onComplete(Throwable failure, Iterable notUsed) @@ -297,7 +297,7 @@ final class TransactionContextImpl extends AbstractTransactionContext { } }; - combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); + combinedFutures.onComplete(onComplete, actorContext.getClientDispatcher()); } } @@ -332,6 +332,6 @@ final class TransactionContextImpl extends AbstractTransactionContext { Future future = executeOperationAsync(new DataExists(path)); - future.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); + future.onComplete(onComplete, actorContext.getClientDispatcher()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 5bc53442ae..d63ec8010d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -484,7 +484,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { newTxFutureCallback.setPrimaryShard(primaryShard); } } - }, actorContext.getActorSystem().dispatcher()); + }, actorContext.getClientDispatcher()); } return txFutureCallback; @@ -601,7 +601,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { TransactionProxy.this.transactionType.ordinal(), getTransactionChainId()).toSerializable()); - createTxFuture.onComplete(this, actorContext.getActorSystem().dispatcher()); + createTxFuture.onComplete(this, actorContext.getClientDispatcher()); } @Override @@ -621,7 +621,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public void run() { tryCreateTransaction(); } - }, actorContext.getActorSystem().dispatcher()); + }, actorContext.getClientDispatcher()); return; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index cb06c898fd..26e6318f6d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -47,6 +47,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Await; +import scala.concurrent.ExecutionContext; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -93,6 +94,7 @@ public class ActorContext { private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build(); private final int transactionOutstandingOperationLimit; private final Timeout transactionCommitOperationTimeout; + private final Dispatchers dispatchers; private volatile SchemaContext schemaContext; @@ -111,6 +113,7 @@ public class ActorContext { this.configuration = configuration; this.datastoreContext = datastoreContext; this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit()); + this.dispatchers = new Dispatchers(actorSystem.dispatchers()); operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS); operationTimeout = new Timeout(operationDuration); @@ -127,6 +130,7 @@ public class ActorContext { transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity(); jmxReporter.start(); + } public DatastoreContext getDatastoreContext() { @@ -200,7 +204,7 @@ public class ActorContext { throw new UnknownMessageException(String.format( "FindPrimary returned unkown response: %s", response)); } - }, FIND_PRIMARY_FAILURE_TRANSFORMER, getActorSystem().dispatcher()); + }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher()); } /** @@ -251,7 +255,7 @@ public class ActorContext { throw new UnknownMessageException(String.format( "FindLocalShard returned unkown response: %s", response)); } - }, getActorSystem().dispatcher()); + }, getClientDispatcher()); } private String findPrimaryPathOrNull(String shardName) { @@ -514,5 +518,17 @@ public class ActorContext { return transactionCommitOperationTimeout; } + /** + * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client + * code on the datastore + * @return + */ + public ExecutionContext getClientDispatcher() { + return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client); + } + + public String getNotificationDispatcherPath(){ + return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/Dispatchers.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/Dispatchers.java new file mode 100644 index 0000000000..8de8a9d193 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/Dispatchers.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.utils; + +import com.google.common.base.Preconditions; +import scala.concurrent.ExecutionContext; + +public class Dispatchers { + public static final String DEFAULT_DISPATCHER_PATH = "akka.actor.default-dispatcher"; + public static final String CLIENT_DISPATCHER_PATH = "client-dispatcher"; + public static final String TXN_DISPATCHER_PATH = "txn-dispatcher"; + public static final String SHARD_DISPATCHER_PATH = "shard-dispatcher"; + public static final String NOTIFICATION_DISPATCHER_PATH = "notification-dispatcher"; + + private final akka.dispatch.Dispatchers dispatchers; + + public static enum DispatcherType { + Client(CLIENT_DISPATCHER_PATH), + Transaction(TXN_DISPATCHER_PATH), + Shard(SHARD_DISPATCHER_PATH), + Notification(NOTIFICATION_DISPATCHER_PATH); + + private final String path; + private DispatcherType(String path){ + this.path = path; + } + private String path(akka.dispatch.Dispatchers dispatchers){ + if(dispatchers.hasDispatcher(path)){ + return path; + } + return DEFAULT_DISPATCHER_PATH; + } + + private ExecutionContext dispatcher(akka.dispatch.Dispatchers dispatchers){ + if(dispatchers.hasDispatcher(path)){ + return dispatchers.lookup(path); + } + return dispatchers.defaultGlobalDispatcher(); + } + } + + public Dispatchers(akka.dispatch.Dispatchers dispatchers){ + Preconditions.checkNotNull(dispatchers, "dispatchers should not be null"); + this.dispatchers = dispatchers; + } + + public ExecutionContext getDispatcher(DispatcherType dispatcherType){ + return dispatcherType.dispatcher(this.dispatchers); + } + + public String getDispatcherPath(DispatcherType dispatcherType){ + return dispatcherType.path(this.dispatchers); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/MessageTracker.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/MessageTracker.java new file mode 100644 index 0000000000..2757d2f5f6 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/MessageTracker.java @@ -0,0 +1,261 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.utils; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * MessageTracker is a diagnostic utility class to be used for figuring out why a certain message which was + * expected to arrive in a given time interval does not arrive. It attempts to keep track of all the messages that + * received between the arrival of two instances of the same message and the amount of time it took to process each + * of those messages. + *
+ * Usage of the API is as follows, + *
+ *
+ *      // Track the Foo class, Here we expect to see a message of type Foo come in every 10 millis
+ *     MessageTracker tracker = new MessageTracker(Foo.class, 10);
+ *
+ *     // Begin the tracking process. If this is not called then calling received and done on the resultant Context
+ *     // will do nothing
+ *     tracker.begin();
+ *
+ *     .....
+ *
+ *     MessageTracker.Context context = tracker.received(message);
+ *
+ *     if(context.error().isPresent()){
+ *         LOG.error("{}", context.error().get());
+ *     }
+ *
+ *     // Some custom processing
+ *     process(message);
+ *
+ *     context.done();
+ *
+ * 
+ */ +public class MessageTracker { + + private static final Context NO_OP_CONTEXT = new NoOpContext(); + + private final Class expectedMessageClass; + + private final long expectedArrivalInterval; + + private final List messagesSinceLastExpectedMessage = new LinkedList<>(); + + private Stopwatch expectedMessageWatch; + + private boolean enabled = false; + + private Object lastExpectedMessage; + + private Object currentMessage; + + private final CurrentMessageContext currentMessageContext = new CurrentMessageContext(); + + /** + * + * @param expectedMessageClass The class of the message to track + * @param expectedArrivalIntervalInMillis The expected arrival interval between two instances of the expected + * message + */ + public MessageTracker(Class expectedMessageClass, long expectedArrivalIntervalInMillis){ + this.expectedMessageClass = expectedMessageClass; + this.expectedArrivalInterval = expectedArrivalIntervalInMillis; + } + + public void begin(){ + if(enabled) { + return; + } + enabled = true; + expectedMessageWatch = Stopwatch.createStarted(); + } + + public Context received(Object message){ + if(!enabled) { + return NO_OP_CONTEXT; + } + this.currentMessage = message; + if(expectedMessageClass.isInstance(message)){ + long actualElapsedTime = expectedMessageWatch.elapsed(TimeUnit.MILLISECONDS); + if(actualElapsedTime > expectedArrivalInterval){ + return new ErrorContext(message, Optional.of(new FailedExpectation(lastExpectedMessage, message, + ImmutableList.copyOf(messagesSinceLastExpectedMessage), expectedArrivalInterval, + actualElapsedTime))); + } + this.lastExpectedMessage = message; + this.messagesSinceLastExpectedMessage.clear(); + } + + currentMessageContext.reset(); + return currentMessageContext; + } + + private void processed(Object message, long messageElapseTimeInNanos){ + if(!enabled) { + return; + } + if(!expectedMessageClass.isInstance(message)){ + this.messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(), messageElapseTimeInNanos)); + } + } + + public List getMessagesSinceLastExpectedMessage(){ + return ImmutableList.copyOf(this.messagesSinceLastExpectedMessage); + } + + public static class MessageProcessingTime { + private final Class messageClass; + private final long elapsedTimeInNanos; + + MessageProcessingTime(Class messageClass, long elapsedTimeInNanos){ + this.messageClass = messageClass; + this.elapsedTimeInNanos = elapsedTimeInNanos; + } + + @Override + public String toString() { + return "MessageProcessingTime{" + + "messageClass=" + messageClass.getSimpleName() + + ", elapsedTimeInMillis=" + TimeUnit.NANOSECONDS.toMillis(elapsedTimeInNanos) + + '}'; + } + + public Class getMessageClass() { + return messageClass; + } + + public long getElapsedTimeInNanos() { + return elapsedTimeInNanos; + } + } + + public interface Error { + Object getLastExpectedMessage(); + Object getCurrentExpectedMessage(); + List getMessageProcessingTimesSinceLastExpectedMessage(); + } + + private class FailedExpectation implements Error { + + private final Object lastExpectedMessage; + private final Object currentExpectedMessage; + private final List messagesSinceLastExpectedMessage; + private final long expectedTimeInMillis; + private final long actualTimeInMillis; + + public FailedExpectation(Object lastExpectedMessage, Object message, List messagesSinceLastExpectedMessage, long expectedTimeInMillis, long actualTimeInMillis) { + this.lastExpectedMessage = lastExpectedMessage; + this.currentExpectedMessage = message; + this.messagesSinceLastExpectedMessage = messagesSinceLastExpectedMessage; + this.expectedTimeInMillis = expectedTimeInMillis; + this.actualTimeInMillis = actualTimeInMillis; + } + + public Object getLastExpectedMessage() { + return lastExpectedMessage; + } + + public Object getCurrentExpectedMessage() { + return currentExpectedMessage; + } + + public List getMessageProcessingTimesSinceLastExpectedMessage() { + return messagesSinceLastExpectedMessage; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("\n> Last Expected Message = " + lastExpectedMessage); + builder.append("\n> Current Expected Message = " + currentExpectedMessage); + builder.append("\n> Expected time in between messages = " + expectedTimeInMillis); + builder.append("\n> Actual time in between messages = " + actualTimeInMillis); + for (MessageProcessingTime time : messagesSinceLastExpectedMessage) { + builder.append("\n\t> ").append(time.toString()); + } + return builder.toString(); + } + + } + + public interface Context { + Context done(); + Optional error(); + } + + private static class NoOpContext implements Context { + + @Override + public Context done() { + return this; + } + + @Override + public Optional error() { + return Optional.absent(); + } + } + + private class CurrentMessageContext implements Context { + Stopwatch stopwatch = Stopwatch.createStarted(); + boolean done = true; + + public void reset(){ + Preconditions.checkState(done); + done = false; + stopwatch.reset().start(); + } + + @Override + public Context done() { + processed(currentMessage, stopwatch.elapsed(TimeUnit.NANOSECONDS)); + done = true; + return this; + } + + @Override + public Optional error() { + return Optional.absent(); + } + } + + private class ErrorContext implements Context { + Object message; + private final Optional error; + Stopwatch stopwatch; + + ErrorContext(Object message, Optional error){ + this.message = message; + this.error = error; + this.stopwatch = Stopwatch.createStarted(); + } + + @Override + public Context done(){ + processed(message, this.stopwatch.elapsed(TimeUnit.NANOSECONDS)); + this.stopwatch.stop(); + return this; + } + + @Override + public Optional error() { + return error; + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java index 58aec30a84..f6c8f07f6b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java @@ -36,6 +36,7 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; @@ -193,10 +194,12 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest { doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class)); ExecutionContextExecutor executor = ExecutionContexts.fromExecutor( MoreExecutors.sameThreadExecutor()); - doReturn(executor).when(mockActorSystem).dispatcher(); + ActorContext actorContext = mock(ActorContext.class); + doReturn(executor).when(actorContext).getClientDispatcher(); + String shardName = "shard-1"; final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy( shardName, actorContext, mockListener); @@ -227,7 +230,9 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest { shardName, actorContext, mockListener); doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext(); + doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher(); doReturn(getSystem()).when(actorContext).getActorSystem(); + doReturn(Dispatchers.DEFAULT_DISPATCHER_PATH).when(actorContext).getNotificationDispatcherPath(); doReturn(getSystem().actorSelection(getRef().path())). when(actorContext).actorSelection(getRef().path()); doReturn(duration("5 seconds")).when(actorContext).getOperationDuration(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java index b013515f25..0a2a0d1bc0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java @@ -66,6 +66,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { MockitoAnnotations.initMocks(this); doReturn(getSystem()).when(actorContext).getActorSystem(); + doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher(); doReturn(datastoreContext).when(actorContext).getDatastoreContext(); doReturn(100).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds(); doReturn(commitTimer).when(actorContext).getOperationTimer("commit"); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 7ce41a4db1..fa2f9187d6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -129,6 +129,7 @@ public class TransactionProxyTest { DatastoreContext dataStoreContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).build(); doReturn(getSystem()).when(mockActorContext).getActorSystem(); + doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher(); doReturn(memberName).when(mockActorContext).getCurrentMemberName(); doReturn(schemaContext).when(mockActorContext).getSchemaContext(); doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index eae46da2ee..3c6a0cef5c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -1,17 +1,20 @@ package org.opendaylight.controller.cluster.datastore.utils; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import akka.actor.ActorRef; import akka.actor.ActorSelection; +import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.Props; import akka.actor.UntypedActor; import akka.japi.Creator; import akka.testkit.JavaTestKit; import com.google.common.base.Optional; +import com.typesafe.config.ConfigFactory; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.time.StopWatch; import org.junit.Test; @@ -299,4 +302,41 @@ public class ActorContextTest extends AbstractActorTest{ assertTrue("did not take as much time as expected", watch.getTime() > 1000); } + + @Test + public void testClientDispatcherIsGlobalDispatcher(){ + + DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); + + doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); + doReturn("config").when(mockDataStoreContext).getDataStoreType(); + + ActorContext actorContext = + new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), + mock(Configuration.class), mockDataStoreContext); + + assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher()); + + } + + @Test + public void testClientDispatcherIsNotGlobalDispatcher(){ + + DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); + + doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); + doReturn("config").when(mockDataStoreContext).getDataStoreType(); + + ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf")); + + ActorContext actorContext = + new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class), + mock(Configuration.class), mockDataStoreContext); + + assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher()); + + actorSystem.shutdown(); + + } + } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/DispatchersTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/DispatchersTest.java new file mode 100644 index 0000000000..85a0cac3da --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/DispatchersTest.java @@ -0,0 +1,81 @@ +package org.opendaylight.controller.cluster.datastore.utils; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import akka.dispatch.MessageDispatcher; +import org.junit.Test; + +public class DispatchersTest { + + @Test + public void testGetDefaultDispatcherPath(){ + akka.dispatch.Dispatchers mockDispatchers = mock(akka.dispatch.Dispatchers.class); + doReturn(false).when(mockDispatchers).hasDispatcher(anyString()); + Dispatchers dispatchers = new Dispatchers(mockDispatchers); + + for(Dispatchers.DispatcherType type : Dispatchers.DispatcherType.values()) { + assertEquals(Dispatchers.DEFAULT_DISPATCHER_PATH, + dispatchers.getDispatcherPath(type)); + } + + } + + @Test + public void testGetDefaultDispatcher(){ + akka.dispatch.Dispatchers mockDispatchers = mock(akka.dispatch.Dispatchers.class); + MessageDispatcher mockGlobalDispatcher = mock(MessageDispatcher.class); + doReturn(false).when(mockDispatchers).hasDispatcher(anyString()); + doReturn(mockGlobalDispatcher).when(mockDispatchers).defaultGlobalDispatcher(); + Dispatchers dispatchers = new Dispatchers(mockDispatchers); + + for(Dispatchers.DispatcherType type : Dispatchers.DispatcherType.values()) { + assertEquals(mockGlobalDispatcher, + dispatchers.getDispatcher(type)); + } + + } + + @Test + public void testGetDispatcherPath(){ + akka.dispatch.Dispatchers mockDispatchers = mock(akka.dispatch.Dispatchers.class); + doReturn(true).when(mockDispatchers).hasDispatcher(anyString()); + Dispatchers dispatchers = new Dispatchers(mockDispatchers); + + assertEquals(Dispatchers.CLIENT_DISPATCHER_PATH, + dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Client)); + + assertEquals(Dispatchers.TXN_DISPATCHER_PATH, + dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Transaction)); + + assertEquals(Dispatchers.SHARD_DISPATCHER_PATH, + dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Shard)); + + assertEquals(Dispatchers.NOTIFICATION_DISPATCHER_PATH, + dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification)); + + } + + @Test + public void testGetDispatcher(){ + akka.dispatch.Dispatchers mockDispatchers = mock(akka.dispatch.Dispatchers.class); + MessageDispatcher mockDispatcher = mock(MessageDispatcher.class); + doReturn(true).when(mockDispatchers).hasDispatcher(anyString()); + doReturn(mockDispatcher).when(mockDispatchers).lookup(anyString()); + Dispatchers dispatchers = new Dispatchers(mockDispatchers); + + assertEquals(Dispatchers.CLIENT_DISPATCHER_PATH, + dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Client)); + + assertEquals(Dispatchers.TXN_DISPATCHER_PATH, + dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Transaction)); + + assertEquals(Dispatchers.SHARD_DISPATCHER_PATH, + dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Shard)); + + assertEquals(Dispatchers.NOTIFICATION_DISPATCHER_PATH, + dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification)); + + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageTrackerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageTrackerTest.java new file mode 100644 index 0000000000..a125b49a5a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageTrackerTest.java @@ -0,0 +1,188 @@ +package org.opendaylight.controller.cluster.datastore.utils; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import com.google.common.util.concurrent.Uninterruptibles; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MessageTrackerTest { + + private final Logger LOG = LoggerFactory.getLogger(getClass()); + + private class Foo {} + + @Test + public void testNoTracking(){ + MessageTracker messageTracker = new MessageTracker(Foo.class, 10); + + MessageTracker.Context context1 = messageTracker.received(new Foo()); + context1.done(); + + Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS); + + MessageTracker.Context context2 = messageTracker.received(new Foo()); + context2.done(); + + } + + @Test + public void testFailedExpectationOnTracking(){ + MessageTracker messageTracker = new MessageTracker(Foo.class, 10); + messageTracker.begin(); + + MessageTracker.Context context1 = messageTracker.received(new Foo()); + context1.done(); + + Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS); + + MessageTracker.Context context2 = messageTracker.received(new Foo()); + Assert.assertEquals(true, context2.error().isPresent()); + Assert.assertEquals(0, context2.error().get().getMessageProcessingTimesSinceLastExpectedMessage().size()); + + } + + @Test + public void testFailedExpectationOnTrackingWithMessagesInBetween(){ + MessageTracker messageTracker = new MessageTracker(Foo.class, 10); + messageTracker.begin(); + + MessageTracker.Context context1 = messageTracker.received(new Foo()); + context1.done(); + + messageTracker.received("A").done(); + messageTracker.received(Long.valueOf(10)).done(); + MessageTracker.Context c = messageTracker.received(Integer.valueOf(100)); + + Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS); + + c.done(); + + MessageTracker.Context context2 = messageTracker.received(new Foo()); + + Assert.assertEquals(true, context2.error().isPresent()); + + MessageTracker.Error error = context2.error().get(); + + List messageProcessingTimes = + error.getMessageProcessingTimesSinceLastExpectedMessage(); + + Assert.assertEquals(3, messageProcessingTimes.size()); + + Assert.assertEquals(String.class, messageProcessingTimes.get(0).getMessageClass()); + Assert.assertEquals(Long.class, messageProcessingTimes.get(1).getMessageClass()); + Assert.assertEquals(Integer.class, messageProcessingTimes.get(2).getMessageClass()); + Assert.assertTrue(messageProcessingTimes.get(2).getElapsedTimeInNanos() > TimeUnit.MILLISECONDS.toNanos(10)); + Assert.assertEquals(Foo.class, error.getLastExpectedMessage().getClass()); + Assert.assertEquals(Foo.class, error.getCurrentExpectedMessage().getClass()); + + LOG.error("An error occurred : {}" , error); + + } + + + @Test + public void testMetExpectationOnTracking(){ + MessageTracker messageTracker = new MessageTracker(Foo.class, 10); + messageTracker.begin(); + + MessageTracker.Context context1 = messageTracker.received(new Foo()); + context1.done(); + + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS); + + MessageTracker.Context context2 = messageTracker.received(new Foo()); + Assert.assertEquals(false, context2.error().isPresent()); + + } + + @Test + public void testIllegalStateExceptionWhenDoneIsNotCalledWhileTracking(){ + MessageTracker messageTracker = new MessageTracker(Foo.class, 10); + messageTracker.begin(); + + messageTracker.received(new Foo()); + + try { + messageTracker.received(new Foo()); + fail("Expected an IllegalStateException"); + } catch (IllegalStateException e){ + + } + } + + @Test + public void testNoIllegalStateExceptionWhenDoneIsNotCalledWhileNotTracking(){ + MessageTracker messageTracker = new MessageTracker(Foo.class, 10); + + messageTracker.received(new Foo()); + messageTracker.received(new Foo()); + } + + @Test + public void testDelayInFirstExpectedMessageArrival(){ + + MessageTracker messageTracker = new MessageTracker(Foo.class, 10); + messageTracker.begin(); + + Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS); + + MessageTracker.Context context = messageTracker.received(new Foo()); + + Assert.assertEquals(true, context.error().isPresent()); + + MessageTracker.Error error = context.error().get(); + + Assert.assertEquals(null, error.getLastExpectedMessage()); + Assert.assertEquals(Foo.class, error.getCurrentExpectedMessage().getClass()); + + String errorString = error.toString(); + Assert.assertTrue(errorString.contains("Last Expected Message = null")); + + LOG.error("An error occurred : {}", error); + } + + @Test + public void testCallingBeginDoesNotResetWatch(){ + MessageTracker messageTracker = new MessageTracker(Foo.class, 10); + messageTracker.begin(); + + Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS); + + messageTracker.begin(); + + MessageTracker.Context context = messageTracker.received(new Foo()); + + Assert.assertEquals(true, context.error().isPresent()); + + } + + @Test + public void testMessagesSinceLastExpectedMessage(){ + + MessageTracker messageTracker = new MessageTracker(Foo.class, 10); + messageTracker.begin(); + + MessageTracker.Context context1 = messageTracker.received(Integer.valueOf(45)).done(); + + Assert.assertEquals(false, context1.error().isPresent()); + + MessageTracker.Context context2 = messageTracker.received(Long.valueOf(45)).done(); + + Assert.assertEquals(false, context2.error().isPresent()); + + List processingTimeList = + messageTracker.getMessagesSinceLastExpectedMessage(); + + Assert.assertEquals(2, processingTimeList.size()); + + assertEquals(Integer.class, processingTimeList.get(0).getMessageClass()); + assertEquals(Long.class, processingTimeList.get(1).getMessageClass()); + + } + +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application-with-custom-dispatchers.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application-with-custom-dispatchers.conf new file mode 100644 index 0000000000..32c55a65f6 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application-with-custom-dispatchers.conf @@ -0,0 +1,116 @@ +akka { + persistence.snapshot-store.plugin = "in-memory-snapshot-store" + persistence.journal.plugin = "in-memory-journal" + + loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"] + + actor { + serializers { + java = "akka.serialization.JavaSerializer" + proto = "akka.remote.serialization.ProtobufSerializer" + } + + serialization-bindings { + "org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification" = java + "com.google.protobuf.Message" = proto + + } + } +} + +in-memory-journal { + class = "org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal" +} + +in-memory-snapshot-store { + # Class name of the plugin. + class = "org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore" + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" +} + +bounded-mailbox { + mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" + mailbox-capacity = 1000 + mailbox-push-timeout-time = 100ms +} + +client-dispatcher { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + # What kind of ExecutionService to use + executor = "fork-join-executor" + # Configuration for the fork join pool + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 2 + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 2.0 + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 10 + } + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. + throughput = 100 +} + +transaction-dispatcher { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + # What kind of ExecutionService to use + executor = "fork-join-executor" + # Configuration for the fork join pool + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 2 + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 2.0 + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 10 + } + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. + throughput = 100 +} + +shard-dispatcher { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + # What kind of ExecutionService to use + executor = "fork-join-executor" + # Configuration for the fork join pool + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 2 + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 2.0 + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 10 + } + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. + throughput = 100 +} + +notification-dispatcher { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + # What kind of ExecutionService to use + executor = "fork-join-executor" + # Configuration for the fork join pool + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 2 + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 2.0 + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 10 + } + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. + throughput = 100 +} \ No newline at end of file