Two new interfaces are introduced ClusteredDataChangeListener and ClusteredDOMDataChangeListener and external applications will have to implement any of that interface,
if those applications want to listen to remote data change notifications.
Datastore registers listeners, which are instance of that interface, even on followers.
Change-Id: I0e29cdf2a08a2051de5fc8ce73b9ec8ac408e45b
Signed-off-by: Harman Singh <harmasin@cisco.com>
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+
+package org.opendaylight.controller.md.sal.binding.api;
+
+/**
+ * <p>
+ * ClusteredDataChangeListener is a marker interface to enable data change notifications on all instances in a cluster,
+ * where this listener is registered.
+ * </p>
+ *
+ * <p>Applications should implement ClusteredDataChangeListener instead of DataChangeListener, if they want to listen
+ * to data change notifications on any node of clustered datastore. DataChangeListener enables data change notifications
+ * only at leader of the datastore shard.</p>
+ *
+ */
+
+public interface ClusteredDataChangeListener extends DataChangeListener{
+}
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+/*
+ * DataChangeListener enables data change notifications only at leader of the datastore shard
+ */
+
public interface DataChangeListener extends AsyncDataChangeListener<InstanceIdentifier<?>, DataObject> {
@Override
void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change);
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+
+import org.opendaylight.controller.md.sal.binding.api.ClusteredDataChangeListener;
import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
import org.opendaylight.yangtools.concepts.Delegator;
public ListenerRegistration<DataChangeListener> registerDataChangeListener(final LogicalDatastoreType store,
final InstanceIdentifier<?> path, final DataChangeListener listener, final DataChangeScope triggeringScope) {
- final DOMDataChangeListener domDataChangeListener = new TranslatingDataChangeInvoker(store, path, listener,
+ final DOMDataChangeListener domDataChangeListener;
+
+ if(listener instanceof ClusteredDataChangeListener) {
+ domDataChangeListener = new TranslatingClusteredDataChangeInvoker(store, path, listener, triggeringScope);
+ } else {
+ domDataChangeListener = new TranslatingDataChangeInvoker(store, path, listener,
triggeringScope);
+ }
final YangInstanceIdentifier domPath = codec.toYangInstanceIdentifierBlocking(path);
final ListenerRegistration<DOMDataChangeListener> domRegistration = domDataBroker.registerDataChangeListener(store,
domPath, domDataChangeListener, triggeringScope);
}
}
+ /**
+ * Translator for ClusteredDataChangeListener
+ */
+
+ private class TranslatingClusteredDataChangeInvoker extends TranslatingDataChangeInvoker implements
+ ClusteredDOMDataChangeListener {
+
+ public TranslatingClusteredDataChangeInvoker(LogicalDatastoreType store, InstanceIdentifier<?> path,
+ DataChangeListener bindingDataChangeListener,
+ DataChangeScope triggeringScope) {
+ super(store, path, bindingDataChangeListener, triggeringScope);
+ }
+ }
+
private class TranslatedDataChangeEvent implements AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> {
private final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> domEvent;
private final InstanceIdentifier<?> path;
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.binding.impl.test;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.md.sal.binding.api.ClusteredDataChangeListener;
+import org.opendaylight.controller.md.sal.binding.impl.BindingDOMDataBrokerAdapter;
+import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.Top;
+import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
+import org.opendaylight.yangtools.sal.binding.generator.impl.GeneratedClassLoadingStrategy;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+
+public class BindingDOMDataBrokerAdapterTest {
+
+ @Mock
+ DOMDataBroker dataBroker;
+
+ @Mock
+ GeneratedClassLoadingStrategy classLoadingStrategy;
+ @Mock
+ BindingNormalizedNodeCodecRegistry codecRegistry;
+
+ @Mock
+ YangInstanceIdentifier yangInstanceIdentifier;
+
+
+ private static final InstanceIdentifier<Top> TOP_PATH = InstanceIdentifier
+ .create(Top.class);
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @Test
+ public void testClusteredDataChangeListernerRegisteration() {
+
+ BindingToNormalizedNodeCodec codec = new BindingToNormalizedNodeCodec(classLoadingStrategy, codecRegistry);
+
+ BindingDOMDataBrokerAdapter bindingDOMDataBrokerAdapter = new BindingDOMDataBrokerAdapter(dataBroker, codec);
+ Mockito.when(codecRegistry.toYangInstanceIdentifier(TOP_PATH)).thenReturn(yangInstanceIdentifier);
+
+ ArgumentCaptor<ClusteredDOMDataChangeListener> clusteredDOMListener = ArgumentCaptor.
+ forClass(ClusteredDOMDataChangeListener.class);
+ ArgumentCaptor<LogicalDatastoreType> logicalDatastoreType = ArgumentCaptor.forClass(LogicalDatastoreType.class);
+ ArgumentCaptor<AsyncDataBroker.DataChangeScope> dataChangeScope = ArgumentCaptor.
+ forClass(AsyncDataBroker.DataChangeScope.class);
+ ArgumentCaptor<YangInstanceIdentifier> yangInstanceIdentifier = ArgumentCaptor.
+ forClass(YangInstanceIdentifier.class);
+
+ TestListener listener = new TestListener();
+
+ bindingDOMDataBrokerAdapter.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, TOP_PATH, listener,
+ AsyncDataBroker.DataChangeScope.BASE);
+ Mockito.verify(dataBroker).registerDataChangeListener(logicalDatastoreType.capture(), yangInstanceIdentifier.capture(),
+ clusteredDOMListener.capture(), dataChangeScope.capture());
+
+ }
+
+ private class TestListener implements ClusteredDataChangeListener {
+
+ @Override
+ public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
+
+ }
+ }
+}
* <code>required int32 dataChangeScope = 3;</code>
*/
int getDataChangeScope();
+
+ // optional bool registerOnAllInstances = 4;
+ /**
+ * <code>optional bool registerOnAllInstances = 4;</code>
+ */
+ boolean hasRegisterOnAllInstances();
+ /**
+ * <code>optional bool registerOnAllInstances = 4;</code>
+ */
+ boolean getRegisterOnAllInstances();
}
/**
* Protobuf type {@code org.opendaylight.controller.mdsal.RegisterChangeListener}
dataChangeScope_ = input.readInt32();
break;
}
+ case 32: {
+ bitField0_ |= 0x00000008;
+ registerOnAllInstances_ = input.readBool();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
return dataChangeScope_;
}
+ // optional bool registerOnAllInstances = 4;
+ public static final int REGISTERONALLINSTANCES_FIELD_NUMBER = 4;
+ private boolean registerOnAllInstances_;
+ /**
+ * <code>optional bool registerOnAllInstances = 4;</code>
+ */
+ public boolean hasRegisterOnAllInstances() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ /**
+ * <code>optional bool registerOnAllInstances = 4;</code>
+ */
+ public boolean getRegisterOnAllInstances() {
+ return registerOnAllInstances_;
+ }
+
private void initFields() {
instanceIdentifierPath_ = org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.getDefaultInstance();
dataChangeListenerActorPath_ = "";
dataChangeScope_ = 0;
+ registerOnAllInstances_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeInt32(3, dataChangeScope_);
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ output.writeBool(4, registerOnAllInstances_);
+ }
getUnknownFields().writeTo(output);
}
size += com.google.protobuf.CodedOutputStream
.computeInt32Size(3, dataChangeScope_);
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBoolSize(4, registerOnAllInstances_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
bitField0_ = (bitField0_ & ~0x00000002);
dataChangeScope_ = 0;
bitField0_ = (bitField0_ & ~0x00000004);
+ registerOnAllInstances_ = false;
+ bitField0_ = (bitField0_ & ~0x00000008);
return this;
}
to_bitField0_ |= 0x00000004;
}
result.dataChangeScope_ = dataChangeScope_;
+ if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+ to_bitField0_ |= 0x00000008;
+ }
+ result.registerOnAllInstances_ = registerOnAllInstances_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
if (other.hasDataChangeScope()) {
setDataChangeScope(other.getDataChangeScope());
}
+ if (other.hasRegisterOnAllInstances()) {
+ setRegisterOnAllInstances(other.getRegisterOnAllInstances());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
return this;
}
+ // optional bool registerOnAllInstances = 4;
+ private boolean registerOnAllInstances_ ;
+ /**
+ * <code>optional bool registerOnAllInstances = 4;</code>
+ */
+ public boolean hasRegisterOnAllInstances() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ /**
+ * <code>optional bool registerOnAllInstances = 4;</code>
+ */
+ public boolean getRegisterOnAllInstances() {
+ return registerOnAllInstances_;
+ }
+ /**
+ * <code>optional bool registerOnAllInstances = 4;</code>
+ */
+ public Builder setRegisterOnAllInstances(boolean value) {
+ bitField0_ |= 0x00000008;
+ registerOnAllInstances_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional bool registerOnAllInstances = 4;</code>
+ */
+ public Builder clearRegisterOnAllInstances() {
+ bitField0_ = (bitField0_ & ~0x00000008);
+ registerOnAllInstances_ = false;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.RegisterChangeListener)
}
"ylight.controller.mdsal\032\014Common.proto\"%\n" +
"#CloseDataChangeListenerRegistration\"*\n(" +
"CloseDataChangeListenerRegistrationReply" +
- "\"\255\001\n\026RegisterChangeListener\022U\n\026instanceI" +
+ "\"\315\001\n\026RegisterChangeListener\022U\n\026instanceI" +
"dentifierPath\030\001 \002(\01325.org.opendaylight.c" +
"ontroller.mdsal.InstanceIdentifier\022#\n\033da" +
"taChangeListenerActorPath\030\002 \002(\t\022\027\n\017dataC" +
- "hangeScope\030\003 \002(\005\"?\n\033RegisterChangeListen" +
- "erReply\022 \n\030listenerRegistrationPath\030\001 \002(",
- "\tB[\n;org.opendaylight.controller.protobu" +
- "ff.messages.registrationB\034ListenerRegist" +
- "rationMessages"
+ "hangeScope\030\003 \002(\005\022\036\n\026registerOnAllInstanc" +
+ "es\030\004 \001(\010\"?\n\033RegisterChangeListenerReply\022",
+ " \n\030listenerRegistrationPath\030\001 \002(\tB[\n;org" +
+ ".opendaylight.controller.protobuff.messa" +
+ "ges.registrationB\034ListenerRegistrationMe" +
+ "ssages"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
internal_static_org_opendaylight_controller_mdsal_RegisterChangeListener_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_org_opendaylight_controller_mdsal_RegisterChangeListener_descriptor,
- new java.lang.String[] { "InstanceIdentifierPath", "DataChangeListenerActorPath", "DataChangeScope", });
+ new java.lang.String[] { "InstanceIdentifierPath", "DataChangeListenerActorPath", "DataChangeScope", "RegisterOnAllInstances", });
internal_static_org_opendaylight_controller_mdsal_RegisterChangeListenerReply_descriptor =
getDescriptor().getMessageTypes().get(3);
internal_static_org_opendaylight_controller_mdsal_RegisterChangeListenerReply_fieldAccessorTable = new
required InstanceIdentifier instanceIdentifierPath=1;
required string dataChangeListenerActorPath=2;
required int32 dataChangeScope=3;
+optional bool registerOnAllInstances=4;
}
/**
* This is the reply for the RegisterChangeListener message
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
DataChangeScope scope) {
Future<Object> future = actorContext.executeOperationAsync(shard,
- new RegisterChangeListener(path, dataChangeListenerActor, scope),
+ new RegisterChangeListener(path, dataChangeListenerActor, scope,
+ listener instanceof ClusteredDOMDataChangeListener),
actorContext.getDatastoreContext().getShardInitializationTimeout());
future.onComplete(new OnComplete<Object>(){
private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerSupport.class);
private final List<DelayedListenerRegistration> delayedListenerRegistrations = new ArrayList<>();
private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
+ private final List<DelayedListenerRegistration> delayedRegisterOnAllListeners = new ArrayList<>();
DataChangeListenerSupport(final Shard shard) {
super(shard);
}
@Override
- void onLeadershipChange(final boolean isLeader) {
+ void onLeadershipChange(final boolean isLeader, boolean hasLeader) {
+ LOG.debug("onLeadershipChange, isLeader: {}, hasLeader : {}", isLeader, hasLeader);
+
for (ActorSelection dataChangeListener : dataChangeListeners) {
dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
}
+ if(hasLeader) {
+ for (DelayedListenerRegistration reg : delayedRegisterOnAllListeners) {
+ registerDelayedListeners(reg);
+ }
+ delayedRegisterOnAllListeners.clear();
+ }
+
if (isLeader) {
for (DelayedListenerRegistration reg: delayedListenerRegistrations) {
- if(!reg.isClosed()) {
- final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
- createDelegate(reg.getRegisterChangeListener());
- reg.setDelegate(res.getKey());
- if (res.getValue() != null) {
- reg.getInstance().onDataChanged(res.getValue());
- }
- }
+ registerDelayedListeners(reg);
}
delayedListenerRegistrations.clear();
}
}
+ private void registerDelayedListeners(DelayedListenerRegistration reg) {
+ if(!reg.isClosed()) {
+ final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
+ createDelegate(reg.getRegisterChangeListener());
+ reg.setDelegate(res.getKey());
+ if (res.getValue() != null) {
+ reg.getInstance().onDataChanged(res.getValue());
+ }
+ }
+ }
+
@Override
- void onMessage(final RegisterChangeListener message, final boolean isLeader) {
+ void onMessage(final RegisterChangeListener message, final boolean isLeader, boolean hasLeader) {
- LOG.debug("{}: registerDataChangeListener for {}, leader: {}", persistenceId(), message.getPath(), isLeader);
+ LOG.debug("{}: registerDataChangeListener for {}, isLeader: {}, hasLeader : {}",
+ persistenceId(), message.getPath(), isLeader, hasLeader);
final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
NormalizedNode<?, ?>>> registration;
final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> event;
- if (isLeader) {
+ if ((hasLeader && message.isRegisterOnAllInstances()) || isLeader) {
final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
createDelegate(message);
registration = res.getKey();
LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
DelayedListenerRegistration delayedReg = new DelayedListenerRegistration(message);
- delayedListenerRegistrations.add(delayedReg);
+ if(message.isRegisterOnAllInstances()) {
+ delayedRegisterOnAllListeners.add(delayedReg);
+ } else {
+ delayedListenerRegistrations.add(delayedReg);
+ }
registration = delayedReg;
event = null;
}
// Now store a reference to the data change listener so it can be notified
// at a later point if notifications should be enabled or disabled
- dataChangeListeners.add(dataChangeListenerPath);
+ if(!message.isRegisterOnAllInstances()) {
+ dataChangeListeners.add(dataChangeListenerPath);
+ }
AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
new DataChangeListenerProxy(dataChangeListenerPath);
}
@Override
- void onLeadershipChange(final boolean isLeader) {
+ void onLeadershipChange(final boolean isLeader, boolean hasLeader) {
if (isLeader) {
for (DelayedDataTreeListenerRegistration reg : delayedRegistrations) {
reg.createDelegate(this);
}
@Override
- void onMessage(final RegisterDataTreeChangeListener registerTreeChangeListener, final boolean isLeader) {
+ void onMessage(final RegisterDataTreeChangeListener registerTreeChangeListener, final boolean isLeader, boolean hasLeader) {
LOG.debug("{}: registerTreeChangeListener for {}, leader: {}", persistenceId(), registerTreeChangeListener.getPath(), isLeader);
final ListenerRegistration<DOMDataTreeChangeListener> registration;
* Invoked whenever the local shard's leadership role changes.
*
* @param isLeader true if the shard has become leader, false if it has
- * become a follower.
+ * become a follower.
+ * @param hasLeader true if the shard knows about leader ID
*/
- abstract void onLeadershipChange(boolean isLeader);
- abstract void onMessage(M message, boolean isLeader);
+ abstract void onLeadershipChange(boolean isLeader, boolean hasLeader);
+ abstract void onMessage(M message, boolean isLeader, boolean hasLeader);
}
if(context.error().isPresent()){
LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
- context.error());
+ context.error());
}
try {
} else if (CloseTransactionChain.SERIALIZABLE_CLASS.isInstance(message)) {
closeTransactionChain(CloseTransactionChain.fromSerializable(message));
} else if (message instanceof RegisterChangeListener) {
- changeSupport.onMessage((RegisterChangeListener) message, isLeader());
+ changeSupport.onMessage((RegisterChangeListener) message, isLeader(), hasLeader());
} else if (message instanceof RegisterDataTreeChangeListener) {
- treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader());
+ treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader(), hasLeader());
} else if (message instanceof UpdateSchemaContext) {
updateSchemaContext((UpdateSchemaContext) message);
} else if (message instanceof PeerAddressResolved) {
}
}
+ private boolean hasLeader() {
+ return getLeaderId() != null;
+ }
+
public int getPendingTxCommitQueueSize() {
return commitCoordinator.getQueueSize();
}
@Override
protected void onStateChanged() {
boolean isLeader = isLeader();
- changeSupport.onLeadershipChange(isLeader);
- treeChangeSupport.onLeadershipChange(isLeader);
+ boolean hasLeader = hasLeader();
+ changeSupport.onLeadershipChange(isLeader, hasLeader);
+ treeChangeSupport.onLeadershipChange(isLeader, hasLeader);
// If this actor is no longer the leader close all the transaction chains
if (!isLeader) {
private final YangInstanceIdentifier path;
private final ActorRef dataChangeListener;
private final AsyncDataBroker.DataChangeScope scope;
+ private final boolean registerOnAllInstances;
public RegisterChangeListener(YangInstanceIdentifier path,
ActorRef dataChangeListener,
- AsyncDataBroker.DataChangeScope scope) {
+ AsyncDataBroker.DataChangeScope scope, boolean registerOnAllInstances) {
this.path = path;
this.dataChangeListener = dataChangeListener;
this.scope = scope;
+ this.registerOnAllInstances = registerOnAllInstances;
}
public YangInstanceIdentifier getPath() {
return dataChangeListener.path();
}
+ public boolean isRegisterOnAllInstances() {
+ return registerOnAllInstances;
+ }
@Override
public ListenerRegistrationMessages.RegisterChangeListener toSerializable() {
return ListenerRegistrationMessages.RegisterChangeListener.newBuilder()
.setInstanceIdentifierPath(InstanceIdentifierUtils.toSerializable(path))
.setDataChangeListenerActorPath(Serialization.serializedActorPath(dataChangeListener))
- .setDataChangeScope(scope.ordinal()).build();
+ .setDataChangeScope(scope.ordinal()).setRegisterOnAllInstances(registerOnAllInstances).build();
}
public static RegisterChangeListener fromSerializable(ActorSystem actorSystem, Object serializable){
ListenerRegistrationMessages.RegisterChangeListener o = (ListenerRegistrationMessages.RegisterChangeListener) serializable;
return new RegisterChangeListener(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPath()),
actorSystem.provider().resolveActorRef(o.getDataChangeListenerActorPath()),
- AsyncDataBroker.DataChangeScope.values()[o.getDataChangeScope()]);
+ AsyncDataBroker.DataChangeScope.values()[o.getDataChangeScope()], o.getRegisterOnAllInstances());
}
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import scala.concurrent.ExecutionContextExecutor;
RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class);
Assert.assertEquals("getPath", path, registerMsg.getPath());
Assert.assertEquals("getScope", scope, registerMsg.getScope());
+ Assert.assertEquals("isRegisterOnAllInstances", false, registerMsg.isRegisterOnAllInstances());
reply(new RegisterChangeListenerReply(getRef()));
}};
}
+ @Test(timeout=10000)
+ public void testSuccessfulRegistrationForClusteredListener() {
+ new JavaTestKit(getSystem()) {{
+ ActorContext actorContext = new ActorContext(getSystem(), getRef(),
+ mock(ClusterWrapper.class), mock(Configuration.class));
+
+ AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> mockClusteredListener =
+ Mockito.mock(ClusteredDOMDataChangeListener.class);
+
+ final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+ "shard-1", actorContext, mockClusteredListener);
+
+ final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+ final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
+ new Thread() {
+ @Override
+ public void run() {
+ proxy.init(path, scope);
+ }
+
+ }.start();
+
+ FiniteDuration timeout = duration("5 seconds");
+ FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
+ Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+
+ reply(new LocalShardFound(getRef()));
+
+ RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class);
+ Assert.assertEquals("getPath", path, registerMsg.getPath());
+ Assert.assertEquals("getScope", scope, registerMsg.getScope());
+ Assert.assertEquals("isRegisterOnAllInstances", true, registerMsg.isRegisterOnAllInstances());
+
+ reply(new RegisterChangeListenerReply(getRef()));
+
+ for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) {
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+
+ Assert.assertEquals("getListenerRegistrationActor", getSystem().actorSelection(getRef().path()),
+ proxy.getListenerRegistrationActor());
+
+ watch(proxy.getDataChangeListenerActor());
+
+ proxy.close();
+
+ // The listener registration actor should get a Close message
+ expectMsgClass(timeout, CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS);
+
+ // The DataChangeListener actor should be terminated
+ expectMsgClass(timeout, Terminated.class);
+
+ proxy.close();
+
+ expectNoMsg();
+ }};
+ }
+
@Test(timeout=10000)
public void testLocalShardNotFound() {
new JavaTestKit(getSystem()) {{
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
+
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
"testRegisterChangeListener-DataChangeListener");
shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
- dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
+ dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
RegisterChangeListenerReply.class);
// Now send the RegisterChangeListener and wait for the reply.
shard.tell(new RegisterChangeListener(path, dclActor,
- AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
+ AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef());
final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
RegisterChangeListenerReply.class);
writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
assertEquals("Got first ElectionTimeout", true,
- onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
+ onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
- RegisterDataTreeChangeListenerReply.class);
+ RegisterDataTreeChangeListenerReply.class);
assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
shard.tell(new FindLeader(), getRef());
//waitUntilLeader(shard);
assertEquals("Recovery complete", true,
- Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
+ Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
final String address = "akka://foobar";
shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
assertEquals("getPeerAddresses", address,
- ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
+ ((TestShard) shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
- SerializationUtils.serializeNormalizedNode(root),
- Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
+ SerializationUtils.serializeNormalizedNode(root),
+ Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
return testStore;
}
}
InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
- new ApplyJournalEntries(nListEntries));
+ new ApplyJournalEntries(nListEntries));
testRecovery(listEntryKeys);
}
InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
- new WriteModification(TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
+ new WriteModification(TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
final int nListEntries = 16;
final Set<Integer> listEntryKeys = new HashSet<>();
// Send a couple more BatchedModifications.
shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
expectMsgClass(duration, BatchedModificationsReply.class);
shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
final MutableCompositeModification modification = new MutableCompositeModification();
final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
- TestModel.TEST_PATH, containerNode, modification);
+ TestModel.TEST_PATH, containerNode, modification);
final FiniteDuration duration = duration("5 seconds");
// by the ShardTransaction.
shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true, false), getRef());
+ cohort, modification, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
// Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort, modification, true, false), getRef());
+ cohort, modification, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
modification, preCommit);
shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true, false), getRef());
+ cohort, modification, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true, false), getRef());
+ cohort1, modification1, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
final String transactionID2 = "tx2";
TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
- cohort3, modification3, true, false), getRef());
+ cohort3, modification3, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// All Tx's are readied. We'll send canCommit for the last one but not the others. The others
shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
- nonPersistentContext, SCHEMA_CONTEXT);
+ nonPersistentContext, SCHEMA_CONTEXT);
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
assertEquals("isRecoveryApplicable", false,
- shard.underlyingActor().persistence().isRecoveryApplicable());
+ shard.underlyingActor().persistence().isRecoveryApplicable());
shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
assertEquals("isRecoveryApplicable", true,
- shard.underlyingActor().persistence().isRecoveryApplicable());
+ shard.underlyingActor().persistence().isRecoveryApplicable());
shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
- ShardLeaderStateChanged.class);
+ ShardLeaderStateChanged.class);
assertEquals("getLocalShardDataTree present", true,
leaderStateChanged.getLocalShardDataTree().isPresent());
assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
- leaderStateChanged.getLocalShardDataTree().get());
+ leaderStateChanged.getLocalShardDataTree().get());
MessageCollectorActor.clearMessages(listener);
store.validate(modification);
store.commit(store.prepare(modification));
}
+
+ @Test
+ public void testClusteredDataChangeListernerDelayedRegistration() throws Exception {
+ new ShardTestKit(getSystem()) {{
+ final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
+ final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
+ final Creator<Shard> creator = new Creator<Shard>() {
+ boolean firstElectionTimeout = true;
+
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(shardID, Collections.<String,String>emptyMap(),
+ dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
+ @Override
+ public void onReceiveCommand(final Object message) throws Exception {
+ if(message instanceof ElectionTimeout && firstElectionTimeout) {
+ firstElectionTimeout = false;
+ final ActorRef self = getSelf();
+ new Thread() {
+ @Override
+ public void run() {
+ Uninterruptibles.awaitUninterruptibly(
+ onChangeListenerRegistered, 5, TimeUnit.SECONDS);
+ self.tell(message, self);
+ }
+ }.start();
+
+ onFirstElectionTimeout.countDown();
+ } else {
+ super.onReceiveCommand(message);
+ }
+ }
+ };
+ }
+ };
+
+ final MockDataChangeListener listener = new MockDataChangeListener(1);
+ final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
+ "testDataChangeListenerOnFollower-DataChangeListener");
+
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ Props.create(new DelegatingShardCreator(creator)),
+ "testDataChangeListenerOnFollower");
+
+ assertEquals("Got first ElectionTimeout", true,
+ onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
+
+ shard.tell(new FindLeader(), getRef());
+ final FindLeaderReply findLeadeReply =
+ expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
+ assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
+
+ final YangInstanceIdentifier path = TestModel.TEST_PATH;
+
+ shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
+ final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterChangeListenerReply.class);
+ assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+
+ writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ onChangeListenerRegistered.countDown();
+
+ listener.waitForChangeEvents();
+
+ dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @Test
+ public void testClusteredDataChangeListernerRegistration() throws Exception {
+ new ShardTestKit(getSystem()) {{
+ final ShardIdentifier member1ShardID = ShardIdentifier.builder().memberName("member-1")
+ .shardName("inventory").type("config").build();
+
+ final ShardIdentifier member2ShardID = ShardIdentifier.builder().memberName("member-2")
+ .shardName("inventory").type("config").build();
+ final Creator<Shard> followerShardCreator = new Creator<Shard>() {
+
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(member1ShardID, Collections.singletonMap(member2ShardID.toString(),
+ "akka://test/user/" + member2ShardID.toString()),
+ dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
+ @Override
+ public void onReceiveCommand(final Object message) throws Exception {
+
+ if(!(message instanceof ElectionTimeout)) {
+ super.onReceiveCommand(message);
+ }
+ }
+ };
+ }
+ };
+
+ final Creator<Shard> leaderShardCreator = new Creator<Shard>() {
+
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(member2ShardID, Collections.singletonMap(member1ShardID.toString(),
+ "akka://test/user/" + member1ShardID.toString()),
+ dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) { };
+ }
+ };
+
+
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ Props.create(new DelegatingShardCreator(followerShardCreator)),
+ member1ShardID.toString());
+
+ final TestActorRef<Shard> shardLeader = TestActorRef.create(getSystem(),
+ Props.create(new DelegatingShardCreator(leaderShardCreator)),
+ member2ShardID.toString());
+ // Sleep to let election happen
+ Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+
+ shard.tell(new FindLeader(), getRef());
+ final FindLeaderReply findLeaderReply =
+ expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
+ assertEquals("Shard leader does not match", shardLeader.path().toString(), findLeaderReply.getLeaderActor());
+
+ final YangInstanceIdentifier path = TestModel.TEST_PATH;
+ final MockDataChangeListener listener = new MockDataChangeListener(1);
+ final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
+ "testDataChangeListenerOnFollower-DataChangeListener");
+
+ shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
+ final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterChangeListenerReply.class);
+ assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+
+ writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ listener.waitForChangeEvents();
+
+ dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
}
public void testToSerializable(){
TestActorRef<Actor> testActor = factory.createTestActor(MessageCollectorActor.props());
RegisterChangeListener registerChangeListener = new RegisterChangeListener(TestModel.TEST_PATH, testActor
- , AsyncDataBroker.DataChangeScope.BASE);
+ , AsyncDataBroker.DataChangeScope.BASE, false);
ListenerRegistrationMessages.RegisterChangeListener serialized
= registerChangeListener.toSerializable();
assertEquals("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", path.getCode(0));
assertEquals(Serialization.serializedActorPath(testActor), serialized.getDataChangeListenerActorPath());
assertEquals(AsyncDataBroker.DataChangeScope.BASE.ordinal(), serialized.getDataChangeScope());
+ assertEquals(false, serialized.getRegisterOnAllInstances());
}
public void testFromSerializable(){
TestActorRef<Actor> testActor = factory.createTestActor(MessageCollectorActor.props());
RegisterChangeListener registerChangeListener = new RegisterChangeListener(TestModel.TEST_PATH, testActor
- , AsyncDataBroker.DataChangeScope.SUBTREE);
+ , AsyncDataBroker.DataChangeScope.SUBTREE, true);
ListenerRegistrationMessages.RegisterChangeListener serialized
= registerChangeListener.toSerializable();
assertEquals(TestModel.TEST_PATH, registerChangeListener.getPath());
assertEquals(testActor.path().toString(), fromSerialized.getDataChangeListenerPath().toString());
assertEquals(AsyncDataBroker.DataChangeScope.SUBTREE, fromSerialized.getScope());
-
+ assertEquals(true, fromSerialized.isRegisterOnAllInstances());
}
}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.dom.api;
+
+/**
+ * <p>ClusteredDOMDataChangeListener is a marker interface to enable data change notifications on all instances in a cluster,
+ * where this listener is registered.</p>
+ *
+ *
+ * <p>Applications should implement ClusteredDOMDataChangeListener instead of DOMDataChangeListener, if they want to listen
+ * to data change notifications on any node of clustered datastore. DOMDataChangeListener enables data change notifications
+ * only at leader of the datastore shard.</p>
+ *
+ */
+
+public interface ClusteredDOMDataChangeListener extends DOMDataChangeListener{
+
+}
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+/**
+ * DOMDataChangeListener enables data change notifications only at leader of the datastore shard.
+ */
+
public interface DOMDataChangeListener extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> {
}