package org.opendaylight.controller.md.sal.dom.broker.impl;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertEquals;
import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
-
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ForwardingExecutorService;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
+import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
public class DOMBrokerTest {
private SchemaContext schemaContext;
- private DOMDataBrokerImpl domBroker;
+ private AbstractDOMDataBroker domBroker;
private ListeningExecutorService executor;
+ private ExecutorService futureExecutor;
+ private CommitExecutorService commitExecutor;
@Before
public void setupStore() {
- InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
- InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
+
+ InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
+ MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
+ MoreExecutors.sameThreadExecutor());
schemaContext = TestModel.createTestContext();
operStore.onGlobalContextUpdated(schemaContext);
.put(OPERATIONAL, operStore) //
.build();
- executor = new DeadlockDetectingListeningExecutorService(Executors.newSingleThreadExecutor(),
- TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION);
- domBroker = new DOMDataBrokerImpl(stores, executor);
+ commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor());
+ futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB");
+ executor = new DeadlockDetectingListeningExecutorService(commitExecutor,
+ TransactionCommitDeadlockException.DEADLOCK_EXCEPTION_SUPPLIER, futureExecutor);
+ domBroker = new SerializedDOMDataBroker(stores, executor);
}
@After
if( executor != null ) {
executor.shutdownNow();
}
+
+ if(futureExecutor != null) {
+ futureExecutor.shutdownNow();
+ }
}
@Test(timeout=10000)
assertTrue(afterCommitRead.isPresent());
}
+ @Test(expected=TransactionCommitFailedException.class)
+ public void testRejectedCommit() throws Exception {
+
+ commitExecutor.delegate = Mockito.mock( ExecutorService.class );
+ Mockito.doThrow( new RejectedExecutionException( "mock" ) )
+ .when( commitExecutor.delegate ).execute( Mockito.any( Runnable.class ) );
+ Mockito.doNothing().when( commitExecutor.delegate ).shutdown();
+ Mockito.doReturn( Collections.emptyList() ).when( commitExecutor.delegate ).shutdownNow();
+ Mockito.doReturn( "" ).when( commitExecutor.delegate ).toString();
+ Mockito.doReturn( true ).when( commitExecutor.delegate )
+ .awaitTermination( Mockito.anyLong(), Mockito.any( TimeUnit.class ) );
+
+ DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+ writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME) );
+
+ writeTx.submit().checkedGet( 5, TimeUnit.SECONDS );
+ }
+
/**
* Tests a simple DataChangeListener notification after a write.
*/
TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
@Override
- public void onDataChanged( AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
+ public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
Futures.addCallback( writeTx.submit(), new FutureCallback<Void>() {
@Override
- public void onSuccess( Void result ) {
+ public void onSuccess( final Void result ) {
commitCompletedLatch.countDown();
}
@Override
- public void onFailure( Throwable t ) {
+ public void onFailure( final Throwable t ) {
caughtCommitEx.set( t );
commitCompletedLatch.countDown();
}
TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
@Override
- public void onDataChanged( AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
+ public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
private final CountDownLatch latch = new CountDownLatch( 1 );
@Override
- public void onDataChanged( AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
+ public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
this.change = change;
latch.countDown();
}
assertTrue( "onDataChanged was not called", latch.await( 5, TimeUnit.SECONDS ) );
}
}
+
+ static class CommitExecutorService extends ForwardingExecutorService {
+
+ ExecutorService delegate;
+
+ public CommitExecutorService( final ExecutorService delegate ) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ protected ExecutorService delegate() {
+ return delegate;
+ }
+ }
}