import org.opendaylight.controller.cluster.datastore.messages.DataTreeListenerInfo;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.GetInfo;
+import org.opendaylight.controller.cluster.datastore.messages.OnInitialData;
import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
protected void handleReceive(final Object message) {
if (message instanceof DataTreeChanged) {
dataChanged((DataTreeChanged)message);
+ } else if (message instanceof OnInitialData) {
+ onInitialData();
} else if (message instanceof EnableNotification) {
enableNotification((EnableNotification) message);
} else if (message instanceof GetInfo) {
}
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void onInitialData() {
+ LOG.debug("{}: Notifying onInitialData to listener {}", logContext, listener);
+
+ try {
+ this.listener.onInitialData();
+ } catch (Exception e) {
+ LOG.error("{}: Error notifying listener {}", logContext, this.listener, e);
+ }
+ }
+
@SuppressWarnings("checkstyle:IllegalCatch")
private void dataChanged(final DataTreeChanged message) {
// Do nothing if notifications are not enabled
public void registerTreeChangeListener(YangInstanceIdentifier treeId, DOMDataTreeChangeListener listener,
Optional<DataTreeCandidate> initialState,
Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration) {
+ registerTreeChangeListener(treeId, listener, onRegistration);
+
+ if (initialState.isPresent()) {
+ notifySingleListener(treeId, listener, initialState.get(), logContext);
+ } else {
+ listener.onInitialData();
+ }
+ }
+
+ void registerTreeChangeListener(YangInstanceIdentifier treeId, DOMDataTreeChangeListener listener,
+ Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration) {
LOG.debug("{}: registerTreeChangeListener: path: {}, listener: {}", logContext, treeId, listener);
AbstractDOMDataTreeChangeListenerRegistration<DOMDataTreeChangeListener> registration =
super.registerTreeChangeListener(treeId, listener);
onRegistration.accept(registration);
-
- if (initialState.isPresent()) {
- notifySingleListener(treeId, listener, initialState.get(), logContext);
- }
}
static void notifySingleListener(YangInstanceIdentifier treeId, DOMDataTreeChangeListener listener,
DefaultShardDataTreeChangeListenerPublisher publisher =
new DefaultShardDataTreeChangeListenerPublisher(logContext);
publisher.logContext = logContext;
- publisher.registerTreeChangeListener(treeId, listener, Optional.absent(), noop -> { /* NOOP */ });
- publisher.publishChanges(state);
+ publisher.registerTreeChangeListener(treeId, listener);
+
+ if (!publisher.processCandidateTree(state)) {
+ listener.onInitialData();
+ }
}
}
import com.google.common.base.Preconditions;
import java.util.Collection;
import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged;
+import org.opendaylight.controller.cluster.datastore.messages.OnInitialData;
import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.slf4j.Logger;
actor.tell(new DataTreeChanged(changes), ActorRef.noSender());
}
+ @Override
+ public void onInitialData() {
+ LOG.debug("Sending OnInitialData to {}", actor);
+ actor.tell(OnInitialData.INSTANCE, ActorRef.noSender());
+ }
+
@Override
public String toString() {
return "ForwardingDataTreeChangeListener [actor=" + actor + "]";
* @author Thomas Pantelis
*/
public final class ShardDataTreeChangePublisherActor
- extends ShardDataTreeNotificationPublisherActor<ShardDataTreeChangeListenerPublisher> {
+ extends ShardDataTreeNotificationPublisherActor<DefaultShardDataTreeChangeListenerPublisher> {
private ShardDataTreeChangePublisherActor(final String name, final String logContext) {
super(new DefaultShardDataTreeChangeListenerPublisher(logContext), name, logContext);
if (reg.initialState.isPresent()) {
DefaultShardDataTreeChangeListenerPublisher.notifySingleListener(reg.path, reg.listener,
reg.initialState.get(), logContext());
+ } else {
+ reg.listener.onInitialData();
}
- publisher().registerTreeChangeListener(reg.path, reg.listener, Optional.absent(), reg.onRegistration);
+ publisher().registerTreeChangeListener(reg.path, reg.listener, reg.onRegistration);
} else {
super.handleReceive(message);
}
--- /dev/null
+/*
+ * Copyright (c) 2019 Red Hat, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+/**
+ * Message sent to a data tree change listener actor to indicate there is no initial data.
+ *
+ * @author Thomas Pantelis
+ */
+public final class OnInitialData {
+ public static final OnInitialData INSTANCE = new OnInitialData();
+
+ private OnInitialData() {
+ // Hidden on purpose
+ }
+}
writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
listener.waitForChangeEvents();
+ listener.verifyOnInitialDataEvent();
+
+ final MockDataTreeChangeListener listener2 = new MockDataTreeChangeListener(1);
+ final ActorRef dclActor2 = actorFactory.createActor(DataTreeChangeListenerActor.props(listener2,
+ TestModel.TEST_PATH), "testRegisterDataTreeChangeListener-DataTreeChangeListener2");
+
+ shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor2, false), testKit.getRef());
+
+ testKit.expectMsgClass(Duration.ofSeconds(3), RegisterDataTreeNotificationListenerReply.class);
+
+ listener2.waitForChangeEvents();
+ listener2.verifyNoOnInitialDataEvent();
}
@SuppressWarnings("serial")
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
private final List<DataTreeCandidate> changeList = new ArrayList<>();
+ private final CountDownLatch onInitialDataLatch = new CountDownLatch(1);
+ private final AtomicInteger onInitialDataEventCount = new AtomicInteger();
+
private volatile CountDownLatch changeLatch;
private int expChangeEventCount;
}
}
+ @Override
+ public void onInitialData() {
+ onInitialDataEventCount.incrementAndGet();
+ onInitialDataLatch.countDown();
+ }
+
+ public void verifyOnInitialDataEvent() {
+ assertTrue("onInitialData was not triggered",
+ Uninterruptibles.awaitUninterruptibly(onInitialDataLatch, 5, TimeUnit.SECONDS));
+ assertEquals("onInitialDataEventCount", 1, onInitialDataEventCount.get());
+ }
+
+ public void verifyNoOnInitialDataEvent() {
+ assertFalse("onInitialData was triggered unexpectedly",
+ Uninterruptibles.awaitUninterruptibly(onInitialDataLatch, 500, TimeUnit.MILLISECONDS));
+ }
+
@SuppressWarnings({ "unchecked", "rawtypes" })
public void waitForChangeEvents(final YangInstanceIdentifier... expPaths) {
boolean done = Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS);