From: Harman Singh Date: Fri, 24 Jul 2015 01:32:46 +0000 (-0700) Subject: Enabling Data Change Notifications for all nodes in cluster. X-Git-Tag: release/beryllium~352 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=66a6b6f931af3fcd1ce61263c457304cfbdc2bb5 Enabling Data Change Notifications for all nodes in cluster. Two new interfaces are introduced ClusteredDataChangeListener and ClusteredDOMDataChangeListener and external applications will have to implement any of that interface, if those applications want to listen to remote data change notifications. Datastore registers listeners, which are instance of that interface, even on followers. Change-Id: I0e29cdf2a08a2051de5fc8ce73b9ec8ac408e45b Signed-off-by: Harman Singh --- diff --git a/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/ClusteredDataChangeListener.java b/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/ClusteredDataChangeListener.java new file mode 100644 index 0000000000..511910de62 --- /dev/null +++ b/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/ClusteredDataChangeListener.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + + +package org.opendaylight.controller.md.sal.binding.api; + +/** + *

+ * ClusteredDataChangeListener is a marker interface to enable data change notifications on all instances in a cluster, + * where this listener is registered. + *

+ * + *

Applications should implement ClusteredDataChangeListener instead of DataChangeListener, if they want to listen + * to data change notifications on any node of clustered datastore. DataChangeListener enables data change notifications + * only at leader of the datastore shard.

+ * + */ + +public interface ClusteredDataChangeListener extends DataChangeListener{ +} diff --git a/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataChangeListener.java b/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataChangeListener.java index 65257a328a..2833f41f67 100644 --- a/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataChangeListener.java +++ b/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataChangeListener.java @@ -12,6 +12,10 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +/* + * DataChangeListener enables data change notifications only at leader of the datastore shard + */ + public interface DataChangeListener extends AsyncDataChangeListener, DataObject> { @Override void onDataChanged(AsyncDataChangeEvent, DataObject> change); diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractForwardedDataBroker.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractForwardedDataBroker.java index 4bbeaebf55..ad52557b47 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractForwardedDataBroker.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractForwardedDataBroker.java @@ -15,12 +15,15 @@ import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; import java.util.Set; + +import org.opendaylight.controller.md.sal.binding.api.ClusteredDataChangeListener; import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener; +import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener; import org.opendaylight.controller.sal.core.api.model.SchemaService; import org.opendaylight.yangtools.concepts.AbstractListenerRegistration; import org.opendaylight.yangtools.concepts.Delegator; @@ -63,8 +66,14 @@ public abstract class AbstractForwardedDataBroker implements Delegator registerDataChangeListener(final LogicalDatastoreType store, final InstanceIdentifier path, final DataChangeListener listener, final DataChangeScope triggeringScope) { - final DOMDataChangeListener domDataChangeListener = new TranslatingDataChangeInvoker(store, path, listener, + final DOMDataChangeListener domDataChangeListener; + + if(listener instanceof ClusteredDataChangeListener) { + domDataChangeListener = new TranslatingClusteredDataChangeInvoker(store, path, listener, triggeringScope); + } else { + domDataChangeListener = new TranslatingDataChangeInvoker(store, path, listener, triggeringScope); + } final YangInstanceIdentifier domPath = codec.toYangInstanceIdentifierBlocking(path); final ListenerRegistration domRegistration = domDataBroker.registerDataChangeListener(store, domPath, domDataChangeListener, triggeringScope); @@ -140,6 +149,20 @@ public abstract class AbstractForwardedDataBroker implements Delegator path, + DataChangeListener bindingDataChangeListener, + DataChangeScope triggeringScope) { + super(store, path, bindingDataChangeListener, triggeringScope); + } + } + private class TranslatedDataChangeEvent implements AsyncDataChangeEvent, DataObject> { private final AsyncDataChangeEvent> domEvent; private final InstanceIdentifier path; diff --git a/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/impl/test/BindingDOMDataBrokerAdapterTest.java b/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/impl/test/BindingDOMDataBrokerAdapterTest.java new file mode 100644 index 0000000000..e2fcebaac1 --- /dev/null +++ b/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/impl/test/BindingDOMDataBrokerAdapterTest.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.md.sal.binding.impl.test; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.md.sal.binding.api.ClusteredDataChangeListener; +import org.opendaylight.controller.md.sal.binding.impl.BindingDOMDataBrokerAdapter; +import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; +import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.Top; +import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry; +import org.opendaylight.yangtools.sal.binding.generator.impl.GeneratedClassLoadingStrategy; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; + + +public class BindingDOMDataBrokerAdapterTest { + + @Mock + DOMDataBroker dataBroker; + + @Mock + GeneratedClassLoadingStrategy classLoadingStrategy; + @Mock + BindingNormalizedNodeCodecRegistry codecRegistry; + + @Mock + YangInstanceIdentifier yangInstanceIdentifier; + + + private static final InstanceIdentifier TOP_PATH = InstanceIdentifier + .create(Top.class); + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testClusteredDataChangeListernerRegisteration() { + + BindingToNormalizedNodeCodec codec = new BindingToNormalizedNodeCodec(classLoadingStrategy, codecRegistry); + + BindingDOMDataBrokerAdapter bindingDOMDataBrokerAdapter = new BindingDOMDataBrokerAdapter(dataBroker, codec); + Mockito.when(codecRegistry.toYangInstanceIdentifier(TOP_PATH)).thenReturn(yangInstanceIdentifier); + + ArgumentCaptor clusteredDOMListener = ArgumentCaptor. + forClass(ClusteredDOMDataChangeListener.class); + ArgumentCaptor logicalDatastoreType = ArgumentCaptor.forClass(LogicalDatastoreType.class); + ArgumentCaptor dataChangeScope = ArgumentCaptor. + forClass(AsyncDataBroker.DataChangeScope.class); + ArgumentCaptor yangInstanceIdentifier = ArgumentCaptor. + forClass(YangInstanceIdentifier.class); + + TestListener listener = new TestListener(); + + bindingDOMDataBrokerAdapter.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, TOP_PATH, listener, + AsyncDataBroker.DataChangeScope.BASE); + Mockito.verify(dataBroker).registerDataChangeListener(logicalDatastoreType.capture(), yangInstanceIdentifier.capture(), + clusteredDOMListener.capture(), dataChangeScope.capture()); + + } + + private class TestListener implements ClusteredDataChangeListener { + + @Override + public void onDataChanged(AsyncDataChangeEvent, DataObject> change) { + + } + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/registration/ListenerRegistrationMessages.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/registration/ListenerRegistrationMessages.java index 77cbd4da46..77f5ddb4f2 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/registration/ListenerRegistrationMessages.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/registration/ListenerRegistrationMessages.java @@ -683,6 +683,16 @@ public final class ListenerRegistrationMessages { * required int32 dataChangeScope = 3; */ int getDataChangeScope(); + + // optional bool registerOnAllInstances = 4; + /** + * optional bool registerOnAllInstances = 4; + */ + boolean hasRegisterOnAllInstances(); + /** + * optional bool registerOnAllInstances = 4; + */ + boolean getRegisterOnAllInstances(); } /** * Protobuf type {@code org.opendaylight.controller.mdsal.RegisterChangeListener} @@ -758,6 +768,11 @@ public final class ListenerRegistrationMessages { dataChangeScope_ = input.readInt32(); break; } + case 32: { + bitField0_ |= 0x00000008; + registerOnAllInstances_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -879,10 +894,27 @@ public final class ListenerRegistrationMessages { return dataChangeScope_; } + // optional bool registerOnAllInstances = 4; + public static final int REGISTERONALLINSTANCES_FIELD_NUMBER = 4; + private boolean registerOnAllInstances_; + /** + * optional bool registerOnAllInstances = 4; + */ + public boolean hasRegisterOnAllInstances() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional bool registerOnAllInstances = 4; + */ + public boolean getRegisterOnAllInstances() { + return registerOnAllInstances_; + } + private void initFields() { instanceIdentifierPath_ = org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.getDefaultInstance(); dataChangeListenerActorPath_ = ""; dataChangeScope_ = 0; + registerOnAllInstances_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -921,6 +953,9 @@ public final class ListenerRegistrationMessages { if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeInt32(3, dataChangeScope_); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeBool(4, registerOnAllInstances_); + } getUnknownFields().writeTo(output); } @@ -942,6 +977,10 @@ public final class ListenerRegistrationMessages { size += com.google.protobuf.CodedOutputStream .computeInt32Size(3, dataChangeScope_); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(4, registerOnAllInstances_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -1069,6 +1108,8 @@ public final class ListenerRegistrationMessages { bitField0_ = (bitField0_ & ~0x00000002); dataChangeScope_ = 0; bitField0_ = (bitField0_ & ~0x00000004); + registerOnAllInstances_ = false; + bitField0_ = (bitField0_ & ~0x00000008); return this; } @@ -1113,6 +1154,10 @@ public final class ListenerRegistrationMessages { to_bitField0_ |= 0x00000004; } result.dataChangeScope_ = dataChangeScope_; + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000008; + } + result.registerOnAllInstances_ = registerOnAllInstances_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -1140,6 +1185,9 @@ public final class ListenerRegistrationMessages { if (other.hasDataChangeScope()) { setDataChangeScope(other.getDataChangeScope()); } + if (other.hasRegisterOnAllInstances()) { + setRegisterOnAllInstances(other.getRegisterOnAllInstances()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -1407,6 +1455,39 @@ public final class ListenerRegistrationMessages { return this; } + // optional bool registerOnAllInstances = 4; + private boolean registerOnAllInstances_ ; + /** + * optional bool registerOnAllInstances = 4; + */ + public boolean hasRegisterOnAllInstances() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional bool registerOnAllInstances = 4; + */ + public boolean getRegisterOnAllInstances() { + return registerOnAllInstances_; + } + /** + * optional bool registerOnAllInstances = 4; + */ + public Builder setRegisterOnAllInstances(boolean value) { + bitField0_ |= 0x00000008; + registerOnAllInstances_ = value; + onChanged(); + return this; + } + /** + * optional bool registerOnAllInstances = 4; + */ + public Builder clearRegisterOnAllInstances() { + bitField0_ = (bitField0_ & ~0x00000008); + registerOnAllInstances_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.RegisterChangeListener) } @@ -1944,15 +2025,16 @@ public final class ListenerRegistrationMessages { "ylight.controller.mdsal\032\014Common.proto\"%\n" + "#CloseDataChangeListenerRegistration\"*\n(" + "CloseDataChangeListenerRegistrationReply" + - "\"\255\001\n\026RegisterChangeListener\022U\n\026instanceI" + + "\"\315\001\n\026RegisterChangeListener\022U\n\026instanceI" + "dentifierPath\030\001 \002(\01325.org.opendaylight.c" + "ontroller.mdsal.InstanceIdentifier\022#\n\033da" + "taChangeListenerActorPath\030\002 \002(\t\022\027\n\017dataC" + - "hangeScope\030\003 \002(\005\"?\n\033RegisterChangeListen" + - "erReply\022 \n\030listenerRegistrationPath\030\001 \002(", - "\tB[\n;org.opendaylight.controller.protobu" + - "ff.messages.registrationB\034ListenerRegist" + - "rationMessages" + "hangeScope\030\003 \002(\005\022\036\n\026registerOnAllInstanc" + + "es\030\004 \001(\010\"?\n\033RegisterChangeListenerReply\022", + " \n\030listenerRegistrationPath\030\001 \002(\tB[\n;org" + + ".opendaylight.controller.protobuff.messa" + + "ges.registrationB\034ListenerRegistrationMe" + + "ssages" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -1976,7 +2058,7 @@ public final class ListenerRegistrationMessages { internal_static_org_opendaylight_controller_mdsal_RegisterChangeListener_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_org_opendaylight_controller_mdsal_RegisterChangeListener_descriptor, - new java.lang.String[] { "InstanceIdentifierPath", "DataChangeListenerActorPath", "DataChangeScope", }); + new java.lang.String[] { "InstanceIdentifierPath", "DataChangeListenerActorPath", "DataChangeScope", "RegisterOnAllInstances", }); internal_static_org_opendaylight_controller_mdsal_RegisterChangeListenerReply_descriptor = getDescriptor().getMessageTypes().get(3); internal_static_org_opendaylight_controller_mdsal_RegisterChangeListenerReply_fieldAccessorTable = new diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/resources/ListenerRegistration.proto b/opendaylight/md-sal/sal-clustering-commons/src/main/resources/ListenerRegistration.proto index 3342a1364a..4b45c85502 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/resources/ListenerRegistration.proto +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/resources/ListenerRegistration.proto @@ -27,6 +27,7 @@ message RegisterChangeListener{ required InstanceIdentifier instanceIdentifierPath=1; required string dataChangeListenerActorPath=2; required int32 dataChangeScope=3; +optional bool registerOnAllInstances=4; } /** * This is the reply for the RegisterChangeListener message diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java index 8ac424a6a8..6f8bc633c7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java @@ -21,6 +21,7 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -116,7 +117,8 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration DataChangeScope scope) { Future future = actorContext.executeOperationAsync(shard, - new RegisterChangeListener(path, dataChangeListenerActor, scope), + new RegisterChangeListener(path, dataChangeListenerActor, scope, + listener instanceof ClusteredDOMDataChangeListener), actorContext.getDatastoreContext().getShardInitializationTimeout()); future.onComplete(new OnComplete(){ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java index e6f63d7154..c7b55414e6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java @@ -28,42 +28,57 @@ final class DataChangeListenerSupport extends LeaderLocalDelegateFactory delayedListenerRegistrations = new ArrayList<>(); private final List dataChangeListeners = new ArrayList<>(); + private final List delayedRegisterOnAllListeners = new ArrayList<>(); DataChangeListenerSupport(final Shard shard) { super(shard); } @Override - void onLeadershipChange(final boolean isLeader) { + void onLeadershipChange(final boolean isLeader, boolean hasLeader) { + LOG.debug("onLeadershipChange, isLeader: {}, hasLeader : {}", isLeader, hasLeader); + for (ActorSelection dataChangeListener : dataChangeListeners) { dataChangeListener.tell(new EnableNotification(isLeader), getSelf()); } + if(hasLeader) { + for (DelayedListenerRegistration reg : delayedRegisterOnAllListeners) { + registerDelayedListeners(reg); + } + delayedRegisterOnAllListeners.clear(); + } + if (isLeader) { for (DelayedListenerRegistration reg: delayedListenerRegistrations) { - if(!reg.isClosed()) { - final Entry>>, DOMImmutableDataChangeEvent> res = - createDelegate(reg.getRegisterChangeListener()); - reg.setDelegate(res.getKey()); - if (res.getValue() != null) { - reg.getInstance().onDataChanged(res.getValue()); - } - } + registerDelayedListeners(reg); } delayedListenerRegistrations.clear(); } } + private void registerDelayedListeners(DelayedListenerRegistration reg) { + if(!reg.isClosed()) { + final Entry>>, DOMImmutableDataChangeEvent> res = + createDelegate(reg.getRegisterChangeListener()); + reg.setDelegate(res.getKey()); + if (res.getValue() != null) { + reg.getInstance().onDataChanged(res.getValue()); + } + } + } + @Override - void onMessage(final RegisterChangeListener message, final boolean isLeader) { + void onMessage(final RegisterChangeListener message, final boolean isLeader, boolean hasLeader) { - LOG.debug("{}: registerDataChangeListener for {}, leader: {}", persistenceId(), message.getPath(), isLeader); + LOG.debug("{}: registerDataChangeListener for {}, isLeader: {}, hasLeader : {}", + persistenceId(), message.getPath(), isLeader, hasLeader); final ListenerRegistration>> registration; final AsyncDataChangeEvent> event; - if (isLeader) { + if ((hasLeader && message.isRegisterOnAllInstances()) || isLeader) { final Entry>>, DOMImmutableDataChangeEvent> res = createDelegate(message); registration = res.getKey(); @@ -72,7 +87,11 @@ final class DataChangeListenerSupport extends LeaderLocalDelegateFactory> listener = new DataChangeListenerProxy(dataChangeListenerPath); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java index db5eeb83e7..4281bfe796 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java @@ -32,7 +32,7 @@ final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory registration; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java index 3f927736b5..8ce28571ee 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java @@ -60,8 +60,9 @@ abstract class LeaderLocalDelegateFactory extends DelegateFactory> mockClusteredListener = + Mockito.mock(ClusteredDOMDataChangeListener.class); + + final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy( + "shard-1", actorContext, mockClusteredListener); + + final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); + final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE; + new Thread() { + @Override + public void run() { + proxy.init(path, scope); + } + + }.start(); + + FiniteDuration timeout = duration("5 seconds"); + FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class); + Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); + + reply(new LocalShardFound(getRef())); + + RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class); + Assert.assertEquals("getPath", path, registerMsg.getPath()); + Assert.assertEquals("getScope", scope, registerMsg.getScope()); + Assert.assertEquals("isRegisterOnAllInstances", true, registerMsg.isRegisterOnAllInstances()); + + reply(new RegisterChangeListenerReply(getRef())); + + for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) { + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + + Assert.assertEquals("getListenerRegistrationActor", getSystem().actorSelection(getRef().path()), + proxy.getListenerRegistrationActor()); + + watch(proxy.getDataChangeListenerActor()); + + proxy.close(); + + // The listener registration actor should get a Close message + expectMsgClass(timeout, CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS); + + // The DataChangeListener actor should be terminated + expectMsgClass(timeout, Terminated.class); + + proxy.close(); + + expectNoMsg(); + }}; + } + @Test(timeout=10000) public void testLocalShardNotFound() { new JavaTestKit(getSystem()) {{ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index d9df305453..4f4162f607 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION; + import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.PoisonPill; @@ -148,7 +149,7 @@ public class ShardTest extends AbstractShardTest { "testRegisterChangeListener-DataChangeListener"); shard.tell(new RegisterChangeListener(TestModel.TEST_PATH, - dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef()); + dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef()); final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"), RegisterChangeListenerReply.class); @@ -241,7 +242,7 @@ public class ShardTest extends AbstractShardTest { // Now send the RegisterChangeListener and wait for the reply. shard.tell(new RegisterChangeListener(path, dclActor, - AsyncDataBroker.DataChangeScope.SUBTREE), getRef()); + AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef()); final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"), RegisterChangeListenerReply.class); @@ -346,11 +347,11 @@ public class ShardTest extends AbstractShardTest { writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); assertEquals("Got first ElectionTimeout", true, - onFirstElectionTimeout.await(5, TimeUnit.SECONDS)); + onFirstElectionTimeout.await(5, TimeUnit.SECONDS)); shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef()); final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"), - RegisterDataTreeChangeListenerReply.class); + RegisterDataTreeChangeListenerReply.class); assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath()); shard.tell(new FindLeader(), getRef()); @@ -450,13 +451,13 @@ public class ShardTest extends AbstractShardTest { //waitUntilLeader(shard); assertEquals("Recovery complete", true, - Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS)); + Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS)); final String address = "akka://foobar"; shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address)); assertEquals("getPeerAddresses", address, - ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString())); + ((TestShard) shard.underlyingActor()).getPeerAddresses().get(shardID.toString())); shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; @@ -551,8 +552,8 @@ public class ShardTest extends AbstractShardTest { final NormalizedNode root = readStore(testStore, YangInstanceIdentifier.builder().build()); InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create( - SerializationUtils.serializeNormalizedNode(root), - Collections.emptyList(), 0, 1, -1, -1)); + SerializationUtils.serializeNormalizedNode(root), + Collections.emptyList(), 0, 1, -1, -1)); return testStore; } @@ -594,7 +595,7 @@ public class ShardTest extends AbstractShardTest { } InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2, - new ApplyJournalEntries(nListEntries)); + new ApplyJournalEntries(nListEntries)); testRecovery(listEntryKeys); } @@ -610,8 +611,8 @@ public class ShardTest extends AbstractShardTest { InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA); InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload( - new WriteModification(TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build())))); + new WriteModification(TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build())))); final int nListEntries = 16; final Set listEntryKeys = new HashSet<>(); @@ -936,12 +937,12 @@ public class ShardTest extends AbstractShardTest { // Send a couple more BatchedModifications. shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef()); + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef()); expectMsgClass(duration, BatchedModificationsReply.class); shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder( TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef()); + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef()); expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); @@ -1268,7 +1269,7 @@ public class ShardTest extends AbstractShardTest { final MutableCompositeModification modification = new MutableCompositeModification(); final NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore, - TestModel.TEST_PATH, containerNode, modification); + TestModel.TEST_PATH, containerNode, modification); final FiniteDuration duration = duration("5 seconds"); @@ -1276,7 +1277,7 @@ public class ShardTest extends AbstractShardTest { // by the ShardTransaction. shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true, false), getRef()); + cohort, modification, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -1559,7 +1560,7 @@ public class ShardTest extends AbstractShardTest { shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef()); final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( - expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); + expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); assertEquals("Can commit", true, canCommitReply.getCanCommit()); // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and @@ -1681,7 +1682,7 @@ public class ShardTest extends AbstractShardTest { doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort, modification, true, false), getRef()); + cohort, modification, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef()); @@ -1826,12 +1827,12 @@ public class ShardTest extends AbstractShardTest { modification, preCommit); shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true, false), getRef()); + cohort, modification, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( - expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); + expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); assertEquals("Can commit", true, canCommitReply.getCanCommit()); shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef()); @@ -2012,7 +2013,7 @@ public class ShardTest extends AbstractShardTest { TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true, false), getRef()); + cohort1, modification1, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); final String transactionID2 = "tx2"; @@ -2030,7 +2031,7 @@ public class ShardTest extends AbstractShardTest { TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3); shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, - cohort3, modification3, true, false), getRef()); + cohort3, modification3, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // All Tx's are readied. We'll send canCommit for the last one but not the others. The others @@ -2412,7 +2413,7 @@ public class ShardTest extends AbstractShardTest { shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build(); final Props nonPersistentProps = Shard.props(shardID, Collections.emptyMap(), - nonPersistentContext, SCHEMA_CONTEXT); + nonPersistentContext, SCHEMA_CONTEXT); new ShardTestKit(getSystem()) {{ final TestActorRef shard1 = TestActorRef.create(getSystem(), @@ -2448,12 +2449,12 @@ public class ShardTest extends AbstractShardTest { shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender()); assertEquals("isRecoveryApplicable", false, - shard.underlyingActor().persistence().isRecoveryApplicable()); + shard.underlyingActor().persistence().isRecoveryApplicable()); shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender()); assertEquals("isRecoveryApplicable", true, - shard.underlyingActor().persistence().isRecoveryApplicable()); + shard.underlyingActor().persistence().isRecoveryApplicable()); shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; @@ -2477,11 +2478,11 @@ public class ShardTest extends AbstractShardTest { MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class); ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener, - ShardLeaderStateChanged.class); + ShardLeaderStateChanged.class); assertEquals("getLocalShardDataTree present", true, leaderStateChanged.getLocalShardDataTree().isPresent()); assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(), - leaderStateChanged.getLocalShardDataTree().get()); + leaderStateChanged.getLocalShardDataTree().get()); MessageCollectorActor.clearMessages(listener); @@ -2521,4 +2522,144 @@ public class ShardTest extends AbstractShardTest { store.validate(modification); store.commit(store.prepare(modification)); } + + @Test + public void testClusteredDataChangeListernerDelayedRegistration() throws Exception { + new ShardTestKit(getSystem()) {{ + final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1); + final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1); + final Creator creator = new Creator() { + boolean firstElectionTimeout = true; + + @Override + public Shard create() throws Exception { + return new Shard(shardID, Collections.emptyMap(), + dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) { + @Override + public void onReceiveCommand(final Object message) throws Exception { + if(message instanceof ElectionTimeout && firstElectionTimeout) { + firstElectionTimeout = false; + final ActorRef self = getSelf(); + new Thread() { + @Override + public void run() { + Uninterruptibles.awaitUninterruptibly( + onChangeListenerRegistered, 5, TimeUnit.SECONDS); + self.tell(message, self); + } + }.start(); + + onFirstElectionTimeout.countDown(); + } else { + super.onReceiveCommand(message); + } + } + }; + } + }; + + final MockDataChangeListener listener = new MockDataChangeListener(1); + final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener), + "testDataChangeListenerOnFollower-DataChangeListener"); + + final TestActorRef shard = TestActorRef.create(getSystem(), + Props.create(new DelegatingShardCreator(creator)), + "testDataChangeListenerOnFollower"); + + assertEquals("Got first ElectionTimeout", true, + onFirstElectionTimeout.await(5, TimeUnit.SECONDS)); + + shard.tell(new FindLeader(), getRef()); + final FindLeaderReply findLeadeReply = + expectMsgClass(duration("5 seconds"), FindLeaderReply.class); + assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor()); + + final YangInstanceIdentifier path = TestModel.TEST_PATH; + + shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef()); + final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"), + RegisterChangeListenerReply.class); + assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath()); + + writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + onChangeListenerRegistered.countDown(); + + listener.waitForChangeEvents(); + + dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + + @Test + public void testClusteredDataChangeListernerRegistration() throws Exception { + new ShardTestKit(getSystem()) {{ + final ShardIdentifier member1ShardID = ShardIdentifier.builder().memberName("member-1") + .shardName("inventory").type("config").build(); + + final ShardIdentifier member2ShardID = ShardIdentifier.builder().memberName("member-2") + .shardName("inventory").type("config").build(); + final Creator followerShardCreator = new Creator() { + + @Override + public Shard create() throws Exception { + return new Shard(member1ShardID, Collections.singletonMap(member2ShardID.toString(), + "akka://test/user/" + member2ShardID.toString()), + dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) { + @Override + public void onReceiveCommand(final Object message) throws Exception { + + if(!(message instanceof ElectionTimeout)) { + super.onReceiveCommand(message); + } + } + }; + } + }; + + final Creator leaderShardCreator = new Creator() { + + @Override + public Shard create() throws Exception { + return new Shard(member2ShardID, Collections.singletonMap(member1ShardID.toString(), + "akka://test/user/" + member1ShardID.toString()), + dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) { }; + } + }; + + + final TestActorRef shard = TestActorRef.create(getSystem(), + Props.create(new DelegatingShardCreator(followerShardCreator)), + member1ShardID.toString()); + + final TestActorRef shardLeader = TestActorRef.create(getSystem(), + Props.create(new DelegatingShardCreator(leaderShardCreator)), + member2ShardID.toString()); + // Sleep to let election happen + Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); + + shard.tell(new FindLeader(), getRef()); + final FindLeaderReply findLeaderReply = + expectMsgClass(duration("5 seconds"), FindLeaderReply.class); + assertEquals("Shard leader does not match", shardLeader.path().toString(), findLeaderReply.getLeaderActor()); + + final YangInstanceIdentifier path = TestModel.TEST_PATH; + final MockDataChangeListener listener = new MockDataChangeListener(1); + final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener), + "testDataChangeListenerOnFollower-DataChangeListener"); + + shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef()); + final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"), + RegisterChangeListenerReply.class); + assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath()); + + writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + listener.waitForChangeEvents(); + + dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerTest.java index 5dfabce006..4ccee663e9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerTest.java @@ -41,7 +41,7 @@ public class RegisterChangeListenerTest extends AbstractActorTest { public void testToSerializable(){ TestActorRef testActor = factory.createTestActor(MessageCollectorActor.props()); RegisterChangeListener registerChangeListener = new RegisterChangeListener(TestModel.TEST_PATH, testActor - , AsyncDataBroker.DataChangeScope.BASE); + , AsyncDataBroker.DataChangeScope.BASE, false); ListenerRegistrationMessages.RegisterChangeListener serialized = registerChangeListener.toSerializable(); @@ -51,6 +51,7 @@ public class RegisterChangeListenerTest extends AbstractActorTest { assertEquals("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", path.getCode(0)); assertEquals(Serialization.serializedActorPath(testActor), serialized.getDataChangeListenerActorPath()); assertEquals(AsyncDataBroker.DataChangeScope.BASE.ordinal(), serialized.getDataChangeScope()); + assertEquals(false, serialized.getRegisterOnAllInstances()); } @@ -58,7 +59,7 @@ public class RegisterChangeListenerTest extends AbstractActorTest { public void testFromSerializable(){ TestActorRef testActor = factory.createTestActor(MessageCollectorActor.props()); RegisterChangeListener registerChangeListener = new RegisterChangeListener(TestModel.TEST_PATH, testActor - , AsyncDataBroker.DataChangeScope.SUBTREE); + , AsyncDataBroker.DataChangeScope.SUBTREE, true); ListenerRegistrationMessages.RegisterChangeListener serialized = registerChangeListener.toSerializable(); @@ -69,7 +70,7 @@ public class RegisterChangeListenerTest extends AbstractActorTest { assertEquals(TestModel.TEST_PATH, registerChangeListener.getPath()); assertEquals(testActor.path().toString(), fromSerialized.getDataChangeListenerPath().toString()); assertEquals(AsyncDataBroker.DataChangeScope.SUBTREE, fromSerialized.getScope()); - + assertEquals(true, fromSerialized.isRegisterOnAllInstances()); } } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/ClusteredDOMDataChangeListener.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/ClusteredDOMDataChangeListener.java new file mode 100644 index 0000000000..e5dbc545e5 --- /dev/null +++ b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/ClusteredDOMDataChangeListener.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.md.sal.dom.api; + +/** + *

ClusteredDOMDataChangeListener is a marker interface to enable data change notifications on all instances in a cluster, + * where this listener is registered.

+ * + * + *

Applications should implement ClusteredDOMDataChangeListener instead of DOMDataChangeListener, if they want to listen + * to data change notifications on any node of clustered datastore. DOMDataChangeListener enables data change notifications + * only at leader of the datastore shard.

+ * + */ + +public interface ClusteredDOMDataChangeListener extends DOMDataChangeListener{ + +} diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataChangeListener.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataChangeListener.java index 393d1eaafe..c46529bd02 100644 --- a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataChangeListener.java +++ b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataChangeListener.java @@ -11,6 +11,10 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +/** + * DOMDataChangeListener enables data change notifications only at leader of the datastore shard. + */ + public interface DOMDataChangeListener extends AsyncDataChangeListener> { }