+/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
package org.opendaylight.controller.md.sal.dom.broker.impl;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertEquals;
import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ForwardingExecutorService;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ForwardingExecutorService;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
public class DOMBrokerTest {
private SchemaContext schemaContext;
- private DOMDataBrokerImpl domBroker;
+ private AbstractDOMDataBroker domBroker;
private ListeningExecutorService executor;
private ExecutorService futureExecutor;
private CommitExecutorService commitExecutor;
@Before
public void setupStore() {
- InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
- MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
- InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
- MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.newDirectExecutorService());
+ InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.newDirectExecutorService());
schemaContext = TestModel.createTestContext();
operStore.onGlobalContextUpdated(schemaContext);
configStore.onGlobalContextUpdated(schemaContext);
- ImmutableMap<LogicalDatastoreType, DOMStore> stores = ImmutableMap.<LogicalDatastoreType, DOMStore> builder() //
+ final ImmutableMap<LogicalDatastoreType, DOMStore> stores =
+ ImmutableMap.<LogicalDatastoreType, DOMStore>builder() //
.put(CONFIGURATION, configStore) //
.put(OPERATIONAL, operStore) //
.build();
commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor());
futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB");
executor = new DeadlockDetectingListeningExecutorService(commitExecutor,
- TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION, futureExecutor);
- domBroker = new DOMDataBrokerImpl(stores, executor);
+ TransactionCommitDeadlockException
+ .DEADLOCK_EXCEPTION_SUPPLIER,
+ futureExecutor);
+ domBroker = new SerializedDOMDataBroker(stores, executor);
}
@After
public void tearDown() {
- if( executor != null ) {
+ if (executor != null) {
executor.shutdownNow();
}
- if(futureExecutor != null) {
+ if (futureExecutor != null) {
futureExecutor.shutdownNow();
}
}
- @Test(timeout=10000)
+ @Test(timeout = 10000)
public void testTransactionIsolation() throws InterruptedException, ExecutionException {
assertNotNull(domBroker);
writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
/**
- *
* Reads /test from writeTx Read should return container.
*
*/
- ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
- TestModel.TEST_PATH);
+ ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx
+ .read(OPERATIONAL, TestModel.TEST_PATH);
assertTrue(writeTxContainer.get().isPresent());
/**
- *
* Reads /test from readTx Read should return Absent.
*
*/
assertFalse(readTxContainer.get().isPresent());
}
- @Test(timeout=10000)
+ @Test(timeout = 10000)
public void testTransactionCommit() throws InterruptedException, ExecutionException {
DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
/**
- *
* Reads /test from writeTx Read should return container.
*
*/
- ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
- TestModel.TEST_PATH);
+ ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx
+ .read(OPERATIONAL, TestModel.TEST_PATH);
assertTrue(writeTxContainer.get().isPresent());
writeTx.submit().get();
assertTrue(afterCommitRead.isPresent());
}
- @Test(expected=TransactionCommitFailedException.class)
+ @Test(expected = TransactionCommitFailedException.class)
public void testRejectedCommit() throws Exception {
- commitExecutor.delegate = Mockito.mock( ExecutorService.class );
- Mockito.doThrow( new RejectedExecutionException( "mock" ) )
- .when( commitExecutor.delegate ).execute( Mockito.any( Runnable.class ) );
- Mockito.doNothing().when( commitExecutor.delegate ).shutdown();
- Mockito.doReturn( Collections.emptyList() ).when( commitExecutor.delegate ).shutdownNow();
- Mockito.doReturn( "" ).when( commitExecutor.delegate ).toString();
- Mockito.doReturn( true ).when( commitExecutor.delegate )
- .awaitTermination( Mockito.anyLong(), Mockito.any( TimeUnit.class ) );
+ commitExecutor.delegate = Mockito.mock(ExecutorService.class);
+ Mockito.doThrow(new RejectedExecutionException("mock")).when(commitExecutor.delegate)
+ .execute(Mockito.any(Runnable.class));
+ Mockito.doNothing().when(commitExecutor.delegate).shutdown();
+ Mockito.doReturn(Collections.emptyList()).when(commitExecutor.delegate).shutdownNow();
+ Mockito.doReturn("").when(commitExecutor.delegate).toString();
+ Mockito.doReturn(true).when(commitExecutor.delegate)
+ .awaitTermination(Mockito.anyLong(), Mockito.any(TimeUnit.class));
DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
- writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME) );
+ writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
- writeTx.submit().checkedGet( 5, TimeUnit.SECONDS );
+ writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
}
/**
* Tests a simple DataChangeListener notification after a write.
*/
@Test
+ @SuppressWarnings("checkstyle:IllegalThrows")
public void testDataChangeListener() throws Throwable {
- final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode( TestModel.TEST_QNAME );
+ final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener();
- domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
- dcListener, DataChangeScope.BASE );
+ domBroker.registerDataChangeListener(OPERATIONAL, TestModel.TEST_PATH, dcListener, DataChangeScope.BASE);
final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
- assertNotNull( writeTx );
+ assertNotNull(writeTx);
- writeTx.put( OPERATIONAL, TestModel.TEST_PATH, testNode );
+ writeTx.put(OPERATIONAL, TestModel.TEST_PATH, testNode);
- AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
+ AtomicReference<Throwable> caughtEx = submitTxAsync(writeTx);
dcListener.waitForChange();
- if( caughtEx.get() != null ) {
+ if (caughtEx.get() != null) {
throw caughtEx.get();
}
- NormalizedNode<?, ?> actualNode = dcListener.change.getCreatedData().get( TestModel.TEST_PATH );
- assertEquals( "Created node", testNode, actualNode );
+ NormalizedNode<?, ?> actualNode = dcListener.capturedChange.getCreatedData().get(TestModel.TEST_PATH);
+ assertEquals("Created node", testNode, actualNode);
}
/**
* This should succeed without deadlock.
*/
@Test
+ @SuppressWarnings("checkstyle:IllegalThrows")
public void testDataChangeListenerDoingAsyncWriteTxSubmit() throws Throwable {
final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
- final CountDownLatch commitCompletedLatch = new CountDownLatch( 1 );
+ final CountDownLatch commitCompletedLatch = new CountDownLatch(1);
TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
@Override
- public void onDataChanged( AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
+ public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
- writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
- ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
- Futures.addCallback( writeTx.submit(), new FutureCallback<Void>() {
+ writeTx.put(OPERATIONAL, TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME));
+ Futures.addCallback(writeTx.submit(), new FutureCallback<Void>() {
@Override
- public void onSuccess( Void result ) {
+ public void onSuccess(final Void result) {
commitCompletedLatch.countDown();
}
@Override
- public void onFailure( Throwable t ) {
- caughtCommitEx.set( t );
+ public void onFailure(final Throwable throwable) {
+ caughtCommitEx.set(throwable);
commitCompletedLatch.countDown();
}
- } );
+ });
- super.onDataChanged( change );
+ super.onDataChanged(change);
}
};
- domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
- dcListener, DataChangeScope.BASE );
+ domBroker.registerDataChangeListener(OPERATIONAL, TestModel.TEST_PATH, dcListener, DataChangeScope.BASE);
final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
- assertNotNull( writeTx );
+ assertNotNull(writeTx);
- writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
+ writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
- AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
+ AtomicReference<Throwable> caughtEx = submitTxAsync(writeTx);
dcListener.waitForChange();
- if( caughtEx.get() != null ) {
+ if (caughtEx.get() != null) {
throw caughtEx.get();
}
- assertTrue( "Commit Future was not invoked", commitCompletedLatch.await( 5, TimeUnit.SECONDS ) );
+ assertTrue("Commit Future was not invoked", commitCompletedLatch.await(5, TimeUnit.SECONDS));
- if( caughtCommitEx.get() != null ) {
+ if (caughtCommitEx.get() != null) {
throw caughtCommitEx.get();
}
}
* Tests a DataChangeListener that does a blocking submit of a write Tx in its onDataChanged method.
* This should throw an exception and not deadlock.
*/
- @Test(expected=TransactionCommitDeadlockException.class)
+ @Test(expected = TransactionCommitDeadlockException.class)
+ @SuppressWarnings({"checkstyle:IllegalThrows", "checkstyle:IllegalCatch"})
public void testDataChangeListenerDoingBlockingWriteTxSubmit() throws Throwable {
final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
@Override
- public void onDataChanged( AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
+ public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
- writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
- ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
+ writeTx.put(OPERATIONAL, TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME));
try {
writeTx.submit().get();
- } catch( ExecutionException e ) {
- caughtCommitEx.set( e.getCause() );
- } catch( Exception e ) {
- caughtCommitEx.set( e );
- }
- finally {
- super.onDataChanged( change );
+ } catch (ExecutionException e) {
+ caughtCommitEx.set(e.getCause());
+ } catch (Exception e) {
+ caughtCommitEx.set(e);
+ } finally {
+ super.onDataChanged(change);
}
}
};
- domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
- dcListener, DataChangeScope.BASE );
+ domBroker.registerDataChangeListener(OPERATIONAL, TestModel.TEST_PATH, dcListener, DataChangeScope.BASE);
final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
- assertNotNull( writeTx );
+ assertNotNull(writeTx);
- writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
+ writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
- AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
+ AtomicReference<Throwable> caughtEx = submitTxAsync(writeTx);
dcListener.waitForChange();
- if( caughtEx.get() != null ) {
+ if (caughtEx.get() != null) {
throw caughtEx.get();
}
- if( caughtCommitEx.get() != null ) {
+ if (caughtCommitEx.get() != null) {
throw caughtCommitEx.get();
}
}
- AtomicReference<Throwable> submitTxAsync( final DOMDataWriteTransaction writeTx ) {
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ AtomicReference<Throwable> submitTxAsync(final DOMDataWriteTransaction writeTx) {
final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
- new Thread() {
- @Override
- public void run() {
-
- try {
- writeTx.submit();
- } catch( Throwable e ) {
- caughtEx.set( e );
- }
+ new Thread(() -> {
+ try {
+ writeTx.submit();
+ } catch (Throwable e) {
+ caughtEx.set(e);
}
-
- }.start();
+ }).start();
return caughtEx;
}
static class TestDOMDataChangeListener implements DOMDataChangeListener {
- volatile AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
- private final CountDownLatch latch = new CountDownLatch( 1 );
+ volatile AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> capturedChange;
+ private final CountDownLatch latch = new CountDownLatch(1);
@Override
- public void onDataChanged( AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
- this.change = change;
+ public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+ this.capturedChange = change;
latch.countDown();
}
void waitForChange() throws InterruptedException {
- assertTrue( "onDataChanged was not called", latch.await( 5, TimeUnit.SECONDS ) );
+ assertTrue("onDataChanged was not called", latch.await(5, TimeUnit.SECONDS));
}
}
ExecutorService delegate;
- public CommitExecutorService( ExecutorService delegate ) {
+ CommitExecutorService(final ExecutorService delegate) {
this.delegate = delegate;
}