Merge "Startup arch - feature cleanup"
authorTony Tkacik <ttkacik@cisco.com>
Tue, 24 Feb 2015 08:26:05 +0000 (08:26 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 24 Feb 2015 08:26:06 +0000 (08:26 +0000)
27 files changed:
opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/karaf/pom.xml
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataObjectModification.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataTreeChangeListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataTreeChangeService.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataTreeIdentifier.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataTreeModification.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/Dispatchers.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/MessageTracker.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/DispatchersTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageTrackerTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application-with-custom-dispatchers.conf [new file with mode: 0644]

index 398554c..486e3d3 100644 (file)
@@ -54,4 +54,23 @@ and is available at http://www.eclipse.org/legal/epl-v10.html INTERNAL
       <scope>runtime</scope>
     </dependency>
   </dependencies>
+  <!-- DO NOT install or deploy the karaf artifact -->
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-deploy-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-install-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
 </project>
index e2aa169..1aecc89 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -187,9 +188,14 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
     @Override
     public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm) {
+        Preconditions.checkArgument(snapshotCapturedIndex >= snapshotIndex,
+                "snapshotCapturedIndex must be greater than or equal to snapshotIndex");
+
         snapshottedJournal = new ArrayList<>(journal.size());
 
-        snapshottedJournal.addAll(journal.subList(0, (int)(snapshotCapturedIndex - snapshotIndex)));
+        List<ReplicatedLogEntry> snapshotJournalEntries = journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
+
+        snapshottedJournal.addAll(snapshotJournalEntries);
         clear(0, (int) (snapshotCapturedIndex - snapshotIndex));
 
         previousSnapshotIndex = snapshotIndex;
index 3ec8cc5..285be39 100644 (file)
@@ -677,12 +677,22 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
                     captureSnapshot.getLastAppliedTerm());
 
-        } else {
+            getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+        } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
             // clear the log based on replicatedToAllIndex
             context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
                     captureSnapshot.getReplicatedToAllTerm());
+
+            getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+        } else {
+            // The replicatedToAllIndex was not found in the log
+            // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
+            // In this scenario we may need to save the snapshot to the akka persistence
+            // snapshot for recovery but we do not need to do the replicated log trimming.
+            context.getReplicatedLog().snapshotPreCommit(replicatedLog.getSnapshotIndex(),
+                    replicatedLog.getSnapshotTerm());
         }
-        getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+
 
         LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
             "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
index 24581d6..297d781 100644 (file)
@@ -134,6 +134,16 @@ public class MockRaftActorContext implements RaftActorContext {
     }
 
     @Override
+    // FIXME : A lot of tests try to manipulate the replicated log by setting it using this method
+    // This is OK to do if the underlyingActor is not RafActor or a derived class. If not then you should not
+    // used this way to manipulate the log because the RaftActor actually has a field replicatedLog
+    // which it creates internally and sets on the RaftActorContext
+    // The only right way to manipulate the replicated log therefore is to get it from either the RaftActor
+    // or the RaftActorContext and modify the entries in there instead of trying to replace it by using this setter
+    // Simple assertion that will fail if you do so
+    // ReplicatedLog log = new ReplicatedLogImpl();
+    // raftActor.underlyingActor().getRaftActorContext().setReplicatedLog(log);
+    // assertEquals(log, raftActor.underlyingActor().getReplicatedLog())
     public void setReplicatedLog(ReplicatedLog replicatedLog) {
         this.replicatedLog = replicatedLog;
     }
index 83868b6..56bfc21 100644 (file)
@@ -1227,6 +1227,128 @@ public class RaftActorTest extends AbstractActorTest {
         };
     }
 
+
+    private static class NonPersistentProvider implements DataPersistenceProvider {
+        @Override
+        public boolean isRecoveryApplicable() {
+            return false;
+        }
+
+        @Override
+        public <T> void persist(T o, Procedure<T> procedure) {
+            try {
+                procedure.apply(o);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+
+        @Override
+        public void saveSnapshot(Object o) {
+
+        }
+
+        @Override
+        public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+
+        }
+
+        @Override
+        public void deleteMessages(long sequenceNumber) {
+
+        }
+    }
+
+    @Test
+    public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            String persistenceId = factory.generateActorId("leader-");
+            DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+            config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+            config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+            config.setSnapshotBatchCount(5);
+
+            DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
+
+            Map<String, String> peerAddresses = new HashMap<>();
+
+            TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
+                    MockRaftActor.props(persistenceId, peerAddresses,
+                            Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+
+            MockRaftActor leaderActor = mockActorRef.underlyingActor();
+            leaderActor.getRaftActorContext().setCommitIndex(3);
+            leaderActor.getRaftActorContext().setLastApplied(3);
+            leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
+
+            leaderActor.waitForInitializeBehaviorComplete();
+            for(int i=0;i< 4;i++) {
+                leaderActor.getReplicatedLog()
+                        .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
+                                new MockRaftActorContext.MockPayload("A")));
+            }
+
+            Leader leader = new Leader(leaderActor.getRaftActorContext());
+            leaderActor.setCurrentBehavior(leader);
+            assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+            // Persist another entry (this will cause a CaptureSnapshot to be triggered
+            leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
+
+            // Now send a CaptureSnapshotReply
+            mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
+
+            // Trimming log in this scenario is a no-op
+            assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
+            assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+            assertEquals(-1, leader.getReplicatedToAllIndex());
+
+        }};
+    }
+
+    @Test
+    public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            String persistenceId = factory.generateActorId("leader-");
+            DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+            config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+            config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+            config.setSnapshotBatchCount(5);
+
+            DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
+
+            Map<String, String> peerAddresses = new HashMap<>();
+
+            TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
+                    MockRaftActor.props(persistenceId, peerAddresses,
+                            Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+
+            MockRaftActor leaderActor = mockActorRef.underlyingActor();
+            leaderActor.getRaftActorContext().setCommitIndex(3);
+            leaderActor.getRaftActorContext().setLastApplied(3);
+            leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
+            leaderActor.getReplicatedLog().setSnapshotIndex(3);
+
+            leaderActor.waitForInitializeBehaviorComplete();
+            Leader leader = new Leader(leaderActor.getRaftActorContext());
+            leaderActor.setCurrentBehavior(leader);
+            leader.setReplicatedToAllIndex(3);
+            assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+            // Persist another entry (this will cause a CaptureSnapshot to be triggered
+            leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
+
+            // Now send a CaptureSnapshotReply
+            mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
+
+            // Trimming log in this scenario is a no-op
+            assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
+            assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+            assertEquals(3, leader.getReplicatedToAllIndex());
+
+        }};
+    }
+
     private ByteString fromObject(Object snapshot) throws Exception {
         ByteArrayOutputStream b = null;
         ObjectOutputStream o = null;
diff --git a/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataObjectModification.java b/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataObjectModification.java
new file mode 100644 (file)
index 0000000..2eee0e8
--- /dev/null
@@ -0,0 +1,80 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html.
+ */
+package org.opendaylight.controller.md.sal.binding.api;
+
+import java.util.Collection;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument;
+
+/**
+ * Modified Data Object.
+ *
+ * Represents modification of Data Object.
+ *
+ */
+public interface DataObjectModification<T extends DataObject> extends Identifiable<PathArgument> {
+
+    enum ModificationType {
+        /**
+         *
+         * Child node (direct or indirect) was modified.
+         *
+         */
+        SUBTREE_MODIFIED,
+        /**
+         *
+         * Node was explicitly created / overwritten.
+         *
+         */
+        WRITE,
+        /**
+         *
+         * Node was deleted.
+         *
+         */
+        DELETE
+    }
+
+    @Override
+    PathArgument getIdentifier();
+
+    /**
+     * Returns type of modified object.
+     *
+     * @return type of modified object.
+     */
+    @Nonnull Class<T> getDataType();
+
+    /**
+     *
+     * Returns type of modification
+     *
+     * @return type Type of performed modification.
+     */
+    @Nonnull ModificationType getModificationType();
+
+    /**
+     * Returns after state of top level container.
+     *
+     * @param root Class representing data container
+     * @return State of object after modification. Null if subtree is not present.
+     */
+    @Nullable T getDataAfter();
+
+    /**
+     * Returns unmodifiable collection of modified direct children.
+     *
+     * @return unmodifiable collection of modified direct children.
+     */
+    @Nonnull Collection<DataObjectModification<? extends DataObject>> getModifiedChildren();
+
+
+}
diff --git a/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataTreeChangeListener.java b/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataTreeChangeListener.java
new file mode 100644 (file)
index 0000000..6b1df71
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.api;
+
+import java.util.Collection;
+import java.util.EventListener;
+import javax.annotation.Nonnull;
+
+/**
+ * Interface implemented by classes interested in receiving notifications about
+ * data tree changes. This interface differs from {@link DataChangeListener}
+ * in that it provides a cursor-based view of the change, which has potentially
+ * lower overhead and allow more flexible consumption of change event.
+ */
+public interface DataTreeChangeListener extends EventListener {
+    /**
+     * Invoked when there was data change for the supplied path, which was used
+     * to register this listener.
+     *
+     * <p>
+     * This method may be also invoked during registration of the listener if
+     * there is any pre-existing data in the conceptual data tree for supplied
+     * path. This initial event will contain all pre-existing data as created.
+     *
+     * <p>
+     * A data change event may be triggered spuriously, e.g. such that data before
+     * and after compare as equal. Implementations of this interface are expected
+     * to recover from such events. Event producers are expected to exert reasonable
+     * effort to suppress such events.
+     *
+     * In other words, it is completely acceptable to observe
+     * a {@link DataObjectModification}, while the state observed before and
+     * after- data items compare as equal.
+     *
+     * @param changes Collection of change events, may not be null or empty.
+     */
+    void onDataTreeChanged(@Nonnull Collection<DataTreeModification> changes);
+}
diff --git a/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataTreeChangeService.java b/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataTreeChangeService.java
new file mode 100644 (file)
index 0000000..ae4e36f
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.api;
+
+import javax.annotation.Nonnull;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+/**
+ * A {@link DOMService} which allows users to register for changes to a
+ * subtree.
+ */
+public interface DataTreeChangeService extends BindingService {
+    /**
+     * Registers a {@link DataTreeChangeListener} to receive
+     * notifications when data changes under a given path in the conceptual data
+     * tree.
+     * <p>
+     * You are able to register for notifications  for any node or subtree
+     * which can be represented using {@link DataTreeIdentifier}.
+     * <p>
+     *
+     * You are able to register for data change notifications for a subtree or leaf
+     * even if it does not exist. You will receive notification once that node is
+     * created.
+     * <p>
+     * If there is any pre-existing data in the data tree for the path for which you are
+     * registering, you will receive an initial data change event, which will
+     * contain all pre-existing data, marked as created.
+     *
+     * <p>
+     * This method returns a {@link ListenerRegistration} object. To
+     * "unregister" your listener for changes call the {@link ListenerRegistration#close()}
+     * method on the returned object.
+     * <p>
+     * You MUST explicitly unregister your listener when you no longer want to receive
+     * notifications. This is especially true in OSGi environments, where failure to
+     * do so during bundle shutdown can lead to stale listeners being still registered.
+     *
+     * @param treeId
+     *            Data tree identifier of the subtree which should be watched for
+     *            changes.
+     * @param listener
+     *            Listener instance which is being registered
+     * @return Listener registration object, which may be used to unregister
+     *         your listener using {@link ListenerRegistration#close()} to stop
+     *         delivery of change events.
+     */
+    @Nonnull <L extends DataTreeChangeListener> ListenerRegistration<L> registerDataTreeChangeListener(@Nonnull DataTreeIdentifier treeId, @Nonnull L listener);
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataTreeIdentifier.java b/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataTreeIdentifier.java
new file mode 100644 (file)
index 0000000..428957e
--- /dev/null
@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.api;
+
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.yangtools.concepts.Immutable;
+import org.opendaylight.yangtools.concepts.Path;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+/**
+ * A unique identifier for a particular subtree. It is composed of the logical
+ * data store type and the instance identifier of the root node.
+ */
+public final class DataTreeIdentifier implements Immutable, Path<DataTreeIdentifier>, Serializable {
+    private static final long serialVersionUID = 1L;
+    private final InstanceIdentifier<?> rootIdentifier;
+    private final LogicalDatastoreType datastoreType;
+
+    public DataTreeIdentifier(final LogicalDatastoreType datastoreType, final InstanceIdentifier<?> rootIdentifier) {
+        this.datastoreType = Preconditions.checkNotNull(datastoreType);
+        this.rootIdentifier = Preconditions.checkNotNull(rootIdentifier);
+    }
+
+    /**
+     * Return the logical data store type.
+     *
+     * @return Logical data store type. Guaranteed to be non-null.
+     */
+    public @Nonnull LogicalDatastoreType getDatastoreType() {
+        return datastoreType;
+    }
+
+    /**
+     * Return the {@link YangInstanceIdentifier} of the root node.
+     *
+     * @return Instance identifier corresponding to the root node.
+     */
+    public @Nonnull InstanceIdentifier<?> getRootIdentifier() {
+        return rootIdentifier;
+    }
+
+    @Override
+    public boolean contains(final DataTreeIdentifier other) {
+        return datastoreType == other.datastoreType && rootIdentifier.contains(other.rootIdentifier);
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + datastoreType.hashCode();
+        result = prime * result + rootIdentifier.hashCode();
+        return result;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof DataTreeIdentifier)) {
+            return false;
+        }
+        DataTreeIdentifier other = (DataTreeIdentifier) obj;
+        if (datastoreType != other.datastoreType) {
+            return false;
+        }
+        return rootIdentifier.equals(other.rootIdentifier);
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataTreeModification.java b/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataTreeModification.java
new file mode 100644 (file)
index 0000000..aac51a6
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html.
+ */
+
+package org.opendaylight.controller.md.sal.binding.api;
+
+import javax.annotation.Nonnull;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+
+/**
+ * Represent root of modification.
+ *
+ * @author Tony Tkacik &lt;ttkacik@cisco.com&gt;
+ *
+ */
+public interface DataTreeModification {
+
+    /**
+     * Get the modification root path. This is the path of the root node
+     * relative to the root of InstanceIdentifier namespace.
+     *
+     * @return absolute path of the root node
+     */
+    @Nonnull DataTreeIdentifier getRootPath();
+
+    /**
+     * Get the modification root node.
+     *
+     * @return modification root node
+     */
+    @Nonnull DataObjectModification<? extends DataObject> getRootNode();
+
+}
index 06f3afc..681132e 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
 import akka.dispatch.OnComplete;
+import com.google.common.annotations.VisibleForTesting;
 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
@@ -25,7 +26,6 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
 import scala.concurrent.Future;
 
 /**
@@ -93,7 +93,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
     public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) {
 
         dataChangeListenerActor = actorContext.getActorSystem().actorOf(
-                DataChangeListener.props(listener));
+                DataChangeListener.props(listener).withDispatcher(actorContext.getNotificationDispatcherPath()));
 
         Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
         findFuture.onComplete(new OnComplete<ActorRef>() {
@@ -109,7 +109,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
                     doRegistration(shard, path, scope);
                 }
             }
-        }, actorContext.getActorSystem().dispatcher());
+        }, actorContext.getClientDispatcher());
     }
 
     private void doRegistration(ActorRef shard, final YangInstanceIdentifier path,
@@ -131,7 +131,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
                             reply.getListenerRegistrationPath()));
                 }
             }
-        }, actorContext.getActorSystem().dispatcher());
+        }, actorContext.getClientDispatcher());
     }
 
     @Override
index 107c959..afbdbe1 100644 (file)
@@ -14,6 +14,7 @@ import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
@@ -52,9 +53,12 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
 
         LOG.info("Creating ShardManager : {}", shardManagerId);
 
+        String shardDispatcher =
+                new Dispatchers(actorSystem.dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
+
         actorContext = new ActorContext(actorSystem, actorSystem.actorOf(
                 ShardManager.props(cluster, configuration, datastoreContext)
-                    .withMailbox(ActorContext.MAILBOX), shardManagerId ),
+                        .withDispatcher(shardDispatcher).withMailbox(ActorContext.MAILBOX), shardManagerId ),
                 cluster, configuration, datastoreContext);
     }
 
index 21d74a6..0672023 100644 (file)
@@ -60,10 +60,13 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
@@ -71,6 +74,7 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
@@ -125,6 +129,8 @@ public class Shard extends RaftActor {
 
     private final Optional<ActorRef> roleChangeNotifier;
 
+    private final MessageTracker appendEntriesReplyTracker;
+
     /**
      * Coordinates persistence recovery on startup.
      */
@@ -133,6 +139,8 @@ public class Shard extends RaftActor {
 
     private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
 
+    private final String txnDispatcherPath;
+
     protected Shard(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
             final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
         super(name.toString(), mapPeerAddresses(peerAddresses),
@@ -141,7 +149,11 @@ public class Shard extends RaftActor {
         this.name = name;
         this.datastoreContext = datastoreContext;
         this.schemaContext = schemaContext;
-        this.dataPersistenceProvider = (datastoreContext.isPersistent()) ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
+        this.dataPersistenceProvider = (datastoreContext.isPersistent())
+                ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
+        this.txnDispatcherPath = new Dispatchers(context().system().dispatchers())
+                .getDispatcherPath(Dispatchers.DispatcherType.Transaction);
+
 
         LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
 
@@ -168,6 +180,9 @@ public class Shard extends RaftActor {
 
         // create a notifier actor for each cluster member
         roleChangeNotifier = createRoleChangeNotifier(name.toString());
+
+        appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
+                getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
     }
 
     private static Map<String, String> mapPeerAddresses(
@@ -224,35 +239,50 @@ public class Shard extends RaftActor {
             onRecoveryComplete();
         } else {
             super.onReceiveRecover(message);
+            if(LOG.isTraceEnabled()) {
+                appendEntriesReplyTracker.begin();
+            }
         }
     }
 
     @Override
     public void onReceiveCommand(final Object message) throws Exception {
-        if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
-            handleCreateTransaction(message);
-        } else if(message instanceof ForwardedReadyTransaction) {
-            handleForwardedReadyTransaction((ForwardedReadyTransaction)message);
-        } else if(message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
-            handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
-        } else if(message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
-            handleCommitTransaction(CommitTransaction.fromSerializable(message));
-        } else if(message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
-            handleAbortTransaction(AbortTransaction.fromSerializable(message));
-        } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)){
-            closeTransactionChain(CloseTransactionChain.fromSerializable(message));
-        } else if (message instanceof RegisterChangeListener) {
-            registerChangeListener((RegisterChangeListener) message);
-        } else if (message instanceof UpdateSchemaContext) {
-            updateSchemaContext((UpdateSchemaContext) message);
-        } else if (message instanceof PeerAddressResolved) {
-            PeerAddressResolved resolved = (PeerAddressResolved) message;
-            setPeerAddress(resolved.getPeerId().toString(),
-                resolved.getPeerAddress());
-        } else if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
-            handleTransactionCommitTimeoutCheck();
-        } else {
-            super.onReceiveCommand(message);
+
+        MessageTracker.Context context = appendEntriesReplyTracker.received(message);
+
+        if(context.error().isPresent()){
+            LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
+                    context.error());
+        }
+
+        try {
+            if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
+                handleCreateTransaction(message);
+            } else if (message instanceof ForwardedReadyTransaction) {
+                handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
+            } else if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
+                handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
+            } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
+                handleCommitTransaction(CommitTransaction.fromSerializable(message));
+            } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
+                handleAbortTransaction(AbortTransaction.fromSerializable(message));
+            } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)) {
+                closeTransactionChain(CloseTransactionChain.fromSerializable(message));
+            } else if (message instanceof RegisterChangeListener) {
+                registerChangeListener((RegisterChangeListener) message);
+            } else if (message instanceof UpdateSchemaContext) {
+                updateSchemaContext((UpdateSchemaContext) message);
+            } else if (message instanceof PeerAddressResolved) {
+                PeerAddressResolved resolved = (PeerAddressResolved) message;
+                setPeerAddress(resolved.getPeerId().toString(),
+                        resolved.getPeerAddress());
+            } else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
+                handleTransactionCommitTimeoutCheck();
+            } else {
+                super.onReceiveCommand(message);
+            }
+        } finally {
+            context.done();
         }
     }
 
@@ -493,32 +523,19 @@ public class Shard extends RaftActor {
 
             shardMBean.incrementReadOnlyTransactionCount();
 
-            return getContext().actorOf(
-                ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(),
-                        schemaContext,datastoreContext, shardMBean,
-                        transactionId.getRemoteTransactionId(), clientVersion),
-                        transactionId.toString());
+            return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion);
 
         } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
 
             shardMBean.incrementReadWriteTransactionCount();
 
-            return getContext().actorOf(
-                ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(),
-                        schemaContext, datastoreContext, shardMBean,
-                        transactionId.getRemoteTransactionId(), clientVersion),
-                        transactionId.toString());
-
+            return createShardTransaction(factory.newReadWriteTransaction(), transactionId, clientVersion);
 
         } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
 
             shardMBean.incrementWriteOnlyTransactionCount();
 
-            return getContext().actorOf(
-                ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(),
-                        schemaContext, datastoreContext, shardMBean,
-                        transactionId.getRemoteTransactionId(), clientVersion),
-                        transactionId.toString());
+            return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion);
         } else {
             throw new IllegalArgumentException(
                 "Shard="+name + ":CreateTransaction message has unidentified transaction type="
@@ -526,6 +543,17 @@ public class Shard extends RaftActor {
         }
     }
 
+    private ActorRef createShardTransaction(DOMStoreTransaction transaction, ShardTransactionIdentifier transactionId,
+                                            short clientVersion){
+        return getContext().actorOf(
+                ShardTransaction.props(transaction, getSelf(),
+                        schemaContext, datastoreContext, shardMBean,
+                        transactionId.getRemoteTransactionId(), clientVersion)
+                        .withDispatcher(txnDispatcherPath),
+                transactionId.toString());
+
+    }
+
     private void createTransaction(CreateTransaction createTransaction) {
         try {
             ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),
index d52965e..426a2e0 100644 (file)
@@ -50,6 +50,7 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
@@ -87,6 +88,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final Configuration configuration;
 
+    private final String shardDispatcherPath;
+
     private ShardManagerInfoMBean mBean;
 
     private final DatastoreContext datastoreContext;
@@ -105,6 +108,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         this.datastoreContext = datastoreContext;
         this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
         this.type = datastoreContext.getDataStoreType();
+        this.shardDispatcherPath =
+                new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
 
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
@@ -283,8 +288,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                     for (ShardInformation info : localShards.values()) {
                         if (info.getActor() == null) {
                             info.setActor(getContext().actorOf(Shard.props(info.getShardId(),
-                                            info.getPeerAddresses(), datastoreContext, schemaContext),
-                                    info.getShardId().toString()));
+                                            info.getPeerAddresses(), datastoreContext, schemaContext)
+                                            .withDispatcher(shardDispatcherPath), info.getShardId().toString()));
                         } else {
                             info.getActor().tell(message, getSelf());
                         }
index c51ea80..4445b14 100644 (file)
@@ -71,7 +71,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
     private Future<Void> buildCohortList() {
 
         Future<Iterable<ActorSelection>> combinedFutures = Futures.sequence(cohortFutures,
-                actorContext.getActorSystem().dispatcher());
+                actorContext.getClientDispatcher());
 
         return combinedFutures.transform(new AbstractFunction1<Iterable<ActorSelection>, Void>() {
             @Override
@@ -83,7 +83,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                 }
                 return null;
             }
-        }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
+        }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
     }
 
     @Override
@@ -111,7 +111,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                     finishCanCommit(returnFuture);
                 }
             }
-        }, actorContext.getActorSystem().dispatcher());
+        }, actorContext.getClientDispatcher());
 
         return returnFuture;
     }
@@ -158,7 +158,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                 }
                 returnFuture.set(Boolean.valueOf(result));
             }
-        }, actorContext.getActorSystem().dispatcher());
+        }, actorContext.getClientDispatcher());
     }
 
     private Future<Iterable<Object>> invokeCohorts(Object message) {
@@ -170,7 +170,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
             futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout()));
         }
 
-        return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
+        return Futures.sequence(futureList, actorContext.getClientDispatcher());
     }
 
     @Override
@@ -239,7 +239,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                                 propagateException, returnFuture, callback);
                     }
                 }
-            }, actorContext.getActorSystem().dispatcher());
+            }, actorContext.getClientDispatcher());
         }
 
         return returnFuture;
@@ -304,7 +304,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                     callback.success();
                 }
             }
-        }, actorContext.getActorSystem().dispatcher());
+        }, actorContext.getClientDispatcher());
     }
 
     @VisibleForTesting
index 530a36c..03d1b3a 100644 (file)
@@ -60,7 +60,7 @@ final class TransactionContextImpl extends AbstractTransactionContext {
     }
 
     private Future<Object> completeOperation(Future<Object> operationFuture){
-        operationFuture.onComplete(this.operationCompleter, actorContext.getActorSystem().dispatcher());
+        operationFuture.onComplete(this.operationCompleter, actorContext.getClientDispatcher());
         return operationFuture;
     }
 
@@ -105,7 +105,7 @@ final class TransactionContextImpl extends AbstractTransactionContext {
         futureList.add(replyFuture);
 
         Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
-                actorContext.getActorSystem().dispatcher());
+                actorContext.getClientDispatcher());
 
         // Transform the combined Future into a Future that returns the cohort actor path from
         // the ReadyTransactionReply. That's the end result of the ready operation.
@@ -152,7 +152,7 @@ final class TransactionContextImpl extends AbstractTransactionContext {
                             serializedReadyReply.getClass()));
                 }
             }
-        }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
+        }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
     }
 
     @Override
@@ -198,7 +198,7 @@ final class TransactionContextImpl extends AbstractTransactionContext {
 
             Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
                     Lists.newArrayList(recordedOperationFutures),
-                    actorContext.getActorSystem().dispatcher());
+                    actorContext.getClientDispatcher());
 
             OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
                 @Override
@@ -216,7 +216,7 @@ final class TransactionContextImpl extends AbstractTransactionContext {
                 }
             };
 
-            combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+            combinedFutures.onComplete(onComplete, actorContext.getClientDispatcher());
         }
 
     }
@@ -255,7 +255,7 @@ final class TransactionContextImpl extends AbstractTransactionContext {
 
         Future<Object> readFuture = executeOperationAsync(new ReadData(path));
 
-        readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+        readFuture.onComplete(onComplete, actorContext.getClientDispatcher());
     }
 
     @Override
@@ -280,7 +280,7 @@ final class TransactionContextImpl extends AbstractTransactionContext {
 
             Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
                     Lists.newArrayList(recordedOperationFutures),
-                    actorContext.getActorSystem().dispatcher());
+                    actorContext.getClientDispatcher());
             OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
                 @Override
                 public void onComplete(Throwable failure, Iterable<Object> notUsed)
@@ -297,7 +297,7 @@ final class TransactionContextImpl extends AbstractTransactionContext {
                 }
             };
 
-            combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+            combinedFutures.onComplete(onComplete, actorContext.getClientDispatcher());
         }
     }
 
@@ -332,6 +332,6 @@ final class TransactionContextImpl extends AbstractTransactionContext {
 
         Future<Object> future = executeOperationAsync(new DataExists(path));
 
-        future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+        future.onComplete(onComplete, actorContext.getClientDispatcher());
     }
 }
index 5bc5344..d63ec80 100644 (file)
@@ -484,7 +484,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                         newTxFutureCallback.setPrimaryShard(primaryShard);
                     }
                 }
-            }, actorContext.getActorSystem().dispatcher());
+            }, actorContext.getClientDispatcher());
         }
 
         return txFutureCallback;
@@ -601,7 +601,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                             TransactionProxy.this.transactionType.ordinal(),
                             getTransactionChainId()).toSerializable());
 
-            createTxFuture.onComplete(this, actorContext.getActorSystem().dispatcher());
+            createTxFuture.onComplete(this, actorContext.getClientDispatcher());
         }
 
         @Override
@@ -621,7 +621,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                                 public void run() {
                                     tryCreateTransaction();
                                 }
-                            }, actorContext.getActorSystem().dispatcher());
+                            }, actorContext.getClientDispatcher());
                     return;
                 }
             }
index cb06c89..26e6318 100644 (file)
@@ -47,6 +47,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
@@ -93,6 +94,7 @@ public class ActorContext {
     private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
     private final int transactionOutstandingOperationLimit;
     private final Timeout transactionCommitOperationTimeout;
+    private final Dispatchers dispatchers;
 
     private volatile SchemaContext schemaContext;
 
@@ -111,6 +113,7 @@ public class ActorContext {
         this.configuration = configuration;
         this.datastoreContext = datastoreContext;
         this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
+        this.dispatchers = new Dispatchers(actorSystem.dispatchers());
 
         operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
         operationTimeout = new Timeout(operationDuration);
@@ -127,6 +130,7 @@ public class ActorContext {
 
         transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
         jmxReporter.start();
+
     }
 
     public DatastoreContext getDatastoreContext() {
@@ -200,7 +204,7 @@ public class ActorContext {
                 throw new UnknownMessageException(String.format(
                         "FindPrimary returned unkown response: %s", response));
             }
-        }, FIND_PRIMARY_FAILURE_TRANSFORMER, getActorSystem().dispatcher());
+        }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
     }
 
     /**
@@ -251,7 +255,7 @@ public class ActorContext {
                 throw new UnknownMessageException(String.format(
                         "FindLocalShard returned unkown response: %s", response));
             }
-        }, getActorSystem().dispatcher());
+        }, getClientDispatcher());
     }
 
     private String findPrimaryPathOrNull(String shardName) {
@@ -514,5 +518,17 @@ public class ActorContext {
         return transactionCommitOperationTimeout;
     }
 
+    /**
+     * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
+     * code on the datastore
+     * @return
+     */
+    public ExecutionContext getClientDispatcher() {
+        return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
+    }
+
+    public String getNotificationDispatcherPath(){
+        return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
+    }
 
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/Dispatchers.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/Dispatchers.java
new file mode 100644 (file)
index 0000000..8de8a9d
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import com.google.common.base.Preconditions;
+import scala.concurrent.ExecutionContext;
+
+public class Dispatchers {
+    public static final String DEFAULT_DISPATCHER_PATH = "akka.actor.default-dispatcher";
+    public static final String CLIENT_DISPATCHER_PATH = "client-dispatcher";
+    public static final String TXN_DISPATCHER_PATH = "txn-dispatcher";
+    public static final String SHARD_DISPATCHER_PATH = "shard-dispatcher";
+    public static final String NOTIFICATION_DISPATCHER_PATH = "notification-dispatcher";
+
+    private final akka.dispatch.Dispatchers dispatchers;
+
+    public static enum DispatcherType {
+        Client(CLIENT_DISPATCHER_PATH),
+        Transaction(TXN_DISPATCHER_PATH),
+        Shard(SHARD_DISPATCHER_PATH),
+        Notification(NOTIFICATION_DISPATCHER_PATH);
+
+        private final String path;
+        private DispatcherType(String path){
+            this.path = path;
+        }
+        private String path(akka.dispatch.Dispatchers dispatchers){
+            if(dispatchers.hasDispatcher(path)){
+                return path;
+            }
+            return DEFAULT_DISPATCHER_PATH;
+        }
+
+        private ExecutionContext dispatcher(akka.dispatch.Dispatchers dispatchers){
+            if(dispatchers.hasDispatcher(path)){
+                return dispatchers.lookup(path);
+            }
+            return dispatchers.defaultGlobalDispatcher();
+        }
+    }
+
+    public Dispatchers(akka.dispatch.Dispatchers dispatchers){
+        Preconditions.checkNotNull(dispatchers, "dispatchers should not be null");
+        this.dispatchers = dispatchers;
+    }
+
+    public ExecutionContext getDispatcher(DispatcherType dispatcherType){
+        return dispatcherType.dispatcher(this.dispatchers);
+    }
+
+    public String getDispatcherPath(DispatcherType dispatcherType){
+        return dispatcherType.path(this.dispatchers);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/MessageTracker.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/MessageTracker.java
new file mode 100644 (file)
index 0000000..2757d2f
--- /dev/null
@@ -0,0 +1,261 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * MessageTracker is a diagnostic utility class to be used for figuring out why a certain message which was
+ * expected to arrive in a given time interval does not arrive. It attempts to keep track of all the messages that
+ * received between the arrival of two instances of the same message and the amount of time it took to process each
+ * of those messages.
+ * <br/>
+ * Usage of the API is as follows,
+ * <pre>
+ *
+ *      // Track the Foo class, Here we expect to see a message of type Foo come in every 10 millis
+ *     MessageTracker tracker = new MessageTracker(Foo.class, 10);
+ *
+ *     // Begin the tracking process. If this is not called then calling received and done on the resultant Context
+ *     // will do nothing
+ *     tracker.begin();
+ *
+ *     .....
+ *
+ *     MessageTracker.Context context = tracker.received(message);
+ *
+ *     if(context.error().isPresent()){
+ *         LOG.error("{}", context.error().get());
+ *     }
+ *
+ *     // Some custom processing
+ *     process(message);
+ *
+ *     context.done();
+ *
+ * </pre>
+ */
+public class MessageTracker {
+
+    private static final Context NO_OP_CONTEXT = new NoOpContext();
+
+    private final Class expectedMessageClass;
+
+    private final long expectedArrivalInterval;
+
+    private final List<MessageProcessingTime> messagesSinceLastExpectedMessage = new LinkedList<>();
+
+    private Stopwatch expectedMessageWatch;
+
+    private boolean enabled = false;
+
+    private Object lastExpectedMessage;
+
+    private Object currentMessage;
+
+    private final CurrentMessageContext currentMessageContext = new CurrentMessageContext();
+
+    /**
+     *
+     * @param expectedMessageClass The class of the message to track
+     * @param expectedArrivalIntervalInMillis The expected arrival interval between two instances of the expected
+     *                                        message
+     */
+    public MessageTracker(Class expectedMessageClass, long expectedArrivalIntervalInMillis){
+        this.expectedMessageClass = expectedMessageClass;
+        this.expectedArrivalInterval = expectedArrivalIntervalInMillis;
+    }
+
+    public void begin(){
+        if(enabled) {
+            return;
+        }
+        enabled = true;
+        expectedMessageWatch = Stopwatch.createStarted();
+    }
+
+    public Context received(Object message){
+        if(!enabled) {
+            return NO_OP_CONTEXT;
+        }
+        this.currentMessage = message;
+        if(expectedMessageClass.isInstance(message)){
+            long actualElapsedTime = expectedMessageWatch.elapsed(TimeUnit.MILLISECONDS);
+            if(actualElapsedTime > expectedArrivalInterval){
+                return new ErrorContext(message, Optional.of(new FailedExpectation(lastExpectedMessage, message,
+                        ImmutableList.copyOf(messagesSinceLastExpectedMessage), expectedArrivalInterval,
+                        actualElapsedTime)));
+            }
+            this.lastExpectedMessage = message;
+            this.messagesSinceLastExpectedMessage.clear();
+        }
+
+        currentMessageContext.reset();
+        return currentMessageContext;
+    }
+
+    private void processed(Object message, long messageElapseTimeInNanos){
+        if(!enabled) {
+            return;
+        }
+        if(!expectedMessageClass.isInstance(message)){
+            this.messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(), messageElapseTimeInNanos));
+        }
+    }
+
+    public List<MessageProcessingTime> getMessagesSinceLastExpectedMessage(){
+        return ImmutableList.copyOf(this.messagesSinceLastExpectedMessage);
+    }
+
+    public static class MessageProcessingTime {
+        private final Class messageClass;
+        private final long elapsedTimeInNanos;
+
+        MessageProcessingTime(Class messageClass, long elapsedTimeInNanos){
+            this.messageClass = messageClass;
+            this.elapsedTimeInNanos = elapsedTimeInNanos;
+        }
+
+        @Override
+        public String toString() {
+            return "MessageProcessingTime{" +
+                    "messageClass=" + messageClass.getSimpleName() +
+                    ", elapsedTimeInMillis=" + TimeUnit.NANOSECONDS.toMillis(elapsedTimeInNanos) +
+                    '}';
+        }
+
+        public Class getMessageClass() {
+            return messageClass;
+        }
+
+        public long getElapsedTimeInNanos() {
+            return elapsedTimeInNanos;
+        }
+    }
+
+    public interface Error {
+        Object getLastExpectedMessage();
+        Object getCurrentExpectedMessage();
+        List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage();
+    }
+
+    private class FailedExpectation implements Error {
+
+        private final Object lastExpectedMessage;
+        private final Object currentExpectedMessage;
+        private final List<MessageProcessingTime> messagesSinceLastExpectedMessage;
+        private final long expectedTimeInMillis;
+        private final long actualTimeInMillis;
+
+        public FailedExpectation(Object lastExpectedMessage, Object message, List<MessageProcessingTime> messagesSinceLastExpectedMessage, long expectedTimeInMillis, long actualTimeInMillis) {
+            this.lastExpectedMessage = lastExpectedMessage;
+            this.currentExpectedMessage = message;
+            this.messagesSinceLastExpectedMessage = messagesSinceLastExpectedMessage;
+            this.expectedTimeInMillis = expectedTimeInMillis;
+            this.actualTimeInMillis = actualTimeInMillis;
+        }
+
+        public Object getLastExpectedMessage() {
+            return lastExpectedMessage;
+        }
+
+        public Object getCurrentExpectedMessage() {
+            return currentExpectedMessage;
+        }
+
+        public List<MessageProcessingTime>  getMessageProcessingTimesSinceLastExpectedMessage() {
+            return messagesSinceLastExpectedMessage;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            builder.append("\n> Last Expected Message = " + lastExpectedMessage);
+            builder.append("\n> Current Expected Message = " + currentExpectedMessage);
+            builder.append("\n> Expected time in between messages = " + expectedTimeInMillis);
+            builder.append("\n> Actual time in between messages = " + actualTimeInMillis);
+            for (MessageProcessingTime time : messagesSinceLastExpectedMessage) {
+                builder.append("\n\t> ").append(time.toString());
+            }
+            return builder.toString();
+        }
+
+    }
+
+    public interface Context {
+        Context done();
+        Optional<? extends Error> error();
+    }
+
+    private static class NoOpContext implements Context {
+
+        @Override
+        public Context done() {
+            return this;
+        }
+
+        @Override
+        public Optional<Error> error() {
+            return Optional.absent();
+        }
+    }
+
+    private class CurrentMessageContext implements Context {
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        boolean done = true;
+
+        public void reset(){
+            Preconditions.checkState(done);
+            done = false;
+            stopwatch.reset().start();
+        }
+
+        @Override
+        public Context done() {
+            processed(currentMessage, stopwatch.elapsed(TimeUnit.NANOSECONDS));
+            done = true;
+            return this;
+        }
+
+        @Override
+        public Optional<? extends Error> error() {
+            return Optional.absent();
+        }
+    }
+
+    private class ErrorContext implements Context {
+        Object message;
+        private final Optional<? extends Error> error;
+        Stopwatch stopwatch;
+
+        ErrorContext(Object message, Optional<? extends Error> error){
+            this.message = message;
+            this.error = error;
+            this.stopwatch = Stopwatch.createStarted();
+        }
+
+        @Override
+        public Context done(){
+            processed(message, this.stopwatch.elapsed(TimeUnit.NANOSECONDS));
+            this.stopwatch.stop();
+            return this;
+        }
+
+        @Override
+        public Optional<? extends Error> error() {
+            return error;
+        }
+    }
+}
index 58aec30..f6c8f07 100644 (file)
@@ -36,6 +36,7 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
@@ -193,10 +194,12 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest {
             doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class));
             ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(
                     MoreExecutors.sameThreadExecutor());
-            doReturn(executor).when(mockActorSystem).dispatcher();
+
 
             ActorContext actorContext = mock(ActorContext.class);
 
+            doReturn(executor).when(actorContext).getClientDispatcher();
+
             String shardName = "shard-1";
             final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
                     shardName, actorContext, mockListener);
@@ -227,7 +230,9 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest {
                     shardName, actorContext, mockListener);
 
             doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext();
+            doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
             doReturn(getSystem()).when(actorContext).getActorSystem();
+            doReturn(Dispatchers.DEFAULT_DISPATCHER_PATH).when(actorContext).getNotificationDispatcherPath();
             doReturn(getSystem().actorSelection(getRef().path())).
                     when(actorContext).actorSelection(getRef().path());
             doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
index b013515..0a2a0d1 100644 (file)
@@ -66,6 +66,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         MockitoAnnotations.initMocks(this);
 
         doReturn(getSystem()).when(actorContext).getActorSystem();
+        doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
         doReturn(datastoreContext).when(actorContext).getDatastoreContext();
         doReturn(100).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
         doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
index 7ce41a4..fa2f918 100644 (file)
@@ -129,6 +129,7 @@ public class TransactionProxyTest {
         DatastoreContext dataStoreContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).build();
 
         doReturn(getSystem()).when(mockActorContext).getActorSystem();
+        doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
index eae46da..3c6a0ce 100644 (file)
@@ -1,17 +1,20 @@
 package org.opendaylight.controller.cluster.datastore.utils;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
+import com.typesafe.config.ConfigFactory;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang.time.StopWatch;
 import org.junit.Test;
@@ -299,4 +302,41 @@ public class ActorContextTest extends AbstractActorTest{
 
         assertTrue("did not take as much time as expected", watch.getTime() > 1000);
     }
+
+    @Test
+    public void testClientDispatcherIsGlobalDispatcher(){
+
+        DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+        doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+        doReturn("config").when(mockDataStoreContext).getDataStoreType();
+
+        ActorContext actorContext =
+                new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+                        mock(Configuration.class), mockDataStoreContext);
+
+        assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
+
+    }
+
+    @Test
+    public void testClientDispatcherIsNotGlobalDispatcher(){
+
+        DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+        doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+        doReturn("config").when(mockDataStoreContext).getDataStoreType();
+
+        ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
+
+        ActorContext actorContext =
+                new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
+                        mock(Configuration.class), mockDataStoreContext);
+
+        assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
+
+        actorSystem.shutdown();
+
+    }
+
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/DispatchersTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/DispatchersTest.java
new file mode 100644 (file)
index 0000000..85a0cac
--- /dev/null
@@ -0,0 +1,81 @@
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import akka.dispatch.MessageDispatcher;
+import org.junit.Test;
+
+public class DispatchersTest {
+
+    @Test
+    public void testGetDefaultDispatcherPath(){
+        akka.dispatch.Dispatchers mockDispatchers = mock(akka.dispatch.Dispatchers.class);
+        doReturn(false).when(mockDispatchers).hasDispatcher(anyString());
+        Dispatchers dispatchers = new Dispatchers(mockDispatchers);
+
+        for(Dispatchers.DispatcherType type : Dispatchers.DispatcherType.values()) {
+            assertEquals(Dispatchers.DEFAULT_DISPATCHER_PATH,
+                    dispatchers.getDispatcherPath(type));
+        }
+
+    }
+
+    @Test
+    public void testGetDefaultDispatcher(){
+        akka.dispatch.Dispatchers mockDispatchers = mock(akka.dispatch.Dispatchers.class);
+        MessageDispatcher mockGlobalDispatcher = mock(MessageDispatcher.class);
+        doReturn(false).when(mockDispatchers).hasDispatcher(anyString());
+        doReturn(mockGlobalDispatcher).when(mockDispatchers).defaultGlobalDispatcher();
+        Dispatchers dispatchers = new Dispatchers(mockDispatchers);
+
+        for(Dispatchers.DispatcherType type : Dispatchers.DispatcherType.values()) {
+            assertEquals(mockGlobalDispatcher,
+                    dispatchers.getDispatcher(type));
+        }
+
+    }
+
+    @Test
+    public void testGetDispatcherPath(){
+        akka.dispatch.Dispatchers mockDispatchers = mock(akka.dispatch.Dispatchers.class);
+        doReturn(true).when(mockDispatchers).hasDispatcher(anyString());
+        Dispatchers dispatchers = new Dispatchers(mockDispatchers);
+
+        assertEquals(Dispatchers.CLIENT_DISPATCHER_PATH,
+                dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Client));
+
+        assertEquals(Dispatchers.TXN_DISPATCHER_PATH,
+                dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Transaction));
+
+        assertEquals(Dispatchers.SHARD_DISPATCHER_PATH,
+                dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Shard));
+
+        assertEquals(Dispatchers.NOTIFICATION_DISPATCHER_PATH,
+                dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification));
+
+    }
+
+    @Test
+    public void testGetDispatcher(){
+        akka.dispatch.Dispatchers mockDispatchers = mock(akka.dispatch.Dispatchers.class);
+        MessageDispatcher mockDispatcher = mock(MessageDispatcher.class);
+        doReturn(true).when(mockDispatchers).hasDispatcher(anyString());
+        doReturn(mockDispatcher).when(mockDispatchers).lookup(anyString());
+        Dispatchers dispatchers = new Dispatchers(mockDispatchers);
+
+        assertEquals(Dispatchers.CLIENT_DISPATCHER_PATH,
+                dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Client));
+
+        assertEquals(Dispatchers.TXN_DISPATCHER_PATH,
+                dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Transaction));
+
+        assertEquals(Dispatchers.SHARD_DISPATCHER_PATH,
+                dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Shard));
+
+        assertEquals(Dispatchers.NOTIFICATION_DISPATCHER_PATH,
+                dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification));
+
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageTrackerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageTrackerTest.java
new file mode 100644 (file)
index 0000000..a125b49
--- /dev/null
@@ -0,0 +1,188 @@
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessageTrackerTest {
+
+    private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+    private class Foo {}
+
+    @Test
+    public void testNoTracking(){
+        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+
+        MessageTracker.Context context1 = messageTracker.received(new Foo());
+        context1.done();
+
+        Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+
+        MessageTracker.Context context2 = messageTracker.received(new Foo());
+        context2.done();
+
+    }
+
+    @Test
+    public void testFailedExpectationOnTracking(){
+        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+        messageTracker.begin();
+
+        MessageTracker.Context context1 = messageTracker.received(new Foo());
+        context1.done();
+
+        Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+
+        MessageTracker.Context context2 = messageTracker.received(new Foo());
+        Assert.assertEquals(true, context2.error().isPresent());
+        Assert.assertEquals(0, context2.error().get().getMessageProcessingTimesSinceLastExpectedMessage().size());
+
+    }
+
+    @Test
+    public void testFailedExpectationOnTrackingWithMessagesInBetween(){
+        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+        messageTracker.begin();
+
+        MessageTracker.Context context1 = messageTracker.received(new Foo());
+        context1.done();
+
+        messageTracker.received("A").done();
+        messageTracker.received(Long.valueOf(10)).done();
+        MessageTracker.Context c = messageTracker.received(Integer.valueOf(100));
+
+        Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+
+        c.done();
+
+        MessageTracker.Context context2 = messageTracker.received(new Foo());
+
+        Assert.assertEquals(true, context2.error().isPresent());
+
+        MessageTracker.Error error = context2.error().get();
+
+        List<MessageTracker.MessageProcessingTime> messageProcessingTimes =
+                error.getMessageProcessingTimesSinceLastExpectedMessage();
+
+        Assert.assertEquals(3, messageProcessingTimes.size());
+
+        Assert.assertEquals(String.class, messageProcessingTimes.get(0).getMessageClass());
+        Assert.assertEquals(Long.class, messageProcessingTimes.get(1).getMessageClass());
+        Assert.assertEquals(Integer.class, messageProcessingTimes.get(2).getMessageClass());
+        Assert.assertTrue(messageProcessingTimes.get(2).getElapsedTimeInNanos() > TimeUnit.MILLISECONDS.toNanos(10));
+        Assert.assertEquals(Foo.class, error.getLastExpectedMessage().getClass());
+        Assert.assertEquals(Foo.class, error.getCurrentExpectedMessage().getClass());
+
+        LOG.error("An error occurred : {}" , error);
+
+    }
+
+
+    @Test
+    public void testMetExpectationOnTracking(){
+        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+        messageTracker.begin();
+
+        MessageTracker.Context context1 = messageTracker.received(new Foo());
+        context1.done();
+
+        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
+
+        MessageTracker.Context context2 = messageTracker.received(new Foo());
+        Assert.assertEquals(false, context2.error().isPresent());
+
+    }
+
+    @Test
+    public void testIllegalStateExceptionWhenDoneIsNotCalledWhileTracking(){
+        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+        messageTracker.begin();
+
+        messageTracker.received(new Foo());
+
+        try {
+            messageTracker.received(new Foo());
+            fail("Expected an IllegalStateException");
+        } catch (IllegalStateException e){
+
+        }
+    }
+
+    @Test
+    public void testNoIllegalStateExceptionWhenDoneIsNotCalledWhileNotTracking(){
+        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+
+        messageTracker.received(new Foo());
+        messageTracker.received(new Foo());
+    }
+
+    @Test
+    public void testDelayInFirstExpectedMessageArrival(){
+
+        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+        messageTracker.begin();
+
+        Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+
+        MessageTracker.Context context = messageTracker.received(new Foo());
+
+        Assert.assertEquals(true, context.error().isPresent());
+
+        MessageTracker.Error error = context.error().get();
+
+        Assert.assertEquals(null, error.getLastExpectedMessage());
+        Assert.assertEquals(Foo.class, error.getCurrentExpectedMessage().getClass());
+
+        String errorString = error.toString();
+        Assert.assertTrue(errorString.contains("Last Expected Message = null"));
+
+        LOG.error("An error occurred : {}", error);
+    }
+
+    @Test
+    public void testCallingBeginDoesNotResetWatch(){
+        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+        messageTracker.begin();
+
+        Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+
+        messageTracker.begin();
+
+        MessageTracker.Context context = messageTracker.received(new Foo());
+
+        Assert.assertEquals(true, context.error().isPresent());
+
+    }
+
+    @Test
+    public void testMessagesSinceLastExpectedMessage(){
+
+        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+        messageTracker.begin();
+
+        MessageTracker.Context context1 = messageTracker.received(Integer.valueOf(45)).done();
+
+        Assert.assertEquals(false, context1.error().isPresent());
+
+        MessageTracker.Context context2 = messageTracker.received(Long.valueOf(45)).done();
+
+        Assert.assertEquals(false, context2.error().isPresent());
+
+        List<MessageTracker.MessageProcessingTime> processingTimeList =
+                messageTracker.getMessagesSinceLastExpectedMessage();
+
+        Assert.assertEquals(2, processingTimeList.size());
+
+        assertEquals(Integer.class, processingTimeList.get(0).getMessageClass());
+        assertEquals(Long.class, processingTimeList.get(1).getMessageClass());
+
+    }
+
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application-with-custom-dispatchers.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application-with-custom-dispatchers.conf
new file mode 100644 (file)
index 0000000..32c55a6
--- /dev/null
@@ -0,0 +1,116 @@
+akka {
+    persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+    persistence.journal.plugin = "in-memory-journal"
+
+    loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"]
+
+    actor {
+         serializers {
+                  java = "akka.serialization.JavaSerializer"
+                  proto = "akka.remote.serialization.ProtobufSerializer"
+         }
+
+        serialization-bindings {
+            "org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification" = java
+            "com.google.protobuf.Message" = proto
+
+        }
+    }
+}
+
+in-memory-journal {
+    class = "org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal"
+}
+
+in-memory-snapshot-store {
+  # Class name of the plugin.
+  class = "org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore"
+  # Dispatcher for the plugin actor.
+  plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+}
+
+bounded-mailbox {
+  mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
+  mailbox-capacity = 1000
+  mailbox-push-timeout-time = 100ms
+}
+
+client-dispatcher {
+  # Dispatcher is the name of the event-based dispatcher
+  type = Dispatcher
+  # What kind of ExecutionService to use
+  executor = "fork-join-executor"
+  # Configuration for the fork join pool
+  fork-join-executor {
+    # Min number of threads to cap factor-based parallelism number to
+    parallelism-min = 2
+    # Parallelism (threads) ... ceil(available processors * factor)
+    parallelism-factor = 2.0
+    # Max number of threads to cap factor-based parallelism number to
+    parallelism-max = 10
+  }
+  # Throughput defines the maximum number of messages to be
+  # processed per actor before the thread jumps to the next actor.
+  # Set to 1 for as fair as possible.
+  throughput = 100
+}
+
+transaction-dispatcher {
+  # Dispatcher is the name of the event-based dispatcher
+  type = Dispatcher
+  # What kind of ExecutionService to use
+  executor = "fork-join-executor"
+  # Configuration for the fork join pool
+  fork-join-executor {
+    # Min number of threads to cap factor-based parallelism number to
+    parallelism-min = 2
+    # Parallelism (threads) ... ceil(available processors * factor)
+    parallelism-factor = 2.0
+    # Max number of threads to cap factor-based parallelism number to
+    parallelism-max = 10
+  }
+  # Throughput defines the maximum number of messages to be
+  # processed per actor before the thread jumps to the next actor.
+  # Set to 1 for as fair as possible.
+  throughput = 100
+}
+
+shard-dispatcher {
+  # Dispatcher is the name of the event-based dispatcher
+  type = Dispatcher
+  # What kind of ExecutionService to use
+  executor = "fork-join-executor"
+  # Configuration for the fork join pool
+  fork-join-executor {
+    # Min number of threads to cap factor-based parallelism number to
+    parallelism-min = 2
+    # Parallelism (threads) ... ceil(available processors * factor)
+    parallelism-factor = 2.0
+    # Max number of threads to cap factor-based parallelism number to
+    parallelism-max = 10
+  }
+  # Throughput defines the maximum number of messages to be
+  # processed per actor before the thread jumps to the next actor.
+  # Set to 1 for as fair as possible.
+  throughput = 100
+}
+
+notification-dispatcher {
+  # Dispatcher is the name of the event-based dispatcher
+  type = Dispatcher
+  # What kind of ExecutionService to use
+  executor = "fork-join-executor"
+  # Configuration for the fork join pool
+  fork-join-executor {
+    # Min number of threads to cap factor-based parallelism number to
+    parallelism-min = 2
+    # Parallelism (threads) ... ceil(available processors * factor)
+    parallelism-factor = 2.0
+    # Max number of threads to cap factor-based parallelism number to
+    parallelism-max = 10
+  }
+  # Throughput defines the maximum number of messages to be
+  # processed per actor before the thread jumps to the next actor.
+  # Set to 1 for as fair as possible.
+  throughput = 100
+}
\ No newline at end of file