Bug 1430: Off-load notifications from single commit thread
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / test / java / org / opendaylight / controller / md / sal / dom / broker / impl / DOMBrokerTest.java
index 0bb16a39b90f7eb513093b18faa20815061fad3c..e57d08f1737fde07dc455eabfc53c2e5304cd53f 100644 (file)
@@ -7,19 +7,24 @@ import static org.junit.Assert.assertEquals;
 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
 
+import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
@@ -28,6 +33,7 @@ import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
 import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
+import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
@@ -35,6 +41,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ForwardingExecutorService;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -46,11 +53,16 @@ public class DOMBrokerTest {
     private SchemaContext schemaContext;
     private DOMDataBrokerImpl domBroker;
     private ListeningExecutorService executor;
+    private ExecutorService futureExecutor;
+    private CommitExecutorService commitExecutor;
 
     @Before
     public void setupStore() {
-        InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
-        InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
+
+        InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
+                MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+        InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
+                MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
         schemaContext = TestModel.createTestContext();
 
         operStore.onGlobalContextUpdated(schemaContext);
@@ -61,8 +73,10 @@ public class DOMBrokerTest {
                 .put(OPERATIONAL, operStore) //
                 .build();
 
-        executor = new DeadlockDetectingListeningExecutorService(Executors.newSingleThreadExecutor(),
-                                          TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION);
+        commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor());
+        futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB");
+        executor = new DeadlockDetectingListeningExecutorService(commitExecutor,
+                TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION, futureExecutor);
         domBroker = new DOMDataBrokerImpl(stores, executor);
     }
 
@@ -71,6 +85,10 @@ public class DOMBrokerTest {
         if( executor != null ) {
             executor.shutdownNow();
         }
+
+        if(futureExecutor != null) {
+            futureExecutor.shutdownNow();
+        }
     }
 
     @Test(timeout=10000)
@@ -137,6 +155,24 @@ public class DOMBrokerTest {
         assertTrue(afterCommitRead.isPresent());
     }
 
+    @Test(expected=TransactionCommitFailedException.class)
+    public void testRejectedCommit() throws Exception {
+
+        commitExecutor.delegate = Mockito.mock( ExecutorService.class );
+        Mockito.doThrow( new RejectedExecutionException( "mock" ) )
+            .when( commitExecutor.delegate ).execute( Mockito.any( Runnable.class ) );
+        Mockito.doNothing().when( commitExecutor.delegate ).shutdown();
+        Mockito.doReturn( Collections.emptyList() ).when( commitExecutor.delegate ).shutdownNow();
+        Mockito.doReturn( "" ).when( commitExecutor.delegate ).toString();
+        Mockito.doReturn( true ).when( commitExecutor.delegate )
+            .awaitTermination( Mockito.anyLong(), Mockito.any( TimeUnit.class ) );
+
+        DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+        writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME) );
+
+        writeTx.submit().checkedGet( 5, TimeUnit.SECONDS );
+    }
+
     /**
      * Tests a simple DataChangeListener notification after a write.
      */
@@ -306,4 +342,18 @@ public class DOMBrokerTest {
             assertTrue( "onDataChanged was not called", latch.await( 5, TimeUnit.SECONDS ) );
         }
     }
+
+    static class CommitExecutorService extends ForwardingExecutorService {
+
+        ExecutorService delegate;
+
+        public CommitExecutorService( ExecutorService delegate ) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        protected ExecutorService delegate() {
+            return delegate;
+        }
+    }
 }