*/
package org.opendaylight.mdsal.binding.dom.adapter.test;
+import static java.util.Objects.requireNonNull;
import static org.junit.Assert.fail;
-import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Uninterruptibles;
-import java.util.ArrayList;
-import java.util.Collection;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Deque;
import java.util.List;
import java.util.Objects;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.Function;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.mdsal.binding.api.DataObjectModification.ModificationType;
import org.opendaylight.mdsal.binding.api.DataTreeChangeListener;
import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
import org.opendaylight.mdsal.binding.api.DataTreeModification;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
* @author Thomas Pantelis
*/
public class AbstractDataTreeChangeListenerTest extends AbstractConcurrentDataBrokerTest {
- protected static final class TestListener<T extends DataObject> implements DataTreeChangeListener<T> {
- private final SettableFuture<List<DataTreeModification<T>>> future = SettableFuture.create();
- private final List<DataTreeModification<T>> accumulatedChanges = new ArrayList<>();
- private final Function<DataTreeModification<T>, Boolean>[] matchers;
- private final int expChangeCount;
-
- private TestListener(final Function<DataTreeModification<T>, Boolean>[] matchers) {
- expChangeCount = matchers.length;
- this.matchers = matchers;
+ @FunctionalInterface
+ protected interface Matcher<T extends DataObject> {
+
+ boolean apply(DataTreeModification<T> modification);
+ }
+
+ @FunctionalInterface
+ protected interface DataMatcher<T extends DataObject> {
+
+ boolean apply(T data);
+ }
+
+ protected static final class ModificationCollector<T extends DataObject> implements AutoCloseable {
+ private final TestListener<T> listener;
+ private final Registration reg;
+
+ private ModificationCollector(final TestListener<T> listener, final Registration reg) {
+ this.listener = requireNonNull(listener);
+ this.reg = requireNonNull(reg);
}
- @Override
- public void onDataTreeChanged(final Collection<DataTreeModification<T>> changes) {
- synchronized (accumulatedChanges) {
- accumulatedChanges.addAll(changes);
- if (expChangeCount <= accumulatedChanges.size()) {
- future.set(List.copyOf(accumulatedChanges));
+ @SafeVarargs
+ public final void verifyModifications(final Matcher<T>... inOrder) {
+ final var matchers = new ArrayDeque<>(Arrays.asList(inOrder));
+ final var changes = listener.awaitChanges(matchers.size());
+
+ while (!changes.isEmpty()) {
+ final var mod = changes.pop();
+ final var matcher = matchers.pop();
+ if (!matcher.apply(mod)) {
+ final var rootNode = mod.getRootNode();
+ fail("Received unexpected notification: type: %s, path: %s, before: %s, after: %s".formatted(
+ rootNode.modificationType(), mod.getRootPath().path(), rootNode.dataBefore(),
+ rootNode.dataAfter()));
+ return;
}
}
- }
- public List<DataTreeModification<T>> changes() {
- try {
- final var changes = future.get(5, TimeUnit.SECONDS);
- Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
- return changes;
- } catch (InterruptedException | TimeoutException | ExecutionException e) {
- throw new AssertionError(String.format(
- "Data tree change notifications not received. Expected: %s. Actual: %s - %s",
- expChangeCount, accumulatedChanges.size(), accumulatedChanges), e);
+ final var count = listener.changeCount();
+ if (count != 0) {
+ throw new AssertionError("Expected no more changes, %s remain".formatted(count));
}
}
- public void verify() {
- final var changes = new ArrayList<>(changes());
- final var iter = changes.iterator();
- while (iter.hasNext()) {
- final var dataTreeModification = iter.next();
- for (var matcher : matchers) {
- if (matcher.apply(dataTreeModification)) {
- iter.remove();
- break;
+ @Override
+ public void close() {
+ reg.close();
+ }
+ }
+
+ private static final class TestListener<T extends DataObject> implements DataTreeChangeListener<T> {
+ private final Deque<DataTreeModification<T>> accumulatedChanges = new ArrayDeque<>();
+
+ private boolean synced;
+
+ @Override
+ public synchronized void onDataTreeChanged(final List<DataTreeModification<T>> changes) {
+ accumulatedChanges.addAll(changes);
+ synced = true;
+ }
+
+ @Override
+ public synchronized void onInitialData() {
+ synced = true;
+ }
+
+ void awaitSync() {
+ final var sw = Stopwatch.createStarted();
+
+ do {
+ synchronized (this) {
+ if (synced) {
+ return;
}
}
- }
- if (!changes.isEmpty()) {
- final var mod = changes.iterator().next();
- final var rootNode = mod.getRootNode();
- fail("Received unexpected notification: type: %s, path: %s, before: %s, after: %s".formatted(
- rootNode.getModificationType(), mod.getRootPath().getRootIdentifier(), rootNode.getDataBefore(),
- rootNode.getDataAfter()));
- }
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ } while (sw.elapsed(TimeUnit.SECONDS) < 5);
+
+ throw new AssertionError("Failed to achieve initial sync");
}
- public boolean hasChanges() {
- synchronized (accumulatedChanges) {
- return !accumulatedChanges.isEmpty();
- }
+ Deque<DataTreeModification<T>> awaitChanges(final int expectedCount) {
+ final var ret = new ArrayDeque<DataTreeModification<T>>(expectedCount);
+ final var sw = Stopwatch.createStarted();
+ int remaining = expectedCount;
+
+ do {
+ synchronized (this) {
+ while (remaining != 0) {
+ final var change = accumulatedChanges.poll();
+ if (change == null) {
+ break;
+ }
+
+ remaining--;
+ ret.add(change);
+ }
+ }
+
+ if (remaining == 0) {
+ return ret;
+ }
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ } while (sw.elapsed(TimeUnit.SECONDS) < 5);
+
+ throw new AssertionError("Expected %s changes, received only %s".formatted(expectedCount, ret.size()));
+ }
+
+ synchronized int changeCount() {
+ return accumulatedChanges.size();
}
}
super(true);
}
- @SafeVarargs
- protected final <T extends DataObject> TestListener<T> createListener(final LogicalDatastoreType store,
- final InstanceIdentifier<T> path, final Function<DataTreeModification<T>, Boolean>... matchers) {
- TestListener<T> listener = new TestListener<>(matchers);
- getDataBroker().registerDataTreeChangeListener(DataTreeIdentifier.create(store, path), listener);
- return listener;
+ protected final <T extends DataObject> @NonNull ModificationCollector<T> createCollector(
+ final LogicalDatastoreType store, final InstanceIdentifier<T> path) {
+ final var listener = new TestListener<T>();
+ final var reg = getDataBroker().registerDataTreeChangeListener(DataTreeIdentifier.of(store, path), listener);
+ listener.awaitSync();
+ return new ModificationCollector<>(listener, reg);
}
- public static <T extends DataObject> Function<DataTreeModification<T>, Boolean> match(
- final ModificationType type, final InstanceIdentifier<T> path, final Function<T, Boolean> checkDataBefore,
- final Function<T, Boolean> checkDataAfter) {
- return modification -> type == modification.getRootNode().getModificationType()
- && path.equals(modification.getRootPath().getRootIdentifier())
- && checkDataBefore.apply(modification.getRootNode().getDataBefore())
- && checkDataAfter.apply(modification.getRootNode().getDataAfter());
+ public static <T extends DataObject> @NonNull Matcher<T> match(final ModificationType type,
+ final InstanceIdentifier<T> path, final DataMatcher<T> checkDataBefore,
+ final DataMatcher<T> checkDataAfter) {
+ return modification -> type == modification.getRootNode().modificationType()
+ && path.equals(modification.getRootPath().path())
+ && checkDataBefore.apply(modification.getRootNode().dataBefore())
+ && checkDataAfter.apply(modification.getRootNode().dataAfter());
}
- public static <T extends DataObject> Function<DataTreeModification<T>, Boolean> match(final ModificationType type,
+ public static <T extends DataObject> @NonNull Matcher<T> match(final ModificationType type,
final InstanceIdentifier<T> path, final T expDataBefore, final T expDataAfter) {
return match(type, path, dataBefore -> Objects.equals(expDataBefore, dataBefore),
- (Function<T, Boolean>) dataAfter -> Objects.equals(expDataAfter, dataAfter));
+ (DataMatcher<T>) dataAfter -> Objects.equals(expDataAfter, dataAfter));
}
- public static <T extends DataObject> Function<DataTreeModification<T>, Boolean> added(
- final InstanceIdentifier<T> path, final T data) {
+ public static <T extends DataObject> @NonNull Matcher<T> added(final InstanceIdentifier<T> path, final T data) {
return match(ModificationType.WRITE, path, null, data);
}
- public static <T extends DataObject> Function<DataTreeModification<T>, Boolean> replaced(
- final InstanceIdentifier<T> path, final T dataBefore, final T dataAfter) {
+ public static <T extends DataObject> @NonNull Matcher<T> replaced(final InstanceIdentifier<T> path,
+ final T dataBefore, final T dataAfter) {
return match(ModificationType.WRITE, path, dataBefore, dataAfter);
}
- public static <T extends DataObject> Function<DataTreeModification<T>, Boolean> deleted(
- final InstanceIdentifier<T> path, final T dataBefore) {
+ public static <T extends DataObject> @NonNull Matcher<T> deleted(final InstanceIdentifier<T> path,
+ final T dataBefore) {
return match(ModificationType.DELETE, path, dataBefore, null);
}
- public static <T extends DataObject> Function<DataTreeModification<T>, Boolean> subtreeModified(
- final InstanceIdentifier<T> path, final T dataBefore, final T dataAfter) {
+ public static <T extends DataObject> @NonNull Matcher<T> subtreeModified(final InstanceIdentifier<T> path,
+ final T dataBefore, final T dataAfter) {
return match(ModificationType.SUBTREE_MODIFIED, path, dataBefore, dataAfter);
}
}