}
public DOMStore createConfigurationDatastore() {
- InMemoryDOMDataStore store = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore store = new InMemoryDOMDataStore("CFG",
+ MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
schemaService.registerSchemaContextListener(store);
return store;
}
public DOMStore createOperationalDatastore() {
- InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER",
+ MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
schemaService.registerSchemaContextListener(store);
return store;
}
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.MutableClassToInstanceMap;
import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
@Beta
public class BindingTestContext implements AutoCloseable {
public void startNewDomDataBroker() {
checkState(executor != null, "Executor needs to be set");
- InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", executor);
- InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", executor);
+ InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", executor,
+ MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", executor,
+ MoreExecutors.sameThreadExecutor());
newDatastores = ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
.put(LogicalDatastoreType.OPERATIONAL, operStore)
.put(LogicalDatastoreType.CONFIGURATION, configStore)
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.util.PropertyUtils;
+import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.Executors;
-
/**
*
*/
private static final Logger
LOG = LoggerFactory.getLogger(DistributedDataStore.class);
- private static final int DEFAULT_EXECUTOR_POOL_SIZE = 10;
+ private static final String EXECUTOR_MAX_POOL_SIZE_PROP =
+ "mdsal.dist-datastore-executor-pool.size";
+ private static final int DEFAULT_EXECUTOR_MAX_POOL_SIZE = 10;
+
+ private static final String EXECUTOR_MAX_QUEUE_SIZE_PROP =
+ "mdsal.dist-datastore-executor-queue.size";
+ private static final int DEFAULT_EXECUTOR_MAX_QUEUE_SIZE = 5000;
private final String type;
private final ActorContext actorContext;
private SchemaContext schemaContext;
-
-
/**
* Executor used to run FutureTask's
*
* This is typically used when we need to make a request to an actor and
* wait for it's response and the consumer needs to be provided a Future.
- *
- * FIXME : Make the thread pool size configurable.
*/
private final ListeningExecutorService executor =
- MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(DEFAULT_EXECUTOR_POOL_SIZE));
+ MoreExecutors.listeningDecorator(
+ SpecialExecutors.newBlockingBoundedFastThreadPool(
+ PropertyUtils.getIntSystemProperty(
+ EXECUTOR_MAX_POOL_SIZE_PROP,
+ DEFAULT_EXECUTOR_MAX_POOL_SIZE),
+ PropertyUtils.getIntSystemProperty(
+ EXECUTOR_MAX_QUEUE_SIZE_PROP,
+ DEFAULT_EXECUTOR_MAX_QUEUE_SIZE), "DistDataStore"));
public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster, Configuration configuration) {
this(new ActorContext(actorSystem, actorSystem
import akka.serialization.Serialization;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
public static final String DEFAULT_NAME = "default";
- private final ListeningExecutorService storeExecutor =
- MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
-
private final InMemoryDOMDataStore store;
private final Map<Object, DOMStoreThreePhaseCommitCohort>
LOG.info("Creating shard : {} persistent : {}", name, persistent);
- store = new InMemoryDOMDataStore(name, storeExecutor);
+ store = InMemoryDOMDataStoreFactory.create(name, null);
shardMBean = ShardMBeanFactory.getShardStatsMBean(name);
public class DataChangeListenerRegistrationTest extends AbstractActorTest {
private static ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
- private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor);
+ private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor,
+ MoreExecutors.sameThreadExecutor());
static {
store.onGlobalContextUpdated(TestModel.createTestContext());
final ActorRef subject = getSystem().actorOf(props, "testCloseListenerRegistration");
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
subject.tell(new CloseDataChangeListenerRegistration().toSerializable(), getRef());
final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in.getClass().equals(CloseDataChangeListenerRegistrationReply.SERIALIZABLE_CLASS)) {
return "match";
private static ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
- private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor);
+ private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor,
+ MoreExecutors.sameThreadExecutor());
static {
store.onGlobalContextUpdated(TestModel.createTestContext());
final ActorRef subject = getSystem().actorOf(props, "testCreateTransaction");
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
return CreateTransactionReply.fromSerializable(in).getTransactionPath();
final ActorRef subject = getSystem().actorOf(props, "testCloseTransactionChain");
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
subject.tell(new CloseTransactionChain().toSerializable(), getRef());
final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in.getClass().equals(CloseTransactionChainReply.SERIALIZABLE_CLASS)) {
return "match";
MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
private static final InMemoryDOMDataStore store =
- new InMemoryDOMDataStore("OPER", storeExecutor);
+ new InMemoryDOMDataStore("OPER", storeExecutor, MoreExecutors.sameThreadExecutor());
private static final SchemaContext testSchemaContext = TestModel.createTestContext();
final ActorRef subject = getSystem().actorOf(props, "testReadData");
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
subject.tell(
final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
if (ReadDataReply.fromSerializable(testSchemaContext,YangInstanceIdentifier.builder().build(), in)
final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound");
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
subject.tell(
final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
if (ReadDataReply.fromSerializable(testSchemaContext,TestModel.TEST_PATH, in)
final Class<? extends Modification> modificationType) {
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
subject
.tell(new ShardTransaction.GetCompositedModification(),
final CompositeModification compositeModification =
new ExpectMsg<CompositeModification>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected CompositeModification match(Object in) {
if (in instanceof ShardTransaction.GetCompositeModificationReply) {
return ((ShardTransaction.GetCompositeModificationReply) in)
getSystem().actorOf(props, "testWriteData");
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
subject.tell(new WriteData(TestModel.TEST_PATH,
final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in.getClass().equals(WriteDataReply.SERIALIZABLE_CLASS)) {
return "match";
getSystem().actorOf(props, "testMergeData");
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
subject.tell(new MergeData(TestModel.TEST_PATH,
final String out = new ExpectMsg<String>(duration("500 milliseconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in.getClass().equals(MergeDataReply.SERIALIZABLE_CLASS)) {
return "match";
getSystem().actorOf(props, "testDeleteData");
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
subject.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in.getClass().equals(DeleteDataReply.SERIALIZABLE_CLASS)) {
return "match";
getSystem().actorOf(props, "testReadyTransaction");
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
subject.tell(new ReadyTransaction().toSerializable(), getRef());
final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
return "match";
watch(subject);
new Within(duration("2 seconds")) {
+ @Override
protected void run() {
subject.tell(new CloseTransaction().toSerializable(), getRef());
final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in.getClass().equals(CloseTransactionReply.SERIALIZABLE_CLASS)) {
return "match";
final String termination = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in instanceof Terminated) {
return "match";
@Before
public void setUp(){
- store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
+ store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor(),
+ MoreExecutors.sameThreadExecutor());
store.onGlobalContextUpdated(TestModel.createTestContext());
}
*/
package org.opendaylight.controller.config.yang.md.sal.dom.impl;
-import java.util.concurrent.Executors;
-
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
+import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
+import org.opendaylight.yangtools.util.PropertyUtils;
import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
/**
*
public final class DomInmemoryDataBrokerModule extends
org.opendaylight.controller.config.yang.md.sal.dom.impl.AbstractDomInmemoryDataBrokerModule {
+ private static final String FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE_PROP =
+ "mdsal.datastore-future-callback-queue.size";
+ private static final int DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE = 1000;
+
+ private static final String FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE_PROP =
+ "mdsal.datastore-future-callback-pool.size";
+ private static final int DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE = 20;
+ private static final String COMMIT_EXECUTOR_MAX_QUEUE_SIZE_PROP =
+ "mdsal.datastore-commit-queue.size";
+ private static final int DEFAULT_COMMIT_EXECUTOR_MAX_QUEUE_SIZE = 5000;
+
public DomInmemoryDataBrokerModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier,
final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
super(identifier, dependencyResolver);
@Override
public java.lang.AutoCloseable createInstance() {
- ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
//Initializing Operational DOM DataStore defaulting to InMemoryDOMDataStore if one is not configured
DOMStore operStore = getOperationalDataStoreDependency();
if(operStore == null){
//we will default to InMemoryDOMDataStore creation
- operStore = new InMemoryDOMDataStore("DOM-OPER", storeExecutor);
- //here we will register the SchemaContext listener
- getSchemaServiceDependency().registerSchemaContextListener((InMemoryDOMDataStore)operStore);
+ operStore = InMemoryDOMDataStoreFactory.create("DOM-OPER", getSchemaServiceDependency());
}
DOMStore configStore = getConfigDataStoreDependency();
if(configStore == null){
//we will default to InMemoryDOMDataStore creation
- configStore = new InMemoryDOMDataStore("DOM-CFG", storeExecutor);
- //here we will register the SchemaContext listener
- getSchemaServiceDependency().registerSchemaContextListener((InMemoryDOMDataStore)configStore);
+ configStore = InMemoryDOMDataStoreFactory.create("DOM-CFG", getSchemaServiceDependency());
}
ImmutableMap<LogicalDatastoreType, DOMStore> datastores = ImmutableMap
.<LogicalDatastoreType, DOMStore> builder().put(LogicalDatastoreType.OPERATIONAL, operStore)
.put(LogicalDatastoreType.CONFIGURATION, configStore).build();
+ /*
+ * We use a single-threaded executor for commits with a bounded queue capacity. If the
+ * queue capacity is reached, subsequent commit tasks will be rejected and the commits will
+ * fail. This is done to relieve back pressure. This should be an extreme scenario - either
+ * there's deadlock(s) somewhere and the controller is unstable or some rogue component is
+ * continuously hammering commits too fast or the controller is just over-capacity for the
+ * system it's running on.
+ */
+ ExecutorService commitExecutor = SpecialExecutors.newBoundedSingleThreadExecutor(
+ PropertyUtils.getIntSystemProperty(
+ COMMIT_EXECUTOR_MAX_QUEUE_SIZE_PROP,
+ DEFAULT_COMMIT_EXECUTOR_MAX_QUEUE_SIZE), "WriteTxCommit");
+
+ /*
+ * We use an executor for commit ListenableFuture callbacks that favors reusing available
+ * threads over creating new threads at the expense of execution time. The assumption is
+ * that most ListenableFuture callbacks won't execute a lot of business logic where we want
+ * it to run quicker - many callbacks will likely just handle error conditions and do
+ * nothing on success. The executor queue capacity is bounded and, if the capacity is
+ * reached, subsequent submitted tasks will block the caller.
+ */
+ Executor listenableFutureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(
+ PropertyUtils.getIntSystemProperty(
+ FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE_PROP,
+ DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE),
+ PropertyUtils.getIntSystemProperty(
+ FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE_PROP,
+ DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE), "CommitFutures");
+
DOMDataBrokerImpl newDataBroker = new DOMDataBrokerImpl(datastores,
- new DeadlockDetectingListeningExecutorService(Executors.newSingleThreadExecutor(),
- TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION));
+ new DeadlockDetectingListeningExecutorService(commitExecutor,
+ TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION,
+ listenableFutureExecutor));
return newDataBroker;
}
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RejectedExecutionException;
import javax.annotation.concurrent.GuardedBy;
Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
Preconditions.checkArgument(listener != null, "Listener must not be null");
LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
- ListenableFuture<Void> commitFuture = executor.submit(new CommitCoordinationTask(
- transaction, cohorts, listener));
+
+ ListenableFuture<Void> commitFuture = null;
+ try {
+ commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts, listener));
+ } catch(RejectedExecutionException e) {
+ LOG.error("The commit executor's queue is full - submit task was rejected. \n" +
+ executor, e);
+ return Futures.immediateFailedCheckedFuture(
+ new TransactionCommitFailedException(
+ "Could not submit the commit task - the commit queue capacity has been exceeded.", e));
+ }
+
if (listener.isPresent()) {
Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
}
@Before
public void setupStore() {
- InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
- InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
+ MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
+ MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
schemaContext = TestModel.createTestContext();
operStore.onGlobalContextUpdated(schemaContext);
import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
+import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
+import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ForwardingExecutorService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
private SchemaContext schemaContext;
private DOMDataBrokerImpl domBroker;
private ListeningExecutorService executor;
+ private ExecutorService futureExecutor;
+ private CommitExecutorService commitExecutor;
@Before
public void setupStore() {
- InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
- InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
+
+ InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
+ MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
+ MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
schemaContext = TestModel.createTestContext();
operStore.onGlobalContextUpdated(schemaContext);
.put(OPERATIONAL, operStore) //
.build();
- executor = new DeadlockDetectingListeningExecutorService(Executors.newSingleThreadExecutor(),
- TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION);
+ commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor());
+ futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB");
+ executor = new DeadlockDetectingListeningExecutorService(commitExecutor,
+ TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION, futureExecutor);
domBroker = new DOMDataBrokerImpl(stores, executor);
}
if( executor != null ) {
executor.shutdownNow();
}
+
+ if(futureExecutor != null) {
+ futureExecutor.shutdownNow();
+ }
}
@Test(timeout=10000)
assertTrue(afterCommitRead.isPresent());
}
+ @Test(expected=TransactionCommitFailedException.class)
+ public void testRejectedCommit() throws Exception {
+
+ commitExecutor.delegate = Mockito.mock( ExecutorService.class );
+ Mockito.doThrow( new RejectedExecutionException( "mock" ) )
+ .when( commitExecutor.delegate ).execute( Mockito.any( Runnable.class ) );
+ Mockito.doNothing().when( commitExecutor.delegate ).shutdown();
+ Mockito.doReturn( Collections.emptyList() ).when( commitExecutor.delegate ).shutdownNow();
+ Mockito.doReturn( "" ).when( commitExecutor.delegate ).toString();
+ Mockito.doReturn( true ).when( commitExecutor.delegate )
+ .awaitTermination( Mockito.anyLong(), Mockito.any( TimeUnit.class ) );
+
+ DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+ writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME) );
+
+ writeTx.submit().checkedGet( 5, TimeUnit.SECONDS );
+ }
+
/**
* Tests a simple DataChangeListener notification after a write.
*/
assertTrue( "onDataChanged was not called", latch.await( 5, TimeUnit.SECONDS ) );
}
}
+
+ static class CommitExecutorService extends ForwardingExecutorService {
+
+ ExecutorService delegate;
+
+ public CommitExecutorService( ExecutorService delegate ) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ protected ExecutorService delegate() {
+ return delegate;
+ }
+ }
}
@Before
public void setupStore() {
- InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
- InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
+ MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
+ MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
schemaContext = TestModel.createTestContext();
operStore.onGlobalContextUpdated(schemaContext);
package org.opendaylight.controller.config.yang.inmemory_datastore_provider;
-import java.util.concurrent.Executors;
-
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-
-import com.google.common.util.concurrent.MoreExecutors;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
public class InMemoryConfigDataStoreProviderModule extends org.opendaylight.controller.config.yang.inmemory_datastore_provider.AbstractInMemoryConfigDataStoreProviderModule {
+
public InMemoryConfigDataStoreProviderModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier, final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
super(identifier, dependencyResolver);
}
@Override
public java.lang.AutoCloseable createInstance() {
- InMemoryDOMDataStore ids = new InMemoryDOMDataStore("DOM-CFG", MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()));
- getSchemaServiceDependency().registerSchemaContextListener(ids);
- return ids;
+ return InMemoryDOMDataStoreFactory.create("DOM-CFG", getSchemaServiceDependency());
}
}
package org.opendaylight.controller.config.yang.inmemory_datastore_provider;
-import java.util.concurrent.Executors;
-
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-
-import com.google.common.util.concurrent.MoreExecutors;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
public class InMemoryOperationalDataStoreProviderModule extends org.opendaylight.controller.config.yang.inmemory_datastore_provider.AbstractInMemoryOperationalDataStoreProviderModule {
+
public InMemoryOperationalDataStoreProviderModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier, final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
super(identifier, dependencyResolver);
}
@Override
public java.lang.AutoCloseable createInstance() {
- InMemoryDOMDataStore ids = new InMemoryDOMDataStore("DOM-OPER", MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()));
- getOperationalSchemaServiceDependency().registerSchemaContextListener(ids);
- return ids;
+ return InMemoryDOMDataStoreFactory.create("DOM-OPER", getOperationalSchemaServiceDependency());
}
}
package org.opendaylight.controller.md.sal.dom.store.impl;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.util.concurrent.NotificationManager;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
class ChangeListenerNotifyTask implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ChangeListenerNotifyTask.class);
+
private final Iterable<? extends DataChangeListenerRegistration<?>> listeners;
private final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> event;
+ @SuppressWarnings("rawtypes")
+ private final NotificationManager<AsyncDataChangeListener,AsyncDataChangeEvent>
+ notificationMgr;
+
+ @SuppressWarnings("rawtypes")
public ChangeListenerNotifyTask(final Iterable<? extends DataChangeListenerRegistration<?>> listeners,
- final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> event) {
+ final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> event,
+ final NotificationManager<AsyncDataChangeListener,AsyncDataChangeEvent> notificationMgr) {
this.listeners = listeners;
this.event = event;
+ this.notificationMgr = notificationMgr;
}
@Override
public void run() {
for (DataChangeListenerRegistration<?> listener : listeners) {
- try {
- listener.getInstance().onDataChanged(event);
- } catch (Exception e) {
- LOG.error("Unhandled exception during invoking listener {} with event {}", listener, event, e);
- }
+ notificationMgr.submitNotification(listener.getInstance(), event);
}
-
}
@Override
public String toString() {
return "ChangeListenerNotifyTask [listeners=" + listeners + ", event=" + event + "]";
}
-
}
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
+
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
+import org.opendaylight.yangtools.util.ExecutorServiceUtil;
+import org.opendaylight.yangtools.util.PropertyUtils;
+import org.opendaylight.yangtools.util.concurrent.NotificationManager;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.GuardedBy;
+
import java.util.Collections;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static com.google.common.base.Preconditions.checkState;
public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener,
TransactionReadyPrototype,AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
+
+ @SuppressWarnings("rawtypes")
+ private static final QueuedNotificationManager.Invoker<AsyncDataChangeListener,
+ AsyncDataChangeEvent> DCL_NOTIFICATION_MGR_INVOKER =
+ new QueuedNotificationManager.Invoker<AsyncDataChangeListener,
+ AsyncDataChangeEvent>() {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void invokeListener( AsyncDataChangeListener listener,
+ AsyncDataChangeEvent notification ) {
+ listener.onDataChanged(notification);
+ }
+ };
+
+ private static final String DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE_PROP =
+ "mdsal.datastore-dcl-notification-queue.size";
+
+ private static final int DEFAULT_DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE = 1000;
+
private final DataTree dataTree = InMemoryDataTreeFactory.getInstance().create();
private final ListenerTree listenerTree = ListenerTree.create();
private final AtomicLong txCounter = new AtomicLong(0);
- private final ListeningExecutorService executor;
+ private final ListeningExecutorService listeningExecutor;
+
+ @SuppressWarnings("rawtypes")
+ private final NotificationManager<AsyncDataChangeListener,AsyncDataChangeEvent>
+ dataChangeListenerNotificationManager;
+ private final ExecutorService dataChangeListenerExecutor;
private final String name;
- public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
+ public InMemoryDOMDataStore(final String name, final ListeningExecutorService listeningExecutor,
+ final ExecutorService dataChangeListenerExecutor) {
this.name = Preconditions.checkNotNull(name);
- this.executor = Preconditions.checkNotNull(executor);
+ this.listeningExecutor = Preconditions.checkNotNull(listeningExecutor);
+
+ this.dataChangeListenerExecutor = Preconditions.checkNotNull(dataChangeListenerExecutor);
+
+ int maxDCLQueueSize = PropertyUtils.getIntSystemProperty(
+ DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE_PROP, DEFAULT_DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE );
+
+ dataChangeListenerNotificationManager =
+ new QueuedNotificationManager<>(this.dataChangeListenerExecutor,
+ DCL_NOTIFICATION_MGR_INVOKER, maxDCLQueueSize, "DataChangeListenerQueueMgr");
}
@Override
}
@Override
- public void close(){
- executor.shutdownNow();
+ public void close() {
+ ExecutorServiceUtil.tryGracefulShutdown(listeningExecutor, 30, TimeUnit.SECONDS);
+ ExecutorServiceUtil.tryGracefulShutdown(dataChangeListenerExecutor, 30, TimeUnit.SECONDS);
}
@Override
public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
.setAfter(data) //
.addCreated(path, data) //
.build();
- executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event));
+
+ new ChangeListenerNotifyTask(Collections.singletonList(reg), event,
+ dataChangeListenerNotificationManager).run();
}
}
@Override
public void close() {
- executor.shutdownNow();
-
+ // FIXME: this call doesn't look right here - listeningExecutor is shared and owned
+ // by the outer class.
+ //listeningExecutor.shutdownNow();
}
protected synchronized void onTransactionFailed(final SnapshotBackedWriteTransaction transaction,
@Override
public ListenableFuture<Boolean> canCommit() {
- return executor.submit(new Callable<Boolean>() {
+ return listeningExecutor.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws TransactionCommitFailedException {
try {
@Override
public ListenableFuture<Void> preCommit() {
- return executor.submit(new Callable<Void>() {
+ return listeningExecutor.submit(new Callable<Void>() {
@Override
public Void call() {
candidate = dataTree.prepare(modification);
- listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree);
+ listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree,
+ dataChangeListenerNotificationManager);
return null;
}
});
for (ChangeListenerNotifyTask task : listenerResolver.call()) {
LOG.trace("Scheduling invocation of listeners: {}", task);
- executor.submit(task);
+ task.run();
}
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.annotation.Nullable;
+
+import org.opendaylight.controller.sal.core.api.model.SchemaService;
+import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
+import org.opendaylight.yangtools.util.PropertyUtils;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * A factory for creating InMemoryDOMDataStore instances.
+ *
+ * @author Thomas Pantelis
+ */
+public final class InMemoryDOMDataStoreFactory {
+
+ private static final String DCL_EXECUTOR_MAX_QUEUE_SIZE_PROP =
+ "mdsal.datastore-dcl-notification-queue.size";
+ private static final int DEFAULT_DCL_EXECUTOR_MAX_QUEUE_SIZE = 1000;
+
+ private static final String DCL_EXECUTOR_MAX_POOL_SIZE_PROP =
+ "mdsal.datastore-dcl-notification-pool.size";
+ private static final int DEFAULT_DCL_EXECUTOR_MAX_POOL_SIZE = 20;
+
+ private InMemoryDOMDataStoreFactory() {
+ }
+
+ /**
+ * Creates an InMemoryDOMDataStore instance.
+ *
+ * @param name the name of the data store
+ * @param schemaService the SchemaService to which to register the data store.
+ * @return an InMemoryDOMDataStore instance
+ */
+ public static InMemoryDOMDataStore create(final String name,
+ @Nullable final SchemaService schemaService) {
+
+ // For DataChangeListener notifications we use an executor that provides the fastest
+ // task execution time to get higher throughput as DataChangeListeners typically provide
+ // much of the business logic for a data model. If the executor queue size limit is reached,
+ // subsequent submitted notifications will block the calling thread.
+
+ int dclExecutorMaxQueueSize = PropertyUtils.getIntSystemProperty(
+ DCL_EXECUTOR_MAX_QUEUE_SIZE_PROP, DEFAULT_DCL_EXECUTOR_MAX_QUEUE_SIZE);
+ int dclExecutorMaxPoolSize = PropertyUtils.getIntSystemProperty(
+ DCL_EXECUTOR_MAX_POOL_SIZE_PROP, DEFAULT_DCL_EXECUTOR_MAX_POOL_SIZE);
+
+ ExecutorService dataChangeListenerExecutor = SpecialExecutors.newBlockingBoundedFastThreadPool(
+ dclExecutorMaxPoolSize, dclExecutorMaxQueueSize, name + "-DCL" );
+
+ InMemoryDOMDataStore dataStore = new InMemoryDOMDataStore(name,
+ MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()),
+ dataChangeListenerExecutor);
+
+ if(schemaService != null) {
+ schemaService.registerSchemaContextListener(dataStore);
+ }
+
+ return dataStore;
+ }
+}
import java.util.Set;
import java.util.concurrent.Callable;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.Builder;
import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.SimpleEventFactory;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree.Node;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree.Walker;
+import org.opendaylight.yangtools.util.concurrent.NotificationManager;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
private final DataTreeCandidate candidate;
private final ListenerTree listenerRoot;
- public ResolveDataChangeEventsTask(final DataTreeCandidate candidate, final ListenerTree listenerTree) {
+ @SuppressWarnings("rawtypes")
+ private final NotificationManager<AsyncDataChangeListener, AsyncDataChangeEvent> notificationMgr;
+
+ @SuppressWarnings("rawtypes")
+ public ResolveDataChangeEventsTask(final DataTreeCandidate candidate, final ListenerTree listenerTree,
+ final NotificationManager<AsyncDataChangeListener, AsyncDataChangeEvent> notificationMgr) {
this.candidate = Preconditions.checkNotNull(candidate);
this.listenerRoot = Preconditions.checkNotNull(listenerTree);
+ this.notificationMgr = Preconditions.checkNotNull(notificationMgr);
}
/**
* @param listeners
* @param entries
*/
- private static void addNotificationTask(final ImmutableList.Builder<ChangeListenerNotifyTask> taskListBuilder,
+ private void addNotificationTask(final ImmutableList.Builder<ChangeListenerNotifyTask> taskListBuilder,
final ListenerTree.Node listeners, final Collection<DOMImmutableDataChangeEvent> entries) {
if (!entries.isEmpty()) {
* @param listeners
* @param event
*/
- private static void addNotificationTaskByScope(
+ private void addNotificationTaskByScope(
final ImmutableList.Builder<ChangeListenerNotifyTask> taskListBuilder, final ListenerTree.Node listeners,
final DOMImmutableDataChangeEvent event) {
DataChangeScope eventScope = event.getScope();
List<DataChangeListenerRegistration<?>> listenerSet = Collections
.<DataChangeListenerRegistration<?>> singletonList(listenerReg);
if (eventScope == DataChangeScope.BASE) {
- taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event));
+ taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event, notificationMgr));
} else if (eventScope == DataChangeScope.ONE && listenerScope != DataChangeScope.BASE) {
- taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event));
+ taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event, notificationMgr));
} else if (eventScope == DataChangeScope.SUBTREE && listenerScope == DataChangeScope.SUBTREE) {
- taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event));
+ taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event, notificationMgr));
}
}
}
* @param listeners
* @param entries
*/
- private static void addNotificationTasksAndMergeEvents(
+ private void addNotificationTasksAndMergeEvents(
final ImmutableList.Builder<ChangeListenerNotifyTask> taskListBuilder, final ListenerTree.Node listeners,
final Collection<DOMImmutableDataChangeEvent> entries) {
}
}
- private static void addNotificationTaskExclusively(
+ private void addNotificationTaskExclusively(
final ImmutableList.Builder<ChangeListenerNotifyTask> taskListBuilder, final Node listeners,
final DOMImmutableDataChangeEvent event) {
for (DataChangeListenerRegistration<?> listener : listeners.getListeners()) {
if (listener.getScope() == event.getScope()) {
Set<DataChangeListenerRegistration<?>> listenerSet = Collections
.<DataChangeListenerRegistration<?>> singleton(listener);
- taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event));
+ taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event, notificationMgr));
}
}
}
}
}
- public static ResolveDataChangeEventsTask create(final DataTreeCandidate candidate, final ListenerTree listenerTree) {
- return new ResolveDataChangeEventsTask(candidate, listenerTree);
+ @SuppressWarnings("rawtypes")
+ public static ResolveDataChangeEventsTask create(final DataTreeCandidate candidate,
+ final ListenerTree listenerTree,
+ final NotificationManager<AsyncDataChangeListener,AsyncDataChangeEvent> notificationMgr) {
+ return new ResolveDataChangeEventsTask(candidate, listenerTree, notificationMgr);
}
}
import java.util.Collection;
import java.util.Map;
-
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.opendaylight.controller.md.sal.dom.store.impl.DatastoreTestTask.WriteTransactionCustomizer;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelList;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.top.level.list.NestedList;
import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
+import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
import org.opendaylight.yangtools.yang.binding.YangModuleInfo;
import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
import org.opendaylight.yangtools.yang.common.QName;
private InMemoryDOMDataStore datastore;
private SchemaContext schemaContext;
+ private TestDCLExecutorService dclExecutorService;
@Before
public final void setup() throws Exception {
ModuleInfoBackedContext context = ModuleInfoBackedContext.create();
context.registerModuleInfo(moduleInfo);
schemaContext = context.tryToCreateSchemaContext().get();
+
+ dclExecutorService = new TestDCLExecutorService(
+ SpecialExecutors.newBlockingBoundedFastThreadPool(1, 10, "DCL" ));
+
datastore = new InMemoryDOMDataStore("TEST",
- MoreExecutors.sameThreadExecutor());
+ MoreExecutors.sameThreadExecutor(), dclExecutorService );
datastore.onGlobalContextUpdated(schemaContext);
}
+ @After
+ public void tearDown() {
+ if( dclExecutorService != null ) {
+ dclExecutorService.shutdownNow();
+ }
+ }
+
public final DatastoreTestTask newTestTask() {
- return new DatastoreTestTask(datastore).cleanup(DatastoreTestTask
+ return new DatastoreTestTask(datastore, dclExecutorService).cleanup(DatastoreTestTask
.simpleDelete(TOP_LEVEL));
}
package org.opendaylight.controller.md.sal.dom.store.impl;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
private WriteTransactionCustomizer cleanup;
private YangInstanceIdentifier changePath;
private DataChangeScope changeScope;
- private boolean postSetup = false;
+ private volatile boolean postSetup = false;
private final ChangeEventListener internalListener;
+ private final TestDCLExecutorService dclExecutorService;
- public DatastoreTestTask(final DOMStore datastore) {
+ public DatastoreTestTask(final DOMStore datastore, final TestDCLExecutorService dclExecutorService) {
this.store = datastore;
+ this.dclExecutorService = dclExecutorService;
internalListener = new ChangeEventListener();
}
return this;
}
- public void run() throws InterruptedException, ExecutionException {
+ public void run() throws InterruptedException, ExecutionException, TimeoutException {
if (setup != null) {
execute(setup);
}
}
Preconditions.checkState(write != null, "Write Transaction must be set.");
+
postSetup = true;
+ dclExecutorService.afterTestSetup();
+
execute(write);
if (registration != null) {
registration.close();
}
+
if (changeListener != null) {
- changeListener.onDataChanged(internalListener.receivedChange.get());
+ changeListener.onDataChanged(getChangeEvent());
}
if (read != null) {
read.verify(store.newReadOnlyTransaction());
}
}
- public Future<AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>> getChangeEvent() {
- return internalListener.receivedChange;
+ public AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> getChangeEvent() {
+ try {
+ return internalListener.receivedChange.get(10, TimeUnit.SECONDS);
+ } catch( Exception e ) {
+ fail( "Error getting the AsyncDataChangeEvent from the Future: " + e );
+ }
+
+ // won't get here
+ return null;
+ }
+
+ public void verifyNoChangeEvent() {
+ try {
+ Object unexpected = internalListener.receivedChange.get(500, TimeUnit.MILLISECONDS);
+ fail( "Got unexpected AsyncDataChangeEvent from the Future: " + unexpected );
+ } catch( TimeoutException e ) {
+ // Expected
+ } catch( Exception e ) {
+ fail( "Error getting the AsyncDataChangeEvent from the Future: " + e );
+ }
}
private void execute(final WriteTransactionCustomizer writeCustomizer) throws InterruptedException,
abstract protected void customizeTask(DatastoreTestTask task);
@Test
- public final void putTopLevelOneNested() throws InterruptedException, ExecutionException {
+ public final void putTopLevelOneNested() throws Exception {
DatastoreTestTask task = newTestTask().test(writeOneTopMultipleNested(FOO, BAR));
customizeTask(task);
}
@Test
- public final void existingTopWriteSibling() throws InterruptedException, ExecutionException {
+ public final void existingTopWriteSibling() throws Exception {
DatastoreTestTask task = newTestTask().setup(writeOneTopMultipleNested(FOO)).test(
new WriteTransactionCustomizer() {
@Override
@Test
- public final void existingTopWriteTwoNested() throws InterruptedException, ExecutionException {
+ public final void existingTopWriteTwoNested() throws Exception {
DatastoreTestTask task = newTestTask().setup(writeOneTopMultipleNested(FOO)).test(
new WriteTransactionCustomizer() {
@Override
@Test
- public final void existingOneNestedWriteAdditionalNested() throws InterruptedException, ExecutionException {
+ public final void existingOneNestedWriteAdditionalNested() throws Exception {
DatastoreTestTask task = newTestTask().setup(writeOneTopMultipleNested(FOO, BAR)).test(
new WriteTransactionCustomizer() {
@Override
protected abstract void existingOneNestedWriteAdditionalNested(DatastoreTestTask task) throws InterruptedException, ExecutionException;
- protected abstract void putTopLevelOneNested(DatastoreTestTask task) throws InterruptedException,
- ExecutionException;
+ protected abstract void putTopLevelOneNested(DatastoreTestTask task) throws Exception;
@Test
- public final void replaceTopLevelNestedChanged() throws InterruptedException, ExecutionException {
+ public final void replaceTopLevelNestedChanged() throws Exception {
DatastoreTestTask task = newTestTask().setup(writeOneTopMultipleNested(FOO, BAR)).test(
writeOneTopMultipleNested(FOO, BAZ));
customizeTask(task);
ExecutionException;
@Test
- public final void putTopLevelWithTwoNested() throws InterruptedException, ExecutionException {
+ public final void putTopLevelWithTwoNested() throws Exception {
DatastoreTestTask task = newTestTask().test(writeOneTopMultipleNested(FOO, BAR, BAZ));
customizeTask(task);
ExecutionException;
@Test
- public final void twoNestedExistsOneIsDeleted() throws InterruptedException, ExecutionException {
+ public final void twoNestedExistsOneIsDeleted() throws Exception {
DatastoreTestTask task = newTestTask().setup(writeOneTopMultipleNested(FOO, BAR, BAZ)).test(
deleteNested(FOO, BAZ));
ExecutionException;
@Test
- public final void nestedListExistsRootDeleted() throws InterruptedException, ExecutionException {
+ public final void nestedListExistsRootDeleted() throws Exception {
DatastoreTestTask task = newTestTask().cleanup(null).setup(writeOneTopMultipleNested(FOO, BAR, BAZ))
.test(DatastoreTestTask.simpleDelete(TOP_LEVEL));
@Before
public void setupStore() {
- domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor());
+ domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor(),
+ MoreExecutors.sameThreadExecutor());
schemaContext = TestModel.createTestContext();
domStore.onGlobalContextUpdated(schemaContext);
}
@Override
public void putTopLevelOneNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertContains(change.getCreatedData(), TOP_LEVEL, path(FOO), path(FOO, BAR));
assertEmpty(change.getUpdatedData());
public void replaceTopLevelNestedChanged(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertContains(change.getCreatedData(), path(FOO, BAZ));
assertContains(change.getUpdatedData(), TOP_LEVEL, path(FOO));
protected void putTopLevelWithTwoNested(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertContains(change.getCreatedData(), TOP_LEVEL, path(FOO), path(FOO, BAR), path(FOO, BAZ));
assertEmpty(change.getUpdatedData());
protected void twoNestedExistsOneIsDeleted(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertEmpty(change.getCreatedData());
assertContains(change.getUpdatedData(), TOP_LEVEL, path(FOO));
protected void nestedListExistsRootDeleted(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertEmpty(change.getCreatedData());
assertEmpty(change.getUpdatedData());
@Override
protected void existingOneNestedWriteAdditionalNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertContains(change.getCreatedData(), path(FOO,BAZ));
assertNotContains(change.getCreatedData(), path(FOO,BAR));
@Override
protected void existingTopWriteTwoNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertContains(change.getCreatedData(), path(FOO,BAR),path(FOO,BAZ));
assertContains(change.getUpdatedData(), TOP_LEVEL, path(FOO));
@Override
protected void existingTopWriteSibling(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertContains(change.getCreatedData(), path(FOO_SIBLING));
assertContains(change.getUpdatedData(), TOP_LEVEL);
@Before
public void setupStore() {
- domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor());
+ domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor(),
+ MoreExecutors.sameThreadExecutor());
loadSchemas(RockTheHouseInput.class);
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import java.util.concurrent.ExecutorService;
+
+import com.google.common.util.concurrent.ForwardingExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * A forwarding Executor used by unit tests for DataChangeListener notifications
+ *
+ * @author Thomas Pantelis
+ */
+public class TestDCLExecutorService extends ForwardingExecutorService {
+
+ // Start with a same thread executor to avoid timing issues during test setup.
+ private volatile ExecutorService currentExecutor = MoreExecutors.sameThreadExecutor();
+
+ // The real executor to use when test setup is complete.
+ private final ExecutorService postSetupExecutor;
+
+
+ public TestDCLExecutorService( ExecutorService postSetupExecutor ) {
+ this.postSetupExecutor = postSetupExecutor;
+ }
+
+ @Override
+ protected ExecutorService delegate() {
+ return currentExecutor;
+ }
+
+ public void afterTestSetup() {
+ // Test setup complete - switch to the real executor.
+ currentExecutor = postSetupExecutor;
+ }
+}
\ No newline at end of file
import static org.junit.Assert.assertNotNull;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelList;
@Override
public void putTopLevelOneNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertNotNull(change);
public void replaceTopLevelNestedChanged(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertNotNull(change);
assertContains(change.getCreatedData(), path(FOO, BAZ));
protected void putTopLevelWithTwoNested(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertNotNull(change);
assertFalse(change.getCreatedData().isEmpty());
protected void twoNestedExistsOneIsDeleted(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- Future<?> future = task.getChangeEvent();
/*
* Base listener should be notified only and only if actual node changed its state,
* since deletion of child, did not result in change of node we are listening
* and this means settable future containing receivedDataChangeEvent is not done.
*
*/
- assertFalse(future.isDone());
+ task.verifyNoChangeEvent();
}
@Override
public void nestedListExistsRootDeleted(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertEmpty(change.getCreatedData());
assertEmpty(change.getUpdatedData());
@Override
protected void existingOneNestedWriteAdditionalNested(final DatastoreTestTask task) {
- Future<?> future = task.getChangeEvent();
/*
* One listener should be notified only and only if actual node changed its state,
* since deletion of nested child (in this case /nested-list/nested-list[foo],
* and this means settable future containing receivedDataChangeEvent is not done.
*
*/
- assertFalse(future.isDone());
+ task.verifyNoChangeEvent();
}
@Override
protected void existingTopWriteTwoNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
- Future<?> future = task.getChangeEvent();
/*
* One listener should be notified only and only if actual node changed its state,
* since deletion of nested child (in this case /nested-list/nested-list[foo],
* and this means settable future containing receivedDataChangeEvent is not done.
*
*/
- assertFalse(future.isDone());
+ task.verifyNoChangeEvent();
}
@Override
protected void existingTopWriteSibling(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertContains(change.getCreatedData(), path(FOO_SIBLING));
assertNotContains(change.getUpdatedData(), path(FOO), TOP_LEVEL);
import static org.junit.Assert.assertNotNull;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelList;
@Override
public void putTopLevelOneNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertNotNull(change);
public void replaceTopLevelNestedChanged(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertNotNull(change);
assertContains(change.getCreatedData(), path(FOO, BAZ));
protected void putTopLevelWithTwoNested(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertNotNull(change);
assertFalse(change.getCreatedData().isEmpty());
protected void twoNestedExistsOneIsDeleted(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- Future<?> future = task.getChangeEvent();
/*
* One listener should be notified only and only if actual node changed its state,
* since deletion of nested child (in this case /nested-list/nested-list[foo],
* and this means settable future containing receivedDataChangeEvent is not done.
*
*/
- assertFalse(future.isDone());
+ task.verifyNoChangeEvent();
}
@Override
public void nestedListExistsRootDeleted(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertEmpty(change.getCreatedData());
assertEmpty(change.getUpdatedData());
@Override
protected void existingOneNestedWriteAdditionalNested(final DatastoreTestTask task) {
- Future<?> future = task.getChangeEvent();
/*
* One listener should be notified only and only if actual node changed its state,
* since deletion of nested child (in this case /nested-list/nested-list[foo],
* and this means settable future containing receivedDataChangeEvent is not done.
*
*/
- assertFalse(future.isDone());
+ task.verifyNoChangeEvent();
}
@Override
protected void existingTopWriteTwoNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
- Future<?> future = task.getChangeEvent();
/*
* One listener should be notified only and only if actual node changed its state,
* since deletion of nested child (in this case /nested-list/nested-list[foo],
* and this means settable future containing receivedDataChangeEvent is not done.
*
*/
- assertFalse(future.isDone());
+ task.verifyNoChangeEvent();
}
@Override
protected void existingTopWriteSibling(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertContains(change.getCreatedData(), path(FOO_SIBLING));
assertNotContains(change.getUpdatedData(),path(FOO), TOP_LEVEL);
@Override
public void putTopLevelOneNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertNotContains(change.getCreatedData(), TOP_LEVEL);
assertContains(change.getCreatedData(), path(FOO), path(FOO, BAR));
public void replaceTopLevelNestedChanged(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertNotNull(change);
assertContains(change.getCreatedData(), path(FOO, BAZ));
protected void putTopLevelWithTwoNested(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertNotNull(change);
assertFalse(change.getCreatedData().isEmpty());
protected void twoNestedExistsOneIsDeleted(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertNotNull(change);
assertTrue(change.getCreatedData().isEmpty());
assertContains(change.getUpdatedData(), path(FOO));
public void nestedListExistsRootDeleted(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertEmpty(change.getCreatedData());
assertEmpty(change.getUpdatedData());
@Override
protected void existingOneNestedWriteAdditionalNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertContains(change.getCreatedData(), path(FOO,BAZ));
assertNotContains(change.getCreatedData(), path(FOO,BAR));
@Override
protected void existingTopWriteTwoNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertContains(change.getCreatedData(), path(FOO,BAR),path(FOO,BAZ));
assertContains(change.getUpdatedData(), path(FOO));
@Override
protected void existingTopWriteSibling(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertContains(change.getCreatedData(), path(FOO_SIBLING));
assertNotContains(change.getUpdatedData(), path(FOO), TOP_LEVEL);