import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import java.util.function.Function;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
public class DatastoreTestTask {
private final DOMStore store;
- private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> changeListener;
private WriteTransactionCustomizer setup;
private WriteTransactionCustomizer write;
private ReadTransactionVerifier read;
private WriteTransactionCustomizer cleanup;
private YangInstanceIdentifier changePath;
- private DataChangeScope changeScope;
- private volatile boolean postSetup = false;
- private final ChangeEventListener internalListener;
+ private DOMStoreTreeChangePublisher storeTreeChangePublisher;
+ private ChangeEventListener internalListener;
private final TestDCLExecutorService dclExecutorService;
public DatastoreTestTask(final DOMStore datastore, final TestDCLExecutorService dclExecutorService) {
this.store = datastore;
this.dclExecutorService = dclExecutorService;
- internalListener = new ChangeEventListener();
}
- public DatastoreTestTask changeListener(final YangInstanceIdentifier path, final DataChangeScope scope,
- final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener) {
- this.changeListener = listener;
+ @SafeVarargs
+ public final DatastoreTestTask changeListener(final YangInstanceIdentifier path,
+ Function<DataTreeCandidate, Boolean>... matchers) {
+ assertTrue(store instanceof DOMStoreTreeChangePublisher);
+ this.storeTreeChangePublisher = (DOMStoreTreeChangePublisher)store;
this.changePath = path;
- this.changeScope = scope;
+ this.internalListener = new ChangeEventListener(matchers);
return this;
}
- public DatastoreTestTask changeListener(final YangInstanceIdentifier path, final DataChangeScope scope) {
- this.changePath = path;
- this.changeScope = scope;
- return this;
+ public static Function<DataTreeCandidate, Boolean> added(YangInstanceIdentifier path) {
+ return candidate -> candidate.getRootNode().getModificationType() == ModificationType.WRITE
+ && path.equals(candidate.getRootPath()) && !candidate.getRootNode().getDataBefore().isPresent()
+ && candidate.getRootNode().getDataAfter().isPresent();
+ }
+
+ public static Function<DataTreeCandidate, Boolean> replaced(YangInstanceIdentifier path) {
+ return candidate -> candidate.getRootNode().getModificationType() == ModificationType.WRITE
+ && path.equals(candidate.getRootPath()) && candidate.getRootNode().getDataBefore().isPresent()
+ && candidate.getRootNode().getDataAfter().isPresent();
+ }
+
+ public static Function<DataTreeCandidate, Boolean> deleted(YangInstanceIdentifier path) {
+ return candidate -> candidate.getRootNode().getModificationType() == ModificationType.DELETE
+ && path.equals(candidate.getRootPath()) && candidate.getRootNode().getDataBefore().isPresent()
+ && !candidate.getRootNode().getDataAfter().isPresent();
+ }
+
+ public static Function<DataTreeCandidate, Boolean> subtreeModified(YangInstanceIdentifier path) {
+ return candidate -> candidate.getRootNode().getModificationType() == ModificationType.SUBTREE_MODIFIED
+ && path.equals(candidate.getRootPath()) && candidate.getRootNode().getDataBefore().isPresent()
+ && candidate.getRootNode().getDataAfter().isPresent();
}
public DatastoreTestTask setup(final WriteTransactionCustomizer customizer) {
}
ListenerRegistration<ChangeEventListener> registration = null;
if (changePath != null) {
- registration = store.registerChangeListener(changePath, internalListener, changeScope);
+ registration = storeTreeChangePublisher.registerTreeChangeListener(changePath, internalListener);
}
Preconditions.checkState(write != null, "Write Transaction must be set.");
- postSetup = true;
dclExecutorService.afterTestSetup();
execute(write);
registration.close();
}
- if (changeListener != null) {
- changeListener.onDataChanged(getChangeEvent());
- }
if (read != null) {
read.verify(store.newReadOnlyTransaction());
}
}
}
- public AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> getChangeEvent() throws Exception {
- return internalListener.receivedChange.get(10, TimeUnit.SECONDS);
+ public void verifyChangeEvents() {
+ internalListener.verifyChangeEvents();
}
- public void verifyNoChangeEvent() throws Exception {
- try {
- Object unexpected = internalListener.receivedChange.get(500, TimeUnit.MILLISECONDS);
- fail("Got unexpected AsyncDataChangeEvent from the Future: " + unexpected);
- } catch (TimeoutException e) {
- // Expected
- }
+ public void verifyNoChangeEvent() {
+ internalListener.verifyNoChangeEvent();
}
private void execute(final WriteTransactionCustomizer writeCustomizer) throws InterruptedException,
void verify(DOMStoreReadTransaction tx);
}
- private final class ChangeEventListener implements
- AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> {
+ private final class ChangeEventListener implements DOMDataTreeChangeListener {
+
+ final SettableFuture<Collection<DataTreeCandidate>> future = SettableFuture.create();
+ final Collection<DataTreeCandidate> accumulatedChanges = new ArrayList<>();
+ final Function<DataTreeCandidate, Boolean>[] matchers;
+ final int expChangeCount;
- protected final SettableFuture<AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>>
- receivedChange = SettableFuture.create();
+ ChangeEventListener(Function<DataTreeCandidate, Boolean>[] matchers) {
+ this.expChangeCount = matchers.length;
+ this.matchers = matchers;
+ }
+
+ Collection<DataTreeCandidate> changes() {
+ try {
+ Collection<DataTreeCandidate> changes = internalListener.future.get(10, TimeUnit.SECONDS);
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ return changes;
+ } catch (TimeoutException e) {
+ throw new AssertionError(String.format(
+ "Data tree change notifications not received. Expected: %s. Actual: %s - %s",
+ expChangeCount, accumulatedChanges.size(), accumulatedChanges), e);
+ } catch (InterruptedException | ExecutionException e) {
+ throw new AssertionError("Data tree change notifications failed", e);
+ }
+ }
+
+ void verifyChangeEvents() {
+ Collection<DataTreeCandidate> changes = new ArrayList<>(changes());
+ Iterator<DataTreeCandidate> iter = changes.iterator();
+ while (iter.hasNext()) {
+ DataTreeCandidate dataTreeModification = iter.next();
+ for (Function<DataTreeCandidate, Boolean> matcher: matchers) {
+ if (matcher.apply(dataTreeModification)) {
+ iter.remove();
+ break;
+ }
+ }
+ }
+
+ if (!changes.isEmpty()) {
+ DataTreeCandidate mod = changes.iterator().next();
+ fail(String.format("Received unexpected notification: type: %s, path: %s, before: %s, after: %s",
+ mod.getRootNode().getModificationType(), mod.getRootPath(),
+ mod.getRootNode().getDataBefore(), mod.getRootNode().getDataAfter()));
+ }
+ }
+
+ void verifyNoChangeEvent() {
+ try {
+ Object unexpected = internalListener.future.get(500, TimeUnit.MILLISECONDS);
+ fail("Got unexpected Data tree change notifications: " + unexpected);
+ } catch (TimeoutException e) {
+ // Expected
+ } catch (InterruptedException | ExecutionException e) {
+ throw new AssertionError("Data tree change notifications failed", e);
+ }
+ }
@Override
- public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
- if (postSetup) {
- receivedChange.set(change);
+ public void onDataTreeChanged(Collection<DataTreeCandidate> changes) {
+ synchronized (accumulatedChanges) {
+ accumulatedChanges.addAll(changes);
+ if (expChangeCount == accumulatedChanges.size()) {
+ future.set(new ArrayList<>(accumulatedChanges));
+ }
}
}
}