--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+
+package org.opendaylight.controller.md.sal.binding.api;
+
+/**
+ * <p>
+ * ClusteredDataChangeListener is a marker interface to enable data change notifications on all instances in a cluster,
+ * where this listener is registered.
+ * </p>
+ *
+ * <p>Applications should implement ClusteredDataChangeListener instead of DataChangeListener, if they want to listen
+ * to data change notifications on any node of clustered datastore. DataChangeListener enables data change notifications
+ * only at leader of the datastore shard.</p>
+ *
+ */
+
+public interface ClusteredDataChangeListener extends DataChangeListener{
+}
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+/*
+ * DataChangeListener enables data change notifications only at leader of the datastore shard
+ */
+
public interface DataChangeListener extends AsyncDataChangeListener<InstanceIdentifier<?>, DataObject> {
@Override
void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change);
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+
+import org.opendaylight.controller.md.sal.binding.api.ClusteredDataChangeListener;
import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
import org.opendaylight.yangtools.concepts.Delegator;
public ListenerRegistration<DataChangeListener> registerDataChangeListener(final LogicalDatastoreType store,
final InstanceIdentifier<?> path, final DataChangeListener listener, final DataChangeScope triggeringScope) {
- final DOMDataChangeListener domDataChangeListener = new TranslatingDataChangeInvoker(store, path, listener,
+ final DOMDataChangeListener domDataChangeListener;
+
+ if(listener instanceof ClusteredDataChangeListener) {
+ domDataChangeListener = new TranslatingClusteredDataChangeInvoker(store, path, listener, triggeringScope);
+ } else {
+ domDataChangeListener = new TranslatingDataChangeInvoker(store, path, listener,
triggeringScope);
+ }
final YangInstanceIdentifier domPath = codec.toYangInstanceIdentifierBlocking(path);
final ListenerRegistration<DOMDataChangeListener> domRegistration = domDataBroker.registerDataChangeListener(store,
domPath, domDataChangeListener, triggeringScope);
}
}
+ /**
+ * Translator for ClusteredDataChangeListener
+ */
+
+ private class TranslatingClusteredDataChangeInvoker extends TranslatingDataChangeInvoker implements
+ ClusteredDOMDataChangeListener {
+
+ public TranslatingClusteredDataChangeInvoker(LogicalDatastoreType store, InstanceIdentifier<?> path,
+ DataChangeListener bindingDataChangeListener,
+ DataChangeScope triggeringScope) {
+ super(store, path, bindingDataChangeListener, triggeringScope);
+ }
+ }
+
private class TranslatedDataChangeEvent implements AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> {
private final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> domEvent;
private final InstanceIdentifier<?> path;
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.binding.impl.test;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.md.sal.binding.api.ClusteredDataChangeListener;
+import org.opendaylight.controller.md.sal.binding.impl.BindingDOMDataBrokerAdapter;
+import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.Top;
+import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
+import org.opendaylight.yangtools.sal.binding.generator.impl.GeneratedClassLoadingStrategy;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+
+public class BindingDOMDataBrokerAdapterTest {
+
+ @Mock
+ DOMDataBroker dataBroker;
+
+ @Mock
+ GeneratedClassLoadingStrategy classLoadingStrategy;
+ @Mock
+ BindingNormalizedNodeCodecRegistry codecRegistry;
+
+ @Mock
+ YangInstanceIdentifier yangInstanceIdentifier;
+
+
+ private static final InstanceIdentifier<Top> TOP_PATH = InstanceIdentifier
+ .create(Top.class);
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @Test
+ public void testClusteredDataChangeListernerRegisteration() {
+
+ BindingToNormalizedNodeCodec codec = new BindingToNormalizedNodeCodec(classLoadingStrategy, codecRegistry);
+
+ BindingDOMDataBrokerAdapter bindingDOMDataBrokerAdapter = new BindingDOMDataBrokerAdapter(dataBroker, codec);
+ Mockito.when(codecRegistry.toYangInstanceIdentifier(TOP_PATH)).thenReturn(yangInstanceIdentifier);
+
+ ArgumentCaptor<ClusteredDOMDataChangeListener> clusteredDOMListener = ArgumentCaptor.
+ forClass(ClusteredDOMDataChangeListener.class);
+ ArgumentCaptor<LogicalDatastoreType> logicalDatastoreType = ArgumentCaptor.forClass(LogicalDatastoreType.class);
+ ArgumentCaptor<AsyncDataBroker.DataChangeScope> dataChangeScope = ArgumentCaptor.
+ forClass(AsyncDataBroker.DataChangeScope.class);
+ ArgumentCaptor<YangInstanceIdentifier> yangInstanceIdentifier = ArgumentCaptor.
+ forClass(YangInstanceIdentifier.class);
+
+ TestListener listener = new TestListener();
+
+ bindingDOMDataBrokerAdapter.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, TOP_PATH, listener,
+ AsyncDataBroker.DataChangeScope.BASE);
+ Mockito.verify(dataBroker).registerDataChangeListener(logicalDatastoreType.capture(), yangInstanceIdentifier.capture(),
+ clusteredDOMListener.capture(), dataChangeScope.capture());
+
+ }
+
+ private class TestListener implements ClusteredDataChangeListener {
+
+ @Override
+ public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
+
+ }
+ }
+}
* <code>required int32 dataChangeScope = 3;</code>
*/
int getDataChangeScope();
+
+ // optional bool registerOnAllInstances = 4;
+ /**
+ * <code>optional bool registerOnAllInstances = 4;</code>
+ */
+ boolean hasRegisterOnAllInstances();
+ /**
+ * <code>optional bool registerOnAllInstances = 4;</code>
+ */
+ boolean getRegisterOnAllInstances();
}
/**
* Protobuf type {@code org.opendaylight.controller.mdsal.RegisterChangeListener}
dataChangeScope_ = input.readInt32();
break;
}
+ case 32: {
+ bitField0_ |= 0x00000008;
+ registerOnAllInstances_ = input.readBool();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
return dataChangeScope_;
}
+ // optional bool registerOnAllInstances = 4;
+ public static final int REGISTERONALLINSTANCES_FIELD_NUMBER = 4;
+ private boolean registerOnAllInstances_;
+ /**
+ * <code>optional bool registerOnAllInstances = 4;</code>
+ */
+ public boolean hasRegisterOnAllInstances() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ /**
+ * <code>optional bool registerOnAllInstances = 4;</code>
+ */
+ public boolean getRegisterOnAllInstances() {
+ return registerOnAllInstances_;
+ }
+
private void initFields() {
instanceIdentifierPath_ = org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.getDefaultInstance();
dataChangeListenerActorPath_ = "";
dataChangeScope_ = 0;
+ registerOnAllInstances_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeInt32(3, dataChangeScope_);
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ output.writeBool(4, registerOnAllInstances_);
+ }
getUnknownFields().writeTo(output);
}
size += com.google.protobuf.CodedOutputStream
.computeInt32Size(3, dataChangeScope_);
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBoolSize(4, registerOnAllInstances_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
bitField0_ = (bitField0_ & ~0x00000002);
dataChangeScope_ = 0;
bitField0_ = (bitField0_ & ~0x00000004);
+ registerOnAllInstances_ = false;
+ bitField0_ = (bitField0_ & ~0x00000008);
return this;
}
to_bitField0_ |= 0x00000004;
}
result.dataChangeScope_ = dataChangeScope_;
+ if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+ to_bitField0_ |= 0x00000008;
+ }
+ result.registerOnAllInstances_ = registerOnAllInstances_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
if (other.hasDataChangeScope()) {
setDataChangeScope(other.getDataChangeScope());
}
+ if (other.hasRegisterOnAllInstances()) {
+ setRegisterOnAllInstances(other.getRegisterOnAllInstances());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
return this;
}
+ // optional bool registerOnAllInstances = 4;
+ private boolean registerOnAllInstances_ ;
+ /**
+ * <code>optional bool registerOnAllInstances = 4;</code>
+ */
+ public boolean hasRegisterOnAllInstances() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ /**
+ * <code>optional bool registerOnAllInstances = 4;</code>
+ */
+ public boolean getRegisterOnAllInstances() {
+ return registerOnAllInstances_;
+ }
+ /**
+ * <code>optional bool registerOnAllInstances = 4;</code>
+ */
+ public Builder setRegisterOnAllInstances(boolean value) {
+ bitField0_ |= 0x00000008;
+ registerOnAllInstances_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional bool registerOnAllInstances = 4;</code>
+ */
+ public Builder clearRegisterOnAllInstances() {
+ bitField0_ = (bitField0_ & ~0x00000008);
+ registerOnAllInstances_ = false;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.RegisterChangeListener)
}
"ylight.controller.mdsal\032\014Common.proto\"%\n" +
"#CloseDataChangeListenerRegistration\"*\n(" +
"CloseDataChangeListenerRegistrationReply" +
- "\"\255\001\n\026RegisterChangeListener\022U\n\026instanceI" +
+ "\"\315\001\n\026RegisterChangeListener\022U\n\026instanceI" +
"dentifierPath\030\001 \002(\01325.org.opendaylight.c" +
"ontroller.mdsal.InstanceIdentifier\022#\n\033da" +
"taChangeListenerActorPath\030\002 \002(\t\022\027\n\017dataC" +
- "hangeScope\030\003 \002(\005\"?\n\033RegisterChangeListen" +
- "erReply\022 \n\030listenerRegistrationPath\030\001 \002(",
- "\tB[\n;org.opendaylight.controller.protobu" +
- "ff.messages.registrationB\034ListenerRegist" +
- "rationMessages"
+ "hangeScope\030\003 \002(\005\022\036\n\026registerOnAllInstanc" +
+ "es\030\004 \001(\010\"?\n\033RegisterChangeListenerReply\022",
+ " \n\030listenerRegistrationPath\030\001 \002(\tB[\n;org" +
+ ".opendaylight.controller.protobuff.messa" +
+ "ges.registrationB\034ListenerRegistrationMe" +
+ "ssages"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
internal_static_org_opendaylight_controller_mdsal_RegisterChangeListener_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_org_opendaylight_controller_mdsal_RegisterChangeListener_descriptor,
- new java.lang.String[] { "InstanceIdentifierPath", "DataChangeListenerActorPath", "DataChangeScope", });
+ new java.lang.String[] { "InstanceIdentifierPath", "DataChangeListenerActorPath", "DataChangeScope", "RegisterOnAllInstances", });
internal_static_org_opendaylight_controller_mdsal_RegisterChangeListenerReply_descriptor =
getDescriptor().getMessageTypes().get(3);
internal_static_org_opendaylight_controller_mdsal_RegisterChangeListenerReply_fieldAccessorTable = new
required InstanceIdentifier instanceIdentifierPath=1;
required string dataChangeListenerActorPath=2;
required int32 dataChangeScope=3;
+optional bool registerOnAllInstances=4;
}
/**
* This is the reply for the RegisterChangeListener message
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
DataChangeScope scope) {
Future<Object> future = actorContext.executeOperationAsync(shard,
- new RegisterChangeListener(path, dataChangeListenerActor, scope),
+ new RegisterChangeListener(path, dataChangeListenerActor, scope,
+ listener instanceof ClusteredDOMDataChangeListener),
actorContext.getDatastoreContext().getShardInitializationTimeout());
future.onComplete(new OnComplete<Object>(){
private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerSupport.class);
private final List<DelayedListenerRegistration> delayedListenerRegistrations = new ArrayList<>();
private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
+ private final List<DelayedListenerRegistration> delayedRegisterOnAllListeners = new ArrayList<>();
DataChangeListenerSupport(final Shard shard) {
super(shard);
}
@Override
- void onLeadershipChange(final boolean isLeader) {
+ void onLeadershipChange(final boolean isLeader, boolean hasLeader) {
+ LOG.debug("onLeadershipChange, isLeader: {}, hasLeader : {}", isLeader, hasLeader);
+
for (ActorSelection dataChangeListener : dataChangeListeners) {
dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
}
+ if(hasLeader) {
+ for (DelayedListenerRegistration reg : delayedRegisterOnAllListeners) {
+ registerDelayedListeners(reg);
+ }
+ delayedRegisterOnAllListeners.clear();
+ }
+
if (isLeader) {
for (DelayedListenerRegistration reg: delayedListenerRegistrations) {
- if(!reg.isClosed()) {
- final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
- createDelegate(reg.getRegisterChangeListener());
- reg.setDelegate(res.getKey());
- if (res.getValue() != null) {
- reg.getInstance().onDataChanged(res.getValue());
- }
- }
+ registerDelayedListeners(reg);
}
delayedListenerRegistrations.clear();
}
}
+ private void registerDelayedListeners(DelayedListenerRegistration reg) {
+ if(!reg.isClosed()) {
+ final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
+ createDelegate(reg.getRegisterChangeListener());
+ reg.setDelegate(res.getKey());
+ if (res.getValue() != null) {
+ reg.getInstance().onDataChanged(res.getValue());
+ }
+ }
+ }
+
@Override
- void onMessage(final RegisterChangeListener message, final boolean isLeader) {
+ void onMessage(final RegisterChangeListener message, final boolean isLeader, boolean hasLeader) {
- LOG.debug("{}: registerDataChangeListener for {}, leader: {}", persistenceId(), message.getPath(), isLeader);
+ LOG.debug("{}: registerDataChangeListener for {}, isLeader: {}, hasLeader : {}",
+ persistenceId(), message.getPath(), isLeader, hasLeader);
final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
NormalizedNode<?, ?>>> registration;
final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> event;
- if (isLeader) {
+ if ((hasLeader && message.isRegisterOnAllInstances()) || isLeader) {
final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
createDelegate(message);
registration = res.getKey();
LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
DelayedListenerRegistration delayedReg = new DelayedListenerRegistration(message);
- delayedListenerRegistrations.add(delayedReg);
+ if(message.isRegisterOnAllInstances()) {
+ delayedRegisterOnAllListeners.add(delayedReg);
+ } else {
+ delayedListenerRegistrations.add(delayedReg);
+ }
registration = delayedReg;
event = null;
}
// Now store a reference to the data change listener so it can be notified
// at a later point if notifications should be enabled or disabled
- dataChangeListeners.add(dataChangeListenerPath);
+ if(!message.isRegisterOnAllInstances()) {
+ dataChangeListeners.add(dataChangeListenerPath);
+ }
AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
new DataChangeListenerProxy(dataChangeListenerPath);
}
@Override
- void onLeadershipChange(final boolean isLeader) {
+ void onLeadershipChange(final boolean isLeader, boolean hasLeader) {
if (isLeader) {
for (DelayedDataTreeListenerRegistration reg : delayedRegistrations) {
reg.createDelegate(this);
}
@Override
- void onMessage(final RegisterDataTreeChangeListener registerTreeChangeListener, final boolean isLeader) {
+ void onMessage(final RegisterDataTreeChangeListener registerTreeChangeListener, final boolean isLeader, boolean hasLeader) {
LOG.debug("{}: registerTreeChangeListener for {}, leader: {}", persistenceId(), registerTreeChangeListener.getPath(), isLeader);
final ListenerRegistration<DOMDataTreeChangeListener> registration;
* Invoked whenever the local shard's leadership role changes.
*
* @param isLeader true if the shard has become leader, false if it has
- * become a follower.
+ * become a follower.
+ * @param hasLeader true if the shard knows about leader ID
*/
- abstract void onLeadershipChange(boolean isLeader);
- abstract void onMessage(M message, boolean isLeader);
+ abstract void onLeadershipChange(boolean isLeader, boolean hasLeader);
+ abstract void onMessage(M message, boolean isLeader, boolean hasLeader);
}
if(context.error().isPresent()){
LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
- context.error());
+ context.error());
}
try {
} else if (CloseTransactionChain.SERIALIZABLE_CLASS.isInstance(message)) {
closeTransactionChain(CloseTransactionChain.fromSerializable(message));
} else if (message instanceof RegisterChangeListener) {
- changeSupport.onMessage((RegisterChangeListener) message, isLeader());
+ changeSupport.onMessage((RegisterChangeListener) message, isLeader(), hasLeader());
} else if (message instanceof RegisterDataTreeChangeListener) {
- treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader());
+ treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader(), hasLeader());
} else if (message instanceof UpdateSchemaContext) {
updateSchemaContext((UpdateSchemaContext) message);
} else if (message instanceof PeerAddressResolved) {
}
}
+ private boolean hasLeader() {
+ return getLeaderId() != null;
+ }
+
public int getPendingTxCommitQueueSize() {
return commitCoordinator.getQueueSize();
}
@Override
protected void onStateChanged() {
boolean isLeader = isLeader();
- changeSupport.onLeadershipChange(isLeader);
- treeChangeSupport.onLeadershipChange(isLeader);
+ boolean hasLeader = hasLeader();
+ changeSupport.onLeadershipChange(isLeader, hasLeader);
+ treeChangeSupport.onLeadershipChange(isLeader, hasLeader);
// If this actor is no longer the leader close all the transaction chains
if (!isLeader) {
private final YangInstanceIdentifier path;
private final ActorRef dataChangeListener;
private final AsyncDataBroker.DataChangeScope scope;
+ private final boolean registerOnAllInstances;
public RegisterChangeListener(YangInstanceIdentifier path,
ActorRef dataChangeListener,
- AsyncDataBroker.DataChangeScope scope) {
+ AsyncDataBroker.DataChangeScope scope, boolean registerOnAllInstances) {
this.path = path;
this.dataChangeListener = dataChangeListener;
this.scope = scope;
+ this.registerOnAllInstances = registerOnAllInstances;
}
public YangInstanceIdentifier getPath() {
return dataChangeListener.path();
}
+ public boolean isRegisterOnAllInstances() {
+ return registerOnAllInstances;
+ }
@Override
public ListenerRegistrationMessages.RegisterChangeListener toSerializable() {
return ListenerRegistrationMessages.RegisterChangeListener.newBuilder()
.setInstanceIdentifierPath(InstanceIdentifierUtils.toSerializable(path))
.setDataChangeListenerActorPath(Serialization.serializedActorPath(dataChangeListener))
- .setDataChangeScope(scope.ordinal()).build();
+ .setDataChangeScope(scope.ordinal()).setRegisterOnAllInstances(registerOnAllInstances).build();
}
public static RegisterChangeListener fromSerializable(ActorSystem actorSystem, Object serializable){
ListenerRegistrationMessages.RegisterChangeListener o = (ListenerRegistrationMessages.RegisterChangeListener) serializable;
return new RegisterChangeListener(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPath()),
actorSystem.provider().resolveActorRef(o.getDataChangeListenerActorPath()),
- AsyncDataBroker.DataChangeScope.values()[o.getDataChangeScope()]);
+ AsyncDataBroker.DataChangeScope.values()[o.getDataChangeScope()], o.getRegisterOnAllInstances());
}
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import scala.concurrent.ExecutionContextExecutor;
RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class);
Assert.assertEquals("getPath", path, registerMsg.getPath());
Assert.assertEquals("getScope", scope, registerMsg.getScope());
+ Assert.assertEquals("isRegisterOnAllInstances", false, registerMsg.isRegisterOnAllInstances());
reply(new RegisterChangeListenerReply(getRef()));
}};
}
+ @Test(timeout=10000)
+ public void testSuccessfulRegistrationForClusteredListener() {
+ new JavaTestKit(getSystem()) {{
+ ActorContext actorContext = new ActorContext(getSystem(), getRef(),
+ mock(ClusterWrapper.class), mock(Configuration.class));
+
+ AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> mockClusteredListener =
+ Mockito.mock(ClusteredDOMDataChangeListener.class);
+
+ final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+ "shard-1", actorContext, mockClusteredListener);
+
+ final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+ final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
+ new Thread() {
+ @Override
+ public void run() {
+ proxy.init(path, scope);
+ }
+
+ }.start();
+
+ FiniteDuration timeout = duration("5 seconds");
+ FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
+ Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+
+ reply(new LocalShardFound(getRef()));
+
+ RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class);
+ Assert.assertEquals("getPath", path, registerMsg.getPath());
+ Assert.assertEquals("getScope", scope, registerMsg.getScope());
+ Assert.assertEquals("isRegisterOnAllInstances", true, registerMsg.isRegisterOnAllInstances());
+
+ reply(new RegisterChangeListenerReply(getRef()));
+
+ for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) {
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+
+ Assert.assertEquals("getListenerRegistrationActor", getSystem().actorSelection(getRef().path()),
+ proxy.getListenerRegistrationActor());
+
+ watch(proxy.getDataChangeListenerActor());
+
+ proxy.close();
+
+ // The listener registration actor should get a Close message
+ expectMsgClass(timeout, CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS);
+
+ // The DataChangeListener actor should be terminated
+ expectMsgClass(timeout, Terminated.class);
+
+ proxy.close();
+
+ expectNoMsg();
+ }};
+ }
+
@Test(timeout=10000)
public void testLocalShardNotFound() {
new JavaTestKit(getSystem()) {{
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
+
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
"testRegisterChangeListener-DataChangeListener");
shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
- dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
+ dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
RegisterChangeListenerReply.class);
// Now send the RegisterChangeListener and wait for the reply.
shard.tell(new RegisterChangeListener(path, dclActor,
- AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
+ AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef());
final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
RegisterChangeListenerReply.class);
writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
assertEquals("Got first ElectionTimeout", true,
- onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
+ onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
- RegisterDataTreeChangeListenerReply.class);
+ RegisterDataTreeChangeListenerReply.class);
assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
shard.tell(new FindLeader(), getRef());
//waitUntilLeader(shard);
assertEquals("Recovery complete", true,
- Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
+ Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
final String address = "akka://foobar";
shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
assertEquals("getPeerAddresses", address,
- ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
+ ((TestShard) shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
- SerializationUtils.serializeNormalizedNode(root),
- Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
+ SerializationUtils.serializeNormalizedNode(root),
+ Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
return testStore;
}
}
InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
- new ApplyJournalEntries(nListEntries));
+ new ApplyJournalEntries(nListEntries));
testRecovery(listEntryKeys);
}
InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
- new WriteModification(TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
+ new WriteModification(TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
final int nListEntries = 16;
final Set<Integer> listEntryKeys = new HashSet<>();
// Send a couple more BatchedModifications.
shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
expectMsgClass(duration, BatchedModificationsReply.class);
shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
final MutableCompositeModification modification = new MutableCompositeModification();
final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
- TestModel.TEST_PATH, containerNode, modification);
+ TestModel.TEST_PATH, containerNode, modification);
final FiniteDuration duration = duration("5 seconds");
// by the ShardTransaction.
shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true, false), getRef());
+ cohort, modification, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
// Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort, modification, true, false), getRef());
+ cohort, modification, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
modification, preCommit);
shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true, false), getRef());
+ cohort, modification, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true, false), getRef());
+ cohort1, modification1, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
final String transactionID2 = "tx2";
TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
- cohort3, modification3, true, false), getRef());
+ cohort3, modification3, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// All Tx's are readied. We'll send canCommit for the last one but not the others. The others
shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
- nonPersistentContext, SCHEMA_CONTEXT);
+ nonPersistentContext, SCHEMA_CONTEXT);
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
assertEquals("isRecoveryApplicable", false,
- shard.underlyingActor().persistence().isRecoveryApplicable());
+ shard.underlyingActor().persistence().isRecoveryApplicable());
shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
assertEquals("isRecoveryApplicable", true,
- shard.underlyingActor().persistence().isRecoveryApplicable());
+ shard.underlyingActor().persistence().isRecoveryApplicable());
shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
- ShardLeaderStateChanged.class);
+ ShardLeaderStateChanged.class);
assertEquals("getLocalShardDataTree present", true,
leaderStateChanged.getLocalShardDataTree().isPresent());
assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
- leaderStateChanged.getLocalShardDataTree().get());
+ leaderStateChanged.getLocalShardDataTree().get());
MessageCollectorActor.clearMessages(listener);
store.validate(modification);
store.commit(store.prepare(modification));
}
+
+ @Test
+ public void testClusteredDataChangeListernerDelayedRegistration() throws Exception {
+ new ShardTestKit(getSystem()) {{
+ final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
+ final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
+ final Creator<Shard> creator = new Creator<Shard>() {
+ boolean firstElectionTimeout = true;
+
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(shardID, Collections.<String,String>emptyMap(),
+ dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
+ @Override
+ public void onReceiveCommand(final Object message) throws Exception {
+ if(message instanceof ElectionTimeout && firstElectionTimeout) {
+ firstElectionTimeout = false;
+ final ActorRef self = getSelf();
+ new Thread() {
+ @Override
+ public void run() {
+ Uninterruptibles.awaitUninterruptibly(
+ onChangeListenerRegistered, 5, TimeUnit.SECONDS);
+ self.tell(message, self);
+ }
+ }.start();
+
+ onFirstElectionTimeout.countDown();
+ } else {
+ super.onReceiveCommand(message);
+ }
+ }
+ };
+ }
+ };
+
+ final MockDataChangeListener listener = new MockDataChangeListener(1);
+ final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
+ "testDataChangeListenerOnFollower-DataChangeListener");
+
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ Props.create(new DelegatingShardCreator(creator)),
+ "testDataChangeListenerOnFollower");
+
+ assertEquals("Got first ElectionTimeout", true,
+ onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
+
+ shard.tell(new FindLeader(), getRef());
+ final FindLeaderReply findLeadeReply =
+ expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
+ assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
+
+ final YangInstanceIdentifier path = TestModel.TEST_PATH;
+
+ shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
+ final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterChangeListenerReply.class);
+ assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+
+ writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ onChangeListenerRegistered.countDown();
+
+ listener.waitForChangeEvents();
+
+ dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @Test
+ public void testClusteredDataChangeListernerRegistration() throws Exception {
+ new ShardTestKit(getSystem()) {{
+ final ShardIdentifier member1ShardID = ShardIdentifier.builder().memberName("member-1")
+ .shardName("inventory").type("config").build();
+
+ final ShardIdentifier member2ShardID = ShardIdentifier.builder().memberName("member-2")
+ .shardName("inventory").type("config").build();
+ final Creator<Shard> followerShardCreator = new Creator<Shard>() {
+
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(member1ShardID, Collections.singletonMap(member2ShardID.toString(),
+ "akka://test/user/" + member2ShardID.toString()),
+ dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
+ @Override
+ public void onReceiveCommand(final Object message) throws Exception {
+
+ if(!(message instanceof ElectionTimeout)) {
+ super.onReceiveCommand(message);
+ }
+ }
+ };
+ }
+ };
+
+ final Creator<Shard> leaderShardCreator = new Creator<Shard>() {
+
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(member2ShardID, Collections.singletonMap(member1ShardID.toString(),
+ "akka://test/user/" + member1ShardID.toString()),
+ dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) { };
+ }
+ };
+
+
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ Props.create(new DelegatingShardCreator(followerShardCreator)),
+ member1ShardID.toString());
+
+ final TestActorRef<Shard> shardLeader = TestActorRef.create(getSystem(),
+ Props.create(new DelegatingShardCreator(leaderShardCreator)),
+ member2ShardID.toString());
+ // Sleep to let election happen
+ Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+
+ shard.tell(new FindLeader(), getRef());
+ final FindLeaderReply findLeaderReply =
+ expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
+ assertEquals("Shard leader does not match", shardLeader.path().toString(), findLeaderReply.getLeaderActor());
+
+ final YangInstanceIdentifier path = TestModel.TEST_PATH;
+ final MockDataChangeListener listener = new MockDataChangeListener(1);
+ final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
+ "testDataChangeListenerOnFollower-DataChangeListener");
+
+ shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
+ final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterChangeListenerReply.class);
+ assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+
+ writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ listener.waitForChangeEvents();
+
+ dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
}
public void testToSerializable(){
TestActorRef<Actor> testActor = factory.createTestActor(MessageCollectorActor.props());
RegisterChangeListener registerChangeListener = new RegisterChangeListener(TestModel.TEST_PATH, testActor
- , AsyncDataBroker.DataChangeScope.BASE);
+ , AsyncDataBroker.DataChangeScope.BASE, false);
ListenerRegistrationMessages.RegisterChangeListener serialized
= registerChangeListener.toSerializable();
assertEquals("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", path.getCode(0));
assertEquals(Serialization.serializedActorPath(testActor), serialized.getDataChangeListenerActorPath());
assertEquals(AsyncDataBroker.DataChangeScope.BASE.ordinal(), serialized.getDataChangeScope());
+ assertEquals(false, serialized.getRegisterOnAllInstances());
}
public void testFromSerializable(){
TestActorRef<Actor> testActor = factory.createTestActor(MessageCollectorActor.props());
RegisterChangeListener registerChangeListener = new RegisterChangeListener(TestModel.TEST_PATH, testActor
- , AsyncDataBroker.DataChangeScope.SUBTREE);
+ , AsyncDataBroker.DataChangeScope.SUBTREE, true);
ListenerRegistrationMessages.RegisterChangeListener serialized
= registerChangeListener.toSerializable();
assertEquals(TestModel.TEST_PATH, registerChangeListener.getPath());
assertEquals(testActor.path().toString(), fromSerialized.getDataChangeListenerPath().toString());
assertEquals(AsyncDataBroker.DataChangeScope.SUBTREE, fromSerialized.getScope());
-
+ assertEquals(true, fromSerialized.isRegisterOnAllInstances());
}
}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.dom.api;
+
+/**
+ * <p>ClusteredDOMDataChangeListener is a marker interface to enable data change notifications on all instances in a cluster,
+ * where this listener is registered.</p>
+ *
+ *
+ * <p>Applications should implement ClusteredDOMDataChangeListener instead of DOMDataChangeListener, if they want to listen
+ * to data change notifications on any node of clustered datastore. DOMDataChangeListener enables data change notifications
+ * only at leader of the datastore shard.</p>
+ *
+ */
+
+public interface ClusteredDOMDataChangeListener extends DOMDataChangeListener{
+
+}
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+/**
+ * DOMDataChangeListener enables data change notifications only at leader of the datastore shard.
+ */
+
public interface DOMDataChangeListener extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> {
}