+ final SettableFuture<Collection<DataTreeCandidate>> future = SettableFuture.create();
+ final Collection<DataTreeCandidate> accumulatedChanges = new ArrayList<>();
+ final Function<DataTreeCandidate, Boolean>[] matchers;
+ final int expChangeCount;
+
+ ChangeEventListener(Function<DataTreeCandidate, Boolean>[] matchers) {
+ this.expChangeCount = matchers.length;
+ this.matchers = matchers;
+ }
+
+ Collection<DataTreeCandidate> changes() {
+ try {
+ Collection<DataTreeCandidate> changes = internalListener.future.get(10, TimeUnit.SECONDS);
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ return changes;
+ } catch (TimeoutException e) {
+ throw new AssertionError(String.format(
+ "Data tree change notifications not received. Expected: %s. Actual: %s - %s",
+ expChangeCount, accumulatedChanges.size(), accumulatedChanges), e);
+ } catch (InterruptedException | ExecutionException e) {
+ throw new AssertionError("Data tree change notifications failed", e);
+ }
+ }
+
+ void verifyChangeEvents() {
+ Collection<DataTreeCandidate> changes = new ArrayList<>(changes());
+ Iterator<DataTreeCandidate> iter = changes.iterator();
+ while (iter.hasNext()) {
+ DataTreeCandidate dataTreeModification = iter.next();
+ for (Function<DataTreeCandidate, Boolean> matcher: matchers) {
+ if (matcher.apply(dataTreeModification)) {
+ iter.remove();
+ break;
+ }
+ }
+ }
+
+ if (!changes.isEmpty()) {
+ DataTreeCandidate mod = changes.iterator().next();
+ fail(String.format("Received unexpected notification: type: %s, path: %s, before: %s, after: %s",
+ mod.getRootNode().getModificationType(), mod.getRootPath(),
+ mod.getRootNode().getDataBefore(), mod.getRootNode().getDataAfter()));
+ }
+ }
+
+ void verifyNoChangeEvent() {
+ try {
+ Object unexpected = internalListener.future.get(500, TimeUnit.MILLISECONDS);
+ fail("Got unexpected Data tree change notifications: " + unexpected);
+ } catch (TimeoutException e) {
+ // Expected
+ } catch (InterruptedException | ExecutionException e) {
+ throw new AssertionError("Data tree change notifications failed", e);
+ }
+ }