Deprecate all MD-SAL APIs
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / test / java / org / opendaylight / controller / md / sal / dom / broker / impl / DOMBrokerTest.java
index 0bb16a39b90f7eb513093b18faa20815061fad3c..065035ffc31dae47012af3f4e4c152027c352584 100644 (file)
@@ -1,26 +1,40 @@
+/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
 package org.opendaylight.controller.md.sal.dom.broker.impl;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertEquals;
 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
 
-import java.util.concurrent.CountDownLatch;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ForwardingExecutorService;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.Collections;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.mockito.Mockito;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
@@ -28,52 +42,57 @@ import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
 import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
+@Deprecated
 public class DOMBrokerTest {
 
     private SchemaContext schemaContext;
-    private DOMDataBrokerImpl domBroker;
+    private AbstractDOMDataBroker domBroker;
     private ListeningExecutorService executor;
+    private ExecutorService futureExecutor;
+    private CommitExecutorService commitExecutor;
 
     @Before
     public void setupStore() {
-        InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
-        InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
+
+        InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.newDirectExecutorService());
+        InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.newDirectExecutorService());
         schemaContext = TestModel.createTestContext();
 
         operStore.onGlobalContextUpdated(schemaContext);
         configStore.onGlobalContextUpdated(schemaContext);
 
-        ImmutableMap<LogicalDatastoreType, DOMStore> stores = ImmutableMap.<LogicalDatastoreType, DOMStore> builder() //
+        final ImmutableMap<LogicalDatastoreType, DOMStore> stores =
+                ImmutableMap.<LogicalDatastoreType, DOMStore>builder() //
                 .put(CONFIGURATION, configStore) //
                 .put(OPERATIONAL, operStore) //
                 .build();
 
-        executor = new DeadlockDetectingListeningExecutorService(Executors.newSingleThreadExecutor(),
-                                          TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION);
-        domBroker = new DOMDataBrokerImpl(stores, executor);
+        commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor());
+        futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB", DOMBrokerTest.class);
+        executor = new DeadlockDetectingListeningExecutorService(commitExecutor,
+                                                                 TransactionCommitDeadlockException
+                                                                         .DEADLOCK_EXCEPTION_SUPPLIER,
+                                                                 futureExecutor);
+        domBroker = new SerializedDOMDataBroker(stores, executor);
     }
 
     @After
     public void tearDown() {
-        if( executor != null ) {
+        if (executor != null) {
             executor.shutdownNow();
         }
+
+        if (futureExecutor != null) {
+            futureExecutor.shutdownNow();
+        }
     }
 
-    @Test(timeout=10000)
+    @Test(timeout = 10000)
     public void testTransactionIsolation() throws InterruptedException, ExecutionException {
 
         assertNotNull(domBroker);
@@ -91,16 +110,14 @@ public class DOMBrokerTest {
         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
         /**
-         *
          * Reads /test from writeTx Read should return container.
          *
          */
-        ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
-                TestModel.TEST_PATH);
+        ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx
+                .read(OPERATIONAL, TestModel.TEST_PATH);
         assertTrue(writeTxContainer.get().isPresent());
 
         /**
-         *
          * Reads /test from readTx Read should return Absent.
          *
          */
@@ -109,8 +126,8 @@ public class DOMBrokerTest {
         assertFalse(readTxContainer.get().isPresent());
     }
 
-    @Test(timeout=10000)
-    public void testTransactionCommit() throws InterruptedException, ExecutionException {
+    @Test(timeout = 10000)
+    public void testTransactionCommit() throws InterruptedException, ExecutionException, TimeoutException {
 
         DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
         assertNotNull(writeTx);
@@ -122,188 +139,97 @@ public class DOMBrokerTest {
         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
         /**
-         *
          * Reads /test from writeTx Read should return container.
          *
          */
-        ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
-                TestModel.TEST_PATH);
+        ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx
+                .read(OPERATIONAL, TestModel.TEST_PATH);
         assertTrue(writeTxContainer.get().isPresent());
 
-        writeTx.submit().get();
+        writeTx.commit().get(5, TimeUnit.SECONDS);
 
         Optional<NormalizedNode<?, ?>> afterCommitRead = domBroker.newReadOnlyTransaction()
                 .read(OPERATIONAL, TestModel.TEST_PATH).get();
         assertTrue(afterCommitRead.isPresent());
     }
 
-    /**
-     * Tests a simple DataChangeListener notification after a write.
-     */
-    @Test
-    public void testDataChangeListener() throws Throwable {
-
-        final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode( TestModel.TEST_QNAME );
-
-        TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener();
-
-        domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
-                                              dcListener, DataChangeScope.BASE );
-
-        final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
-        assertNotNull( writeTx );
-
-        writeTx.put( OPERATIONAL, TestModel.TEST_PATH, testNode );
-
-        AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
-
-        dcListener.waitForChange();
-
-        if( caughtEx.get() != null ) {
-            throw caughtEx.get();
-        }
-
-        NormalizedNode<?, ?> actualNode = dcListener.change.getCreatedData().get( TestModel.TEST_PATH );
-        assertEquals( "Created node", testNode, actualNode );
-    }
-
-    /**
-     * Tests a DataChangeListener that does an async submit of a write Tx in its onDataChanged method.
-     * This should succeed without deadlock.
-     */
-    @Test
-    public void testDataChangeListenerDoingAsyncWriteTxSubmit() throws Throwable {
-
-        final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
-        final CountDownLatch commitCompletedLatch = new CountDownLatch( 1 );
-
-        TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
-            @Override
-            public void onDataChanged( AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
-
-                DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
-                writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
-                             ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
-                Futures.addCallback( writeTx.submit(), new FutureCallback<Void>() {
-                    @Override
-                    public void onSuccess( Void result ) {
-                        commitCompletedLatch.countDown();
-                    }
-
-                    @Override
-                    public void onFailure( Throwable t ) {
-                        caughtCommitEx.set( t );
-                        commitCompletedLatch.countDown();
-                    }
-                } );
-
-                super.onDataChanged( change );
-            }
-        };
-
-        domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
-                                              dcListener, DataChangeScope.BASE );
+    @Test(timeout = 10000)
+    @Deprecated
+    public void testTransactionSubmit() throws InterruptedException, ExecutionException, TimeoutException {
 
-        final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
-        assertNotNull( writeTx );
-
-        writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
-
-        AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
-
-        dcListener.waitForChange();
+        DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+        assertNotNull(writeTx);
+        /**
+         *
+         * Writes /test in writeTx
+         *
+         */
+        writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
-        if( caughtEx.get() != null ) {
-            throw caughtEx.get();
-        }
+        /**
+         * Reads /test from writeTx Read should return container.
+         *
+         */
+        ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx
+                .read(OPERATIONAL, TestModel.TEST_PATH);
+        assertTrue(writeTxContainer.get().isPresent());
 
-        assertTrue( "Commit Future was not invoked", commitCompletedLatch.await( 5, TimeUnit.SECONDS ) );
+        writeTx.submit().get(5, TimeUnit.SECONDS);
 
-        if( caughtCommitEx.get() != null ) {
-            throw caughtCommitEx.get();
-        }
+        Optional<NormalizedNode<?, ?>> afterCommitRead = domBroker.newReadOnlyTransaction()
+                .read(OPERATIONAL, TestModel.TEST_PATH).get();
+        assertTrue(afterCommitRead.isPresent());
     }
 
-    /**
-     * Tests a DataChangeListener that does a blocking submit of a write Tx in its onDataChanged method.
-     * This should throw an exception and not deadlock.
-     */
-    @Test(expected=TransactionCommitDeadlockException.class)
-    public void testDataChangeListenerDoingBlockingWriteTxSubmit() throws Throwable {
-
-        final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
-
-        TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
-            @Override
-            public void onDataChanged( AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
-                DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
-                writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
-                             ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
-                try {
-                    writeTx.submit().get();
-                } catch( ExecutionException e ) {
-                    caughtCommitEx.set( e.getCause() );
-                } catch( Exception e ) {
-                    caughtCommitEx.set( e );
-                }
-                finally {
-                    super.onDataChanged( change );
-                }
-            }
-        };
-
-        domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
-                                              dcListener, DataChangeScope.BASE );
-
-        final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
-        assertNotNull( writeTx );
-
-        writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
+    @Test(expected = TransactionCommitFailedException.class)
+    @SuppressWarnings({"checkstyle:IllegalThrows", "checkstyle:AvoidHidingCauseException"})
+    public void testRejectedCommit() throws Throwable {
 
-        AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
+        commitExecutor.delegate = Mockito.mock(ExecutorService.class);
+        Mockito.doThrow(new RejectedExecutionException("mock")).when(commitExecutor.delegate)
+                .execute(Mockito.any(Runnable.class));
+        Mockito.doNothing().when(commitExecutor.delegate).shutdown();
+        Mockito.doReturn(Collections.emptyList()).when(commitExecutor.delegate).shutdownNow();
+        Mockito.doReturn("").when(commitExecutor.delegate).toString();
+        Mockito.doReturn(true).when(commitExecutor.delegate)
+                .awaitTermination(Mockito.anyLong(), Mockito.any(TimeUnit.class));
 
-        dcListener.waitForChange();
-
-        if( caughtEx.get() != null ) {
-            throw caughtEx.get();
-        }
+        DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+        writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
-        if( caughtCommitEx.get() != null ) {
-            throw caughtCommitEx.get();
+        try {
+            writeTx.commit().get(5, TimeUnit.SECONDS);
+        } catch (ExecutionException e) {
+            throw e.getCause();
         }
     }
 
-    AtomicReference<Throwable> submitTxAsync( final DOMDataWriteTransaction writeTx ) {
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    AtomicReference<Throwable> submitTxAsync(final DOMDataWriteTransaction writeTx) {
         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
-        new Thread() {
-            @Override
-            public void run() {
-
-                try {
-                    writeTx.submit();
-                } catch( Throwable e ) {
-                    caughtEx.set( e );
-                }
+        new Thread(() -> {
+            try {
+                writeTx.commit();
+            } catch (Throwable e) {
+                caughtEx.set(e);
             }
-
-        }.start();
+        }).start();
 
         return caughtEx;
     }
 
-    static class TestDOMDataChangeListener implements DOMDataChangeListener {
 
-        volatile AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
-        private final CountDownLatch latch = new CountDownLatch( 1 );
+    static class CommitExecutorService extends ForwardingExecutorService {
 
-        @Override
-        public void onDataChanged( AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
-            this.change = change;
-            latch.countDown();
+        ExecutorService delegate;
+
+        CommitExecutorService(final ExecutorService delegate) {
+            this.delegate = delegate;
         }
 
-        void waitForChange() throws InterruptedException {
-            assertTrue( "onDataChanged was not called", latch.await( 5, TimeUnit.SECONDS ) );
+        @Override
+        protected ExecutorService delegate() {
+            return delegate;
         }
     }
 }