BUG-650: remove executor abstraction 01/11401/17
authorRobert Varga <rovarga@cisco.com>
Sat, 20 Sep 2014 10:07:05 +0000 (12:07 +0200)
committerRobert Varga <rovarga@cisco.com>
Thu, 20 Nov 2014 11:37:48 +0000 (11:37 +0000)
This patch removes sameThreadExecutor from the commit path, eliminating
associated overhead. Relevant benchmarks show improvement pretty much
across the board:

BEFORE                                                            millis        error
write100KSingleNodeWithOneInnerItemInCommitPerWriteBenchmark    2213.735       77.597
write100KSingleNodeWithOneInnerItemInOneCommitBenchmark          171.524        2.289
write10KSingleNodeWithTenInnerItemsInCommitPerWriteBenchmark     164.282        1.391
write10KSingleNodeWithTenInnerItemsInOneCommitBenchmark           14.161        0.196
write50KSingleNodeWithTwoInnerItemsInCommitPerWriteBenchmark     982.697       29.397
write50KSingleNodeWithTwoInnerItemsInOneCommitBenchmark           93.233        2.174

AFTER                                                             millis        error   delta
write100KSingleNodeWithOneInnerItemInCommitPerWriteBenchmark    2138.900       75.844   -3.4%
write100KSingleNodeWithOneInnerItemInOneCommitBenchmark          177.839        3.997   +3.5%
write10KSingleNodeWithTenInnerItemsInCommitPerWriteBenchmark     158.666        1.090   -3.5%
write10KSingleNodeWithTenInnerItemsInOneCommitBenchmark           13.022        0.105   -8.0%
write50KSingleNodeWithTwoInnerItemsInCommitPerWriteBenchmark     935.490       30.395   -4.8%
write50KSingleNodeWithTwoInnerItemsInOneCommitBenchmark           89.907        1.204   -3.6%

Furthermore it cleans up and marks FIXMEs for defunct statistics. These
will need to be replaced with implementation which does not assume
underlying implementation.

Change-Id: I01c51462a8529a2f874ecd2f9af05faba503bc58
Signed-off-by: Robert Varga <rovarga@cisco.com>
27 files changed:
opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/InMemoryBrokerWriteTransactionBenchmark.java
opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/InMemoryDataStoreWithExecutorServiceBenchmark.java [deleted file]
opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/InMemoryDataStoreWithSameThreadedExecutorBenchmark.java
opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/InMemoryDataStoreWriteTransactionBenchmark.java
opendaylight/md-sal/forwardingrules-manager/src/test/java/test/mock/util/DataBrokerTestCustomizer.java
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/test/DataBrokerTestCustomizer.java
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/util/BindingTestContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardMBeanFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModificationTest.java
opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerPerformanceTest.java
opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerTest.java
opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMTransactionChainTest.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/config/yang/inmemory_datastore_provider/InMemoryConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/config/yang/inmemory_datastore_provider/InMemoryOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStoreFactory.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/jmx/InMemoryDataStoreStats.java
opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/AbstractDataChangeListenerTest.java
opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDataStoreTest.java
opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/SchemaUpdateForTransactionTest.java
opendaylight/md-sal/statistics-manager/src/test/java/test/mock/util/DataBrokerTestCustomizer.java

index a46a6d1..276f4ec 100644 (file)
@@ -41,10 +41,8 @@ public class InMemoryBrokerWriteTransactionBenchmark extends AbstractInMemoryBro
         executor = MoreExecutors.listeningDecorator(
             MoreExecutors.getExitingExecutorService((ThreadPoolExecutor)Executors.newFixedThreadPool(1), 1L, TimeUnit.SECONDS));
 
-        InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", dsExec,
-            MoreExecutors.sameThreadExecutor());
-        InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", dsExec,
-            MoreExecutors.sameThreadExecutor());
+        InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", dsExec);
+        InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", dsExec);
         Map<LogicalDatastoreType, DOMStore> datastores = ImmutableMap.of(
             LogicalDatastoreType.OPERATIONAL, (DOMStore)operStore,
             LogicalDatastoreType.CONFIGURATION, configStore);
diff --git a/opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/InMemoryDataStoreWithExecutorServiceBenchmark.java b/opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/InMemoryDataStoreWithExecutorServiceBenchmark.java
deleted file mode 100644 (file)
index 77a4966..0000000
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.dom.store.benchmark;
-
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Fork;
-import org.openjdk.jmh.annotations.Level;
-import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.OutputTimeUnit;
-import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.TearDown;
-
-/**
- * Benchmark for testing of performance of write operations for InMemoryDataStore. The instance
- * of benchmark creates InMemoryDataStore with Data Change Listener Executor Service as BlockingBoundedFastThreadPool
- * and DOM Store Executor Service as Blocking Bounded Fast Thread Pool.
- *
- * @see org.opendaylight.yangtools.util.concurrent.SpecialExecutors
- * @see org.opendaylight.controller.md.sal.dom.store.benchmark.AbstractInMemoryDatastoreWriteTransactionBenchmark
- *
- * @author Lukas Sedlak <lsedlak@cisco.com>
- */
-@State(Scope.Thread)
-@BenchmarkMode(Mode.AverageTime)
-@OutputTimeUnit(TimeUnit.MILLISECONDS)
-@Fork(1)
-public class InMemoryDataStoreWithExecutorServiceBenchmark extends AbstractInMemoryDatastoreWriteTransactionBenchmark  {
-
-    private static final int MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE = 20;
-    private static final int MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE = 1000;
-    private static final int MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE = 5000;
-
-    @Override
-    @Setup(Level.Trial)
-    public void setUp() throws Exception {
-        final String name = "DS_BENCHMARK";
-        final ExecutorService dataChangeListenerExecutor = SpecialExecutors.newBlockingBoundedFastThreadPool(
-            MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE, MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE, name + "-DCL");
-
-        final ListeningExecutorService domStoreExecutor = MoreExecutors.listeningDecorator(SpecialExecutors.newBoundedSingleThreadExecutor(
-            MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE, "DOMStore-" + name ));
-
-        domStore = new InMemoryDOMDataStore(name, domStoreExecutor,
-            dataChangeListenerExecutor);
-        schemaContext = BenchmarkModel.createTestContext();
-        domStore.onGlobalContextUpdated(schemaContext);
-        initTestNode();
-    }
-
-    @Override
-    @TearDown
-    public void tearDown() {
-        schemaContext = null;
-        domStore = null;
-    }
-}
index 6a0cecc..1aa19b2 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.md.sal.dom.store.benchmark;
 
-import com.google.common.util.concurrent.MoreExecutors;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
@@ -44,8 +43,7 @@ public class InMemoryDataStoreWithSameThreadedExecutorBenchmark extends Abstract
         final ExecutorService dataChangeListenerExecutor = SpecialExecutors.newBlockingBoundedFastThreadPool(
             MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE, MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE, name + "-DCL");
 
-        domStore = new InMemoryDOMDataStore("SINGLE_THREADED_DS_BENCHMARK", MoreExecutors.sameThreadExecutor(),
-            dataChangeListenerExecutor);
+        domStore = new InMemoryDOMDataStore("SINGLE_THREADED_DS_BENCHMARK", dataChangeListenerExecutor);
         schemaContext = BenchmarkModel.createTestContext();
         domStore.onGlobalContextUpdated(schemaContext);
         initTestNode();
index d3dda96..d697f3c 100644 (file)
@@ -35,8 +35,7 @@ public class InMemoryDataStoreWriteTransactionBenchmark extends AbstractInMemory
 
     @Setup(Level.Trial)
     public void setUp() throws Exception {
-        domStore = new InMemoryDOMDataStore("SINGLE_THREADED_DS_BENCHMARK", MoreExecutors.sameThreadExecutor(),
-            MoreExecutors.sameThreadExecutor());
+        domStore = new InMemoryDOMDataStore("SINGLE_THREADED_DS_BENCHMARK", MoreExecutors.sameThreadExecutor());
         schemaContext = BenchmarkModel.createTestContext();
         domStore.onGlobalContextUpdated(schemaContext);
         initTestNode();
index 36ab41f..3070499 100644 (file)
@@ -56,15 +56,13 @@ public class DataBrokerTestCustomizer {
     }
 
     public DOMStore createConfigurationDatastore() {
-        InMemoryDOMDataStore store = new InMemoryDOMDataStore("CFG",
-                MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+        InMemoryDOMDataStore store = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
         schemaService.registerSchemaContextListener(store);
         return store;
     }
 
     public DOMStore createOperationalDatastore() {
-        InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER",
-                MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+        InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
         schemaService.registerSchemaContextListener(store);
         return store;
     }
index c869b1d..d99ac6f 100644 (file)
@@ -56,15 +56,13 @@ public class DataBrokerTestCustomizer {
     }
 
     public DOMStore createConfigurationDatastore() {
-        InMemoryDOMDataStore store = new InMemoryDOMDataStore("CFG",
-                MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+        InMemoryDOMDataStore store = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
         schemaService.registerSchemaContextListener(store);
         return store;
     }
 
     public DOMStore createOperationalDatastore() {
-        InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER",
-                MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+        InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
         schemaService.registerSchemaContextListener(store);
         return store;
     }
index d0a326a..7d1b65f 100644 (file)
@@ -8,7 +8,6 @@
 package org.opendaylight.controller.sal.binding.test.util;
 
 import static com.google.common.base.Preconditions.checkState;
-
 import com.google.common.annotations.Beta;
 import com.google.common.collect.ClassToInstanceMap;
 import com.google.common.collect.ImmutableClassToInstanceMap;
@@ -138,10 +137,8 @@ public class BindingTestContext implements AutoCloseable {
 
     public void startNewDomDataBroker() {
         checkState(executor != null, "Executor needs to be set");
-        InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", executor,
-                MoreExecutors.sameThreadExecutor());
-        InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", executor,
-                MoreExecutors.sameThreadExecutor());
+        InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+        InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
         newDatastores = ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
                 .put(LogicalDatastoreType.OPERATIONAL, operStore)
                 .put(LogicalDatastoreType.CONFIGURATION, configStore)
index 4b13095..d53cb48 100644 (file)
@@ -139,8 +139,8 @@ public class Shard extends RaftActor {
 
     private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
 
-    protected Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
-            DatastoreContext datastoreContext, SchemaContext schemaContext) {
+    protected Shard(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
+            final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
         super(name.toString(), mapPeerAddresses(peerAddresses),
                 Optional.of(datastoreContext.getShardRaftConfig()));
 
@@ -160,7 +160,6 @@ public class Shard extends RaftActor {
 
         shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
                 datastoreContext.getDataStoreMXBeanType());
-        shardMBean.setDataStoreExecutor(store.getDomStoreExecutor());
         shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
 
         if (isMetricsCaptureEnabled()) {
@@ -175,7 +174,7 @@ public class Shard extends RaftActor {
     }
 
     private static Map<String, String> mapPeerAddresses(
-        Map<ShardIdentifier, String> peerAddresses) {
+        final Map<ShardIdentifier, String> peerAddresses) {
         Map<String, String> map = new HashMap<>();
 
         for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
@@ -188,7 +187,7 @@ public class Shard extends RaftActor {
 
     public static Props props(final ShardIdentifier name,
         final Map<ShardIdentifier, String> peerAddresses,
-        DatastoreContext datastoreContext, SchemaContext schemaContext) {
+        final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
         Preconditions.checkNotNull(name, "name should not be null");
         Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
         Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
@@ -207,7 +206,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    public void onReceiveRecover(Object message) throws Exception {
+    public void onReceiveRecover(final Object message) throws Exception {
         if(LOG.isDebugEnabled()) {
             LOG.debug("onReceiveRecover: Received message {} from {}",
                 message.getClass().toString(),
@@ -226,7 +225,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    public void onReceiveCommand(Object message) throws Exception {
+    public void onReceiveCommand(final Object message) throws Exception {
         if(LOG.isDebugEnabled()) {
             LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender());
         }
@@ -273,7 +272,7 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void handleCommitTransaction(CommitTransaction commit) {
+    private void handleCommitTransaction(final CommitTransaction commit) {
         final String transactionID = commit.getTransactionID();
 
         LOG.debug("Committing transaction {}", transactionID);
@@ -368,7 +367,7 @@ public class Shard extends RaftActor {
         commitCoordinator.currentTransactionComplete(transactionID, true);
     }
 
-    private void handleCanCommitTransaction(CanCommitTransaction canCommit) {
+    private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
         LOG.debug("Can committing transaction {}", canCommit.getTransactionID());
         commitCoordinator.handleCanCommit(canCommit, getSender(), self());
     }
@@ -401,11 +400,11 @@ public class Shard extends RaftActor {
                 readyTransactionReply, getSelf());
     }
 
-    private void handleAbortTransaction(AbortTransaction abort) {
+    private void handleAbortTransaction(final AbortTransaction abort) {
         doAbortTransaction(abort.getTransactionID(), getSender());
     }
 
-    private void doAbortTransaction(String transactionID, final ActorRef sender) {
+    private void doAbortTransaction(final String transactionID, final ActorRef sender) {
         final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
         if(cohortEntry != null) {
             LOG.debug("Aborting transaction {}", transactionID);
@@ -420,7 +419,7 @@ public class Shard extends RaftActor {
 
             Futures.addCallback(future, new FutureCallback<Void>() {
                 @Override
-                public void onSuccess(Void v) {
+                public void onSuccess(final Void v) {
                     shardMBean.incrementAbortTransactionsCount();
 
                     if(sender != null) {
@@ -429,7 +428,7 @@ public class Shard extends RaftActor {
                 }
 
                 @Override
-                public void onFailure(Throwable t) {
+                public void onFailure(final Throwable t) {
                     LOG.error(t, "An exception happened during abort");
 
                     if(sender != null) {
@@ -440,7 +439,7 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void handleCreateTransaction(Object message) {
+    private void handleCreateTransaction(final Object message) {
         if (isLeader()) {
             createTransaction(CreateTransaction.fromSerializable(message));
         } else if (getLeader() != null) {
@@ -453,7 +452,7 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void handleReadDataReply(Object message) {
+    private void handleReadDataReply(final Object message) {
         // This must be for install snapshot. Don't want to open this up and trigger
         // deSerialization
 
@@ -467,7 +466,7 @@ public class Shard extends RaftActor {
         getSender().tell(PoisonPill.getInstance(), self());
     }
 
-    private void closeTransactionChain(CloseTransactionChain closeTransactionChain) {
+    private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
         DOMStoreTransactionChain chain =
             transactionChains.remove(closeTransactionChain.getTransactionChainId());
 
@@ -562,14 +561,14 @@ public class Shard extends RaftActor {
         return transactionActor;
     }
 
-    private void syncCommitTransaction(DOMStoreWriteTransaction transaction)
+    private void syncCommitTransaction(final DOMStoreWriteTransaction transaction)
         throws ExecutionException, InterruptedException {
         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
         commitCohort.preCommit().get();
         commitCohort.commit().get();
     }
 
-    private void commitWithNewTransaction(Modification modification) {
+    private void commitWithNewTransaction(final Modification modification) {
         DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
         modification.apply(tx);
         try {
@@ -582,18 +581,18 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void updateSchemaContext(UpdateSchemaContext message) {
+    private void updateSchemaContext(final UpdateSchemaContext message) {
         this.schemaContext = message.getSchemaContext();
         updateSchemaContext(message.getSchemaContext());
         store.onGlobalContextUpdated(message.getSchemaContext());
     }
 
     @VisibleForTesting
-    void updateSchemaContext(SchemaContext schemaContext) {
+    void updateSchemaContext(final SchemaContext schemaContext) {
         store.onGlobalContextUpdated(schemaContext);
     }
 
-    private void registerChangeListener(RegisterChangeListener registerChangeListener) {
+    private void registerChangeListener(final RegisterChangeListener registerChangeListener) {
 
         LOG.debug("registerDataChangeListener for {}", registerChangeListener.getPath());
 
@@ -621,7 +620,7 @@ public class Shard extends RaftActor {
 
     private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
                                                NormalizedNode<?, ?>>> doChangeListenerRegistration(
-            RegisterChangeListener registerChangeListener) {
+            final RegisterChangeListener registerChangeListener) {
 
         ActorSelection dataChangeListenerPath = getContext().system().actorSelection(
                 registerChangeListener.getDataChangeListenerPath());
@@ -651,7 +650,7 @@ public class Shard extends RaftActor {
 
     @Override
     protected
-    void startLogRecoveryBatch(int maxBatchSize) {
+    void startLogRecoveryBatch(final int maxBatchSize) {
         currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
 
         if(LOG.isDebugEnabled()) {
@@ -660,7 +659,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected void appendRecoveredLogEntry(Payload data) {
+    protected void appendRecoveredLogEntry(final Payload data) {
         if (data instanceof CompositeModificationPayload) {
             currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
         } else {
@@ -669,7 +668,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected void applyRecoverySnapshot(ByteString snapshot) {
+    protected void applyRecoverySnapshot(final ByteString snapshot) {
         if(recoveryCoordinator == null) {
             recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
         }
@@ -734,7 +733,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected void applyState(ActorRef clientActor, String identifier, Object data) {
+    protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {
 
         if (data instanceof CompositeModificationPayload) {
             Object modification = ((CompositeModificationPayload) data).getModification();
@@ -793,7 +792,7 @@ public class Shard extends RaftActor {
 
     @VisibleForTesting
     @Override
-    protected void applySnapshot(ByteString snapshot) {
+    protected void applySnapshot(final ByteString snapshot) {
         // Since this will be done only on Recovery or when this actor is a Follower
         // we can safely commit everything in here. We not need to worry about event notifications
         // as they would have already been disabled on the follower
@@ -858,7 +857,7 @@ public class Shard extends RaftActor {
         return dataPersistenceProvider;
     }
 
-    @Override protected void onLeaderChanged(String oldLeader, String newLeader) {
+    @Override protected void onLeaderChanged(final String oldLeader, final String newLeader) {
         shardMBean.setLeader(newLeader);
     }
 
@@ -880,8 +879,8 @@ public class Shard extends RaftActor {
         final DatastoreContext datastoreContext;
         final SchemaContext schemaContext;
 
-        ShardCreator(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
-                DatastoreContext datastoreContext, SchemaContext schemaContext) {
+        ShardCreator(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
+                final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
             this.name = name;
             this.peerAddresses = peerAddresses;
             this.datastoreContext = datastoreContext;
@@ -914,11 +913,11 @@ public class Shard extends RaftActor {
         private volatile ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
                                                              NormalizedNode<?, ?>>> delegate;
 
-        DelayedListenerRegistration(RegisterChangeListener registerChangeListener) {
+        DelayedListenerRegistration(final RegisterChangeListener registerChangeListener) {
             this.registerChangeListener = registerChangeListener;
         }
 
-        void setDelegate( ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+        void setDelegate( final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
                                             NormalizedNode<?, ?>>> registration) {
             this.delegate = registration;
         }
index 946e525..4a7752a 100644 (file)
@@ -7,15 +7,13 @@
  */
 package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-
 /**
  * @author Basheeruddin syedbahm@cisco.com
  *
@@ -24,7 +22,7 @@ public class ShardMBeanFactory {
 
     private static final Logger LOG = LoggerFactory.getLogger(ShardMBeanFactory.class);
 
-    private static Cache<String,ShardStats> shardMBeansCache =
+    private static final Cache<String,ShardStats> shardMBeansCache =
                                       CacheBuilder.newBuilder().weakValues().build();
 
     public static ShardStats getShardStatsMBean(final String shardName, final String mxBeanType) {
index 0959c2a..9decd82 100644 (file)
@@ -8,20 +8,18 @@
 
 package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
 
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
 import org.opendaylight.controller.md.sal.common.util.jmx.QueuedNotificationManagerMXBeanImpl;
 import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStats;
 import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.yangtools.util.concurrent.ListenerNotificationQueueStats;
 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
 
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
 /**
  * Maintains statistics for a shard.
  *
@@ -62,22 +60,16 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
 
     private ThreadExecutorStatsMXBeanImpl notificationExecutorStatsBean;
 
-    private ThreadExecutorStatsMXBeanImpl dataStoreExecutorStatsBean;
-
     private QueuedNotificationManagerMXBeanImpl notificationManagerStatsBean;
 
     private final SimpleDateFormat sdf =
         new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
 
-    public ShardStats(String shardName, String mxBeanType) {
+    public ShardStats(final String shardName, final String mxBeanType) {
         super(shardName, mxBeanType, JMX_CATEGORY_SHARD);
     }
 
-    public void setDataStoreExecutor(ExecutorService dsExecutor) {
-        this.dataStoreExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(dsExecutor);
-    }
-
-    public void setNotificationManager(QueuedNotificationManager<?, ?> manager) {
+    public void setNotificationManager(final QueuedNotificationManager<?, ?> manager) {
         this.notificationManagerStatsBean = new QueuedNotificationManagerMXBeanImpl(manager,
                 "notification-manager", getMBeanType(), getMBeanCategory());
 
@@ -194,42 +186,42 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
         return abortTransactionsCount.incrementAndGet();
     }
 
-    public void setLeader(String leader) {
+    public void setLeader(final String leader) {
         this.leader = leader;
     }
 
-    public void setRaftState(String raftState) {
+    public void setRaftState(final String raftState) {
         this.raftState = raftState;
     }
 
-    public void setLastLogTerm(long lastLogTerm) {
+    public void setLastLogTerm(final long lastLogTerm) {
         this.lastLogTerm = lastLogTerm;
     }
 
-    public void setLastLogIndex(long lastLogIndex) {
+    public void setLastLogIndex(final long lastLogIndex) {
         this.lastLogIndex = lastLogIndex;
     }
 
-    public void setCurrentTerm(long currentTerm) {
+    public void setCurrentTerm(final long currentTerm) {
         this.currentTerm = currentTerm;
     }
 
-    public void setCommitIndex(long commitIndex) {
+    public void setCommitIndex(final long commitIndex) {
         this.commitIndex = commitIndex;
     }
 
-    public void setLastApplied(long lastApplied) {
+    public void setLastApplied(final long lastApplied) {
         this.lastApplied = lastApplied;
     }
 
-    public void setLastCommittedTransactionTime(long lastCommittedTransactionTime) {
+    public void setLastCommittedTransactionTime(final long lastCommittedTransactionTime) {
         this.lastCommittedTransactionTime = lastCommittedTransactionTime;
     }
 
     @Override
     public ThreadExecutorStats getDataStoreExecutorStats() {
-        return dataStoreExecutorStatsBean == null ? null :
-                                        dataStoreExecutorStatsBean.toThreadExecutorStats();
+        // FIXME: this particular thing does not work, as it really is DS-specific
+        return null;
     }
 
     @Override
@@ -269,4 +261,8 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
         abortTransactionsCount.set(0);
 
     }
+
+    public void setDataStore(final InMemoryDOMDataStore store) {
+        setNotificationManager(store.getDataChangeListenerNotificationManager());
+    }
 }
index eb2c242..bf994eb 100644 (file)
@@ -1,9 +1,9 @@
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.junit.Assert.assertEquals;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
@@ -16,13 +16,8 @@ import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
-import static org.junit.Assert.assertEquals;
-
 public class DataChangeListenerRegistrationTest extends AbstractActorTest {
-  private static ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
-
-  private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor,
-          MoreExecutors.sameThreadExecutor());
+  private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
 
   static {
     store.onGlobalContextUpdated(TestModel.createTestContext());
@@ -46,7 +41,7 @@ public class DataChangeListenerRegistrationTest extends AbstractActorTest {
           final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
             // do not put code outside this method, will run afterwards
             @Override
-            protected String match(Object in) {
+            protected String match(final Object in) {
               if (in.getClass().equals(CloseDataChangeListenerRegistrationReply.SERIALIZABLE_CLASS)) {
                 return "match";
               } else {
@@ -68,7 +63,7 @@ public class DataChangeListenerRegistrationTest extends AbstractActorTest {
   private  AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener(){
     return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
       @Override
-      public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+      public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
 
       }
     };
index e5b5643..926cef6 100644 (file)
@@ -101,7 +101,6 @@ import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
-
 public class ShardTest extends AbstractActorTest {
 
     private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
@@ -1264,8 +1263,7 @@ public class ShardTest extends AbstractActorTest {
      */
     @Test
     public void testInMemoryDataStoreRestore() throws ReadFailedException {
-        InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.listeningDecorator(
-            MoreExecutors.sameThreadExecutor()), MoreExecutors.sameThreadExecutor());
+        InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
 
         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
 
@@ -1287,7 +1285,6 @@ public class ShardTest extends AbstractActorTest {
         NormalizedNode<?, ?> actual = readStore(store);
 
         assertEquals(expected, actual);
-
     }
 
     @Test
index 5781c19..9f57359 100644 (file)
@@ -14,7 +14,6 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.pattern.AskTimeoutException;
 import akka.testkit.TestActorRef;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
@@ -42,12 +41,8 @@ import scala.concurrent.duration.Duration;
  * @author Basheeruddin Ahmed <syedbahm@cisco.com>
  */
 public class ShardTransactionFailureTest extends AbstractActorTest {
-    private static ListeningExecutorService storeExecutor =
-        MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
-
     private static final InMemoryDOMDataStore store =
-        new InMemoryDOMDataStore("OPER", storeExecutor,
-            MoreExecutors.sameThreadExecutor());
+        new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
 
     private static final SchemaContext testSchemaContext =
         TestModel.createTestContext();
index 4ccc943..2d9d05b 100644 (file)
@@ -9,7 +9,6 @@ import akka.actor.Props;
 import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
@@ -48,11 +47,8 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
 
 public class ShardTransactionTest extends AbstractActorTest {
-    private static ListeningExecutorService storeExecutor =
-        MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
-
     private static final InMemoryDOMDataStore store =
-        new InMemoryDOMDataStore("OPER", storeExecutor, MoreExecutors.sameThreadExecutor());
+        new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
 
     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
 
index 84f3b92..abb2d31 100644 (file)
@@ -26,18 +26,17 @@ public abstract class AbstractModificationTest {
 
   @Before
   public void setUp(){
-    store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor(),
-            MoreExecutors.sameThreadExecutor());
+    store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
     store.onGlobalContextUpdated(TestModel.createTestContext());
   }
 
-  protected void commitTransaction(DOMStoreWriteTransaction transaction){
+  protected void commitTransaction(final DOMStoreWriteTransaction transaction){
     DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
     cohort.preCommit();
     cohort.commit();
   }
 
-  protected Optional<NormalizedNode<?,?>> readData(YangInstanceIdentifier path) throws Exception{
+  protected Optional<NormalizedNode<?,?>> readData(final YangInstanceIdentifier path) throws Exception{
     DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
     ListenableFuture<Optional<NormalizedNode<?, ?>>> future = transaction.read(path);
     return future.get();
index e9ed5b1..eb51db2 100644 (file)
@@ -4,12 +4,16 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
-
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
-
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -25,13 +29,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
 public class DOMBrokerPerformanceTest {
 
     private static final Logger log = LoggerFactory.getLogger(DOMBrokerPerformanceTest.class);
@@ -63,10 +60,8 @@ public class DOMBrokerPerformanceTest {
 
     @Before
     public void setupStore() {
-        InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
-                 MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
-        InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
-                 MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+        InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+        InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
         schemaContext = TestModel.createTestContext();
 
         operStore.onGlobalContextUpdated(schemaContext);
index 674d2ff..80c4201 100644 (file)
@@ -57,9 +57,9 @@ public class DOMBrokerTest {
     public void setupStore() {
 
         InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
-                MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+                MoreExecutors.sameThreadExecutor());
         InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
-                MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+                MoreExecutors.sameThreadExecutor());
         schemaContext = TestModel.createTestContext();
 
         operStore.onGlobalContextUpdated(schemaContext);
index 18b11c8..17f477b 100644 (file)
@@ -12,12 +12,15 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
-
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -31,12 +34,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
 public class DOMTransactionChainTest {
 
     private SchemaContext schemaContext;
@@ -44,10 +41,8 @@ public class DOMTransactionChainTest {
 
     @Before
     public void setupStore() {
-        InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
-                MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
-        InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
-                MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+        InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+        InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
         schemaContext = TestModel.createTestContext();
 
         operStore.onGlobalContextUpdated(schemaContext);
index 1ab12ff..3f26266 100644 (file)
@@ -28,9 +28,7 @@ public class InMemoryConfigDataStoreProviderModule extends org.opendaylight.cont
                         getMaxDataChangeExecutorQueueSize(), getMaxDataChangeListenerQueueSize(),
                         getMaxDataStoreExecutorQueueSize()));
 
-        InMemoryDataStoreStats statsBean = new InMemoryDataStoreStats("InMemoryConfigDataStore",
-                dataStore.getDataChangeListenerNotificationManager(), dataStore.getDomStoreExecutor());
-
+        InMemoryDataStoreStats statsBean = new InMemoryDataStoreStats("InMemoryConfigDataStore", dataStore);
         dataStore.setCloseable(statsBean);
 
         return dataStore;
index 9358552..c91c53a 100644 (file)
@@ -28,8 +28,7 @@ public class InMemoryOperationalDataStoreProviderModule extends org.opendaylight
                         getMaxDataStoreExecutorQueueSize()));
 
 
-        InMemoryDataStoreStats statsBean = new InMemoryDataStoreStats("InMemoryOperationalDataStore",
-                dataStore.getDataChangeListenerNotificationManager(), dataStore.getDomStoreExecutor());
+        InMemoryDataStoreStats statsBean = new InMemoryDataStoreStats("InMemoryOperationalDataStore", dataStore);
 
         dataStore.setCloseable(statsBean);
 
index 213f60e..4e01fa9 100644 (file)
@@ -12,8 +12,6 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -61,6 +59,7 @@ import org.slf4j.LoggerFactory;
 public class InMemoryDOMDataStore extends TransactionReadyPrototype implements DOMStore, Identifiable<String>, SchemaContextListener, AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
     private static final ListenableFuture<Void> SUCCESSFUL_FUTURE = Futures.immediateFuture(null);
+    private static final ListenableFuture<Boolean> CAN_COMMIT_FUTURE = Futures.immediateFuture(Boolean.TRUE);
 
     private static final Invoker<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> DCL_NOTIFICATION_MGR_INVOKER =
             new Invoker<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent>() {
@@ -80,23 +79,18 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D
 
     private final QueuedNotificationManager<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> dataChangeListenerNotificationManager;
     private final ExecutorService dataChangeListenerExecutor;
-    private final ListeningExecutorService commitExecutor;
     private final boolean debugTransactions;
     private final String name;
 
     private volatile AutoCloseable closeable;
 
-    public InMemoryDOMDataStore(final String name, final ListeningExecutorService commitExecutor,
-            final ExecutorService dataChangeListenerExecutor) {
-        this(name, commitExecutor, dataChangeListenerExecutor,
-             InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE, false);
+    public InMemoryDOMDataStore(final String name, final ExecutorService dataChangeListenerExecutor) {
+        this(name, dataChangeListenerExecutor, InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE, false);
     }
 
-    public InMemoryDOMDataStore(final String name, final ListeningExecutorService commitExecutor,
-            final ExecutorService dataChangeListenerExecutor, final int maxDataChangeListenerQueueSize,
-            final boolean debugTransactions) {
+    public InMemoryDOMDataStore(final String name, final ExecutorService dataChangeListenerExecutor,
+            final int maxDataChangeListenerQueueSize, final boolean debugTransactions) {
         this.name = Preconditions.checkNotNull(name);
-        this.commitExecutor = Preconditions.checkNotNull(commitExecutor);
         this.dataChangeListenerExecutor = Preconditions.checkNotNull(dataChangeListenerExecutor);
         this.debugTransactions = debugTransactions;
 
@@ -114,10 +108,6 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D
         return dataChangeListenerNotificationManager;
     }
 
-    public ExecutorService getDomStoreExecutor() {
-        return commitExecutor;
-    }
-
     @Override
     public final String getIdentifier() {
         return name;
@@ -150,7 +140,6 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D
 
     @Override
     public void close() {
-        ExecutorServiceUtil.tryGracefulShutdown(commitExecutor, 30, TimeUnit.SECONDS);
         ExecutorServiceUtil.tryGracefulShutdown(dataChangeListenerExecutor, 30, TimeUnit.SECONDS);
 
         if(closeable != null) {
@@ -239,38 +228,36 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D
 
         @Override
         public ListenableFuture<Boolean> canCommit() {
-            return commitExecutor.submit(new Callable<Boolean>() {
-                @Override
-                public Boolean call() throws TransactionCommitFailedException {
-                    try {
-                        dataTree.validate(modification);
-                        LOG.debug("Store Transaction: {} can be committed", transaction.getIdentifier());
-                        return true;
-                    } catch (ConflictingModificationAppliedException e) {
-                        LOG.warn("Store Tx: {} Conflicting modification for {}.", transaction.getIdentifier(),
-                                e.getPath());
-                        transaction.warnDebugContext(LOG);
-                        throw new OptimisticLockFailedException("Optimistic lock failed.",e);
-                    } catch (DataValidationFailedException e) {
-                        LOG.warn("Store Tx: {} Data Precondition failed for {}.", transaction.getIdentifier(),
-                                e.getPath(), e);
-                        transaction.warnDebugContext(LOG);
-                        throw new TransactionCommitFailedException("Data did not pass validation.",e);
-                    }
-                }
-            });
+            try {
+                dataTree.validate(modification);
+                LOG.debug("Store Transaction: {} can be committed", transaction.getIdentifier());
+                return CAN_COMMIT_FUTURE;
+            } catch (ConflictingModificationAppliedException e) {
+                LOG.warn("Store Tx: {} Conflicting modification for {}.", transaction.getIdentifier(),
+                        e.getPath());
+                transaction.warnDebugContext(LOG);
+                return Futures.immediateFailedFuture(new OptimisticLockFailedException("Optimistic lock failed.", e));
+            } catch (DataValidationFailedException e) {
+                LOG.warn("Store Tx: {} Data Precondition failed for {}.", transaction.getIdentifier(),
+                        e.getPath(), e);
+                transaction.warnDebugContext(LOG);
+                return Futures.immediateFailedFuture(new TransactionCommitFailedException("Data did not pass validation.", e));
+            } catch (Exception e) {
+                LOG.warn("Unexpected failure in validation phase", e);
+                return Futures.immediateFailedFuture(e);
+            }
         }
 
         @Override
         public ListenableFuture<Void> preCommit() {
-            return commitExecutor.submit(new Callable<Void>() {
-                @Override
-                public Void call() {
-                    candidate = dataTree.prepare(modification);
-                    listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree);
-                    return null;
-                }
-            });
+            try {
+                candidate = dataTree.prepare(modification);
+                listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree);
+                return SUCCESSFUL_FUTURE;
+            } catch (Exception e) {
+                LOG.warn("Unexpected failure in pre-commit phase", e);
+                return Futures.immediateFailedFuture(e);
+            }
         }
 
         @Override
index 2ee8e18..00af3df 100644 (file)
@@ -7,8 +7,6 @@
  */
 package org.opendaylight.controller.md.sal.dom.store.impl;
 
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import java.util.concurrent.ExecutorService;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.sal.core.api.model.SchemaService;
@@ -73,9 +71,7 @@ public final class InMemoryDOMDataStoreFactory {
         ExecutorService dataChangeListenerExecutor = SpecialExecutors.newBlockingBoundedFastThreadPool(
                 dclExecutorMaxPoolSize, dclExecutorMaxQueueSize, name + "-DCL" );
 
-        final ListeningExecutorService commitExecutor = MoreExecutors.sameThreadExecutor();
-        final InMemoryDOMDataStore dataStore = new InMemoryDOMDataStore(name,
-            commitExecutor, dataChangeListenerExecutor,
+        final InMemoryDOMDataStore dataStore = new InMemoryDOMDataStore(name, dataChangeListenerExecutor,
                 actualProperties.getMaxDataChangeListenerQueueSize(), debugTransactions);
 
         if (schemaService != null) {
index e00be24..cb91b4c 100644 (file)
@@ -8,10 +8,10 @@
 
 package org.opendaylight.controller.md.sal.dom.store.impl.jmx;
 
-import java.util.concurrent.ExecutorService;
 import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
 import org.opendaylight.controller.md.sal.common.util.jmx.QueuedNotificationManagerMXBeanImpl;
 import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
 
 /**
@@ -22,11 +22,9 @@ import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
 public class InMemoryDataStoreStats implements AutoCloseable {
 
     private final AbstractMXBean notificationExecutorStatsBean;
-    private final AbstractMXBean dataStoreExecutorStatsBean;
     private final QueuedNotificationManagerMXBeanImpl notificationManagerStatsBean;
 
-    public InMemoryDataStoreStats(final String mBeanType, final QueuedNotificationManager<?, ?> manager,
-            final ExecutorService dataStoreExecutor) {
+    public InMemoryDataStoreStats(final String mBeanType, final QueuedNotificationManager<?, ?> manager) {
 
         notificationManagerStatsBean = new QueuedNotificationManagerMXBeanImpl(manager,
                 "notification-manager", mBeanType, null);
@@ -37,12 +35,10 @@ public class InMemoryDataStoreStats implements AutoCloseable {
         if (notificationExecutorStatsBean != null) {
             notificationExecutorStatsBean.registerMBean();
         }
+    }
 
-        dataStoreExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(dataStoreExecutor,
-                "data-store-executor", mBeanType, null);
-        if (dataStoreExecutorStatsBean != null) {
-            dataStoreExecutorStatsBean.registerMBean();
-        }
+    public InMemoryDataStoreStats(final String name, final InMemoryDOMDataStore dataStore) {
+        this(name, dataStore.getDataChangeListenerNotificationManager());
     }
 
     @Override
@@ -51,10 +47,6 @@ public class InMemoryDataStoreStats implements AutoCloseable {
             notificationExecutorStatsBean.unregisterMBean();
         }
 
-        if(dataStoreExecutorStatsBean != null) {
-            dataStoreExecutorStatsBean.unregisterMBean();
-        }
-
         if(notificationManagerStatsBean != null) {
             notificationManagerStatsBean.unregisterMBean();
         }
index 0e064cd..8d32962 100644 (file)
@@ -7,11 +7,8 @@
  */
 package org.opendaylight.controller.md.sal.dom.store.impl;
 
-import com.google.common.util.concurrent.MoreExecutors;
-
 import java.util.Collection;
 import java.util.Map;
-
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -63,8 +60,7 @@ public abstract class AbstractDataChangeListenerTest {
         dclExecutorService = new TestDCLExecutorService(
                 SpecialExecutors.newBlockingBoundedFastThreadPool(1, 10, "DCL" ));
 
-        datastore = new InMemoryDOMDataStore("TEST",
-                MoreExecutors.sameThreadExecutor(), dclExecutorService );
+        datastore = new InMemoryDOMDataStore("TEST", dclExecutorService);
         datastore.onGlobalContextUpdated(schemaContext);
     }
 
index 04e1949..4720f4b 100644 (file)
@@ -11,14 +11,11 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
-
 import java.util.concurrent.ExecutionException;
-
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -48,8 +45,7 @@ public class InMemoryDataStoreTest {
 
     @Before
     public void setupStore() {
-        domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor(),
-                MoreExecutors.sameThreadExecutor());
+        domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor());
         schemaContext = TestModel.createTestContext();
         domStore.onGlobalContextUpdated(schemaContext);
     }
index 364712c..15e5f71 100644 (file)
@@ -8,9 +8,9 @@
 package org.opendaylight.controller.md.sal.dom.store.impl;
 
 import static org.junit.Assert.assertNotNull;
-
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.concurrent.ExecutionException;
-
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
@@ -23,9 +23,6 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.MoreExecutors;
-
 public class SchemaUpdateForTransactionTest {
 
     private static final YangInstanceIdentifier TOP_PATH = YangInstanceIdentifier.of(Top.QNAME);
@@ -34,8 +31,7 @@ public class SchemaUpdateForTransactionTest {
 
     @Before
     public void setupStore() {
-        domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor(),
-                MoreExecutors.sameThreadExecutor());
+        domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor());
         loadSchemas(RockTheHouseInput.class);
     }
 
index 36ab41f..3070499 100644 (file)
@@ -56,15 +56,13 @@ public class DataBrokerTestCustomizer {
     }
 
     public DOMStore createConfigurationDatastore() {
-        InMemoryDOMDataStore store = new InMemoryDOMDataStore("CFG",
-                MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+        InMemoryDOMDataStore store = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
         schemaService.registerSchemaContextListener(store);
         return store;
     }
 
     public DOMStore createOperationalDatastore() {
-        InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER",
-                MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+        InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
         schemaService.registerSchemaContextListener(store);
         return store;
     }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.