Enabling Data Change Notifications for all nodes in cluster. 96/24496/6
authorHarman Singh <harmasin@cisco.com>
Fri, 24 Jul 2015 01:32:46 +0000 (18:32 -0700)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 18 Aug 2015 17:55:10 +0000 (17:55 +0000)
Two new interfaces are introduced ClusteredDataChangeListener and ClusteredDOMDataChangeListener and external applications will have to implement any of that interface,
if those applications want to listen to remote data change notifications.

Datastore registers listeners, which are instance of that interface, even on followers.

Change-Id: I0e29cdf2a08a2051de5fc8ce73b9ec8ac408e45b
Signed-off-by: Harman Singh <harmasin@cisco.com>
17 files changed:
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/ClusteredDataChangeListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataChangeListener.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractForwardedDataBroker.java
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/impl/test/BindingDOMDataBrokerAdapterTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/registration/ListenerRegistrationMessages.java
opendaylight/md-sal/sal-clustering-commons/src/main/resources/ListenerRegistration.proto
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerTest.java
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/ClusteredDOMDataChangeListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataChangeListener.java

diff --git a/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/ClusteredDataChangeListener.java b/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/ClusteredDataChangeListener.java
new file mode 100644 (file)
index 0000000..511910d
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+
+package org.opendaylight.controller.md.sal.binding.api;
+
+/**
+ * <p>
+ * ClusteredDataChangeListener is a marker interface to enable data change notifications on all instances in a cluster,
+ * where this listener is registered.
+ * </p>
+ *
+ * <p>Applications should implement ClusteredDataChangeListener instead of DataChangeListener, if they want to listen
+ * to data change notifications on any node of clustered datastore. DataChangeListener enables data change notifications
+ * only at leader of the datastore shard.</p>
+ *
+ */
+
+public interface ClusteredDataChangeListener extends DataChangeListener{
+}
index 65257a3..2833f41 100644 (file)
@@ -12,6 +12,10 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 
+/*
+ * DataChangeListener enables data change notifications only at leader of the datastore shard
+ */
+
 public interface DataChangeListener extends AsyncDataChangeListener<InstanceIdentifier<?>, DataObject> {
     @Override
     void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change);
index 4bbeaeb..ad52557 100644 (file)
@@ -15,12 +15,15 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+
+import org.opendaylight.controller.md.sal.binding.api.ClusteredDataChangeListener;
 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
 import org.opendaylight.controller.sal.core.api.model.SchemaService;
 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
 import org.opendaylight.yangtools.concepts.Delegator;
@@ -63,8 +66,14 @@ public abstract class AbstractForwardedDataBroker implements Delegator<DOMDataBr
 
     public ListenerRegistration<DataChangeListener> registerDataChangeListener(final LogicalDatastoreType store,
             final InstanceIdentifier<?> path, final DataChangeListener listener, final DataChangeScope triggeringScope) {
-        final DOMDataChangeListener domDataChangeListener = new TranslatingDataChangeInvoker(store, path, listener,
+        final DOMDataChangeListener domDataChangeListener;
+
+        if(listener instanceof ClusteredDataChangeListener) {
+            domDataChangeListener = new TranslatingClusteredDataChangeInvoker(store, path, listener, triggeringScope);
+        } else {
+            domDataChangeListener = new TranslatingDataChangeInvoker(store, path, listener,
                 triggeringScope);
+        }
         final YangInstanceIdentifier domPath = codec.toYangInstanceIdentifierBlocking(path);
         final ListenerRegistration<DOMDataChangeListener> domRegistration = domDataBroker.registerDataChangeListener(store,
                 domPath, domDataChangeListener, triggeringScope);
@@ -140,6 +149,20 @@ public abstract class AbstractForwardedDataBroker implements Delegator<DOMDataBr
         }
     }
 
+    /**
+     * Translator for ClusteredDataChangeListener
+     */
+
+    private class TranslatingClusteredDataChangeInvoker extends TranslatingDataChangeInvoker implements
+        ClusteredDOMDataChangeListener {
+
+        public TranslatingClusteredDataChangeInvoker(LogicalDatastoreType store, InstanceIdentifier<?> path,
+                                                     DataChangeListener bindingDataChangeListener,
+                                                     DataChangeScope triggeringScope) {
+            super(store, path, bindingDataChangeListener, triggeringScope);
+        }
+    }
+
     private class TranslatedDataChangeEvent implements AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> {
         private final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> domEvent;
         private final InstanceIdentifier<?> path;
diff --git a/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/impl/test/BindingDOMDataBrokerAdapterTest.java b/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/impl/test/BindingDOMDataBrokerAdapterTest.java
new file mode 100644 (file)
index 0000000..e2fceba
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.binding.impl.test;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.md.sal.binding.api.ClusteredDataChangeListener;
+import org.opendaylight.controller.md.sal.binding.impl.BindingDOMDataBrokerAdapter;
+import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.Top;
+import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
+import org.opendaylight.yangtools.sal.binding.generator.impl.GeneratedClassLoadingStrategy;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+
+public class BindingDOMDataBrokerAdapterTest {
+
+    @Mock
+    DOMDataBroker dataBroker;
+
+    @Mock
+    GeneratedClassLoadingStrategy classLoadingStrategy;
+    @Mock
+    BindingNormalizedNodeCodecRegistry codecRegistry;
+
+    @Mock
+    YangInstanceIdentifier yangInstanceIdentifier;
+
+
+    private static final InstanceIdentifier<Top> TOP_PATH = InstanceIdentifier
+        .create(Top.class);
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @Test
+    public void testClusteredDataChangeListernerRegisteration() {
+
+        BindingToNormalizedNodeCodec codec = new BindingToNormalizedNodeCodec(classLoadingStrategy, codecRegistry);
+
+        BindingDOMDataBrokerAdapter bindingDOMDataBrokerAdapter = new BindingDOMDataBrokerAdapter(dataBroker, codec);
+        Mockito.when(codecRegistry.toYangInstanceIdentifier(TOP_PATH)).thenReturn(yangInstanceIdentifier);
+
+        ArgumentCaptor<ClusteredDOMDataChangeListener> clusteredDOMListener = ArgumentCaptor.
+            forClass(ClusteredDOMDataChangeListener.class);
+        ArgumentCaptor<LogicalDatastoreType> logicalDatastoreType = ArgumentCaptor.forClass(LogicalDatastoreType.class);
+        ArgumentCaptor<AsyncDataBroker.DataChangeScope> dataChangeScope = ArgumentCaptor.
+            forClass(AsyncDataBroker.DataChangeScope.class);
+        ArgumentCaptor<YangInstanceIdentifier> yangInstanceIdentifier = ArgumentCaptor.
+            forClass(YangInstanceIdentifier.class);
+
+        TestListener listener = new TestListener();
+
+        bindingDOMDataBrokerAdapter.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, TOP_PATH, listener,
+            AsyncDataBroker.DataChangeScope.BASE);
+        Mockito.verify(dataBroker).registerDataChangeListener(logicalDatastoreType.capture(), yangInstanceIdentifier.capture(),
+            clusteredDOMListener.capture(), dataChangeScope.capture());
+
+    }
+
+    private class TestListener implements ClusteredDataChangeListener {
+
+        @Override
+        public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
+
+        }
+    }
+}
index 77cbd4d..77f5ddb 100644 (file)
@@ -683,6 +683,16 @@ public final class ListenerRegistrationMessages {
      * <code>required int32 dataChangeScope = 3;</code>
      */
     int getDataChangeScope();
+
+    // optional bool registerOnAllInstances = 4;
+    /**
+     * <code>optional bool registerOnAllInstances = 4;</code>
+     */
+    boolean hasRegisterOnAllInstances();
+    /**
+     * <code>optional bool registerOnAllInstances = 4;</code>
+     */
+    boolean getRegisterOnAllInstances();
   }
   /**
    * Protobuf type {@code org.opendaylight.controller.mdsal.RegisterChangeListener}
@@ -758,6 +768,11 @@ public final class ListenerRegistrationMessages {
               dataChangeScope_ = input.readInt32();
               break;
             }
+            case 32: {
+              bitField0_ |= 0x00000008;
+              registerOnAllInstances_ = input.readBool();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -879,10 +894,27 @@ public final class ListenerRegistrationMessages {
       return dataChangeScope_;
     }
 
+    // optional bool registerOnAllInstances = 4;
+    public static final int REGISTERONALLINSTANCES_FIELD_NUMBER = 4;
+    private boolean registerOnAllInstances_;
+    /**
+     * <code>optional bool registerOnAllInstances = 4;</code>
+     */
+    public boolean hasRegisterOnAllInstances() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    /**
+     * <code>optional bool registerOnAllInstances = 4;</code>
+     */
+    public boolean getRegisterOnAllInstances() {
+      return registerOnAllInstances_;
+    }
+
     private void initFields() {
       instanceIdentifierPath_ = org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.getDefaultInstance();
       dataChangeListenerActorPath_ = "";
       dataChangeScope_ = 0;
+      registerOnAllInstances_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -921,6 +953,9 @@ public final class ListenerRegistrationMessages {
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeInt32(3, dataChangeScope_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeBool(4, registerOnAllInstances_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -942,6 +977,10 @@ public final class ListenerRegistrationMessages {
         size += com.google.protobuf.CodedOutputStream
           .computeInt32Size(3, dataChangeScope_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(4, registerOnAllInstances_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1069,6 +1108,8 @@ public final class ListenerRegistrationMessages {
         bitField0_ = (bitField0_ & ~0x00000002);
         dataChangeScope_ = 0;
         bitField0_ = (bitField0_ & ~0x00000004);
+        registerOnAllInstances_ = false;
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
 
@@ -1113,6 +1154,10 @@ public final class ListenerRegistrationMessages {
           to_bitField0_ |= 0x00000004;
         }
         result.dataChangeScope_ = dataChangeScope_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.registerOnAllInstances_ = registerOnAllInstances_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1140,6 +1185,9 @@ public final class ListenerRegistrationMessages {
         if (other.hasDataChangeScope()) {
           setDataChangeScope(other.getDataChangeScope());
         }
+        if (other.hasRegisterOnAllInstances()) {
+          setRegisterOnAllInstances(other.getRegisterOnAllInstances());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1407,6 +1455,39 @@ public final class ListenerRegistrationMessages {
         return this;
       }
 
+      // optional bool registerOnAllInstances = 4;
+      private boolean registerOnAllInstances_ ;
+      /**
+       * <code>optional bool registerOnAllInstances = 4;</code>
+       */
+      public boolean hasRegisterOnAllInstances() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      /**
+       * <code>optional bool registerOnAllInstances = 4;</code>
+       */
+      public boolean getRegisterOnAllInstances() {
+        return registerOnAllInstances_;
+      }
+      /**
+       * <code>optional bool registerOnAllInstances = 4;</code>
+       */
+      public Builder setRegisterOnAllInstances(boolean value) {
+        bitField0_ |= 0x00000008;
+        registerOnAllInstances_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bool registerOnAllInstances = 4;</code>
+       */
+      public Builder clearRegisterOnAllInstances() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        registerOnAllInstances_ = false;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.RegisterChangeListener)
     }
 
@@ -1944,15 +2025,16 @@ public final class ListenerRegistrationMessages {
       "ylight.controller.mdsal\032\014Common.proto\"%\n" +
       "#CloseDataChangeListenerRegistration\"*\n(" +
       "CloseDataChangeListenerRegistrationReply" +
-      "\"\255\001\n\026RegisterChangeListener\022U\n\026instanceI" +
+      "\"\315\001\n\026RegisterChangeListener\022U\n\026instanceI" +
       "dentifierPath\030\001 \002(\01325.org.opendaylight.c" +
       "ontroller.mdsal.InstanceIdentifier\022#\n\033da" +
       "taChangeListenerActorPath\030\002 \002(\t\022\027\n\017dataC" +
-      "hangeScope\030\003 \002(\005\"?\n\033RegisterChangeListen" +
-      "erReply\022 \n\030listenerRegistrationPath\030\001 \002(",
-      "\tB[\n;org.opendaylight.controller.protobu" +
-      "ff.messages.registrationB\034ListenerRegist" +
-      "rationMessages"
+      "hangeScope\030\003 \002(\005\022\036\n\026registerOnAllInstanc" +
+      "es\030\004 \001(\010\"?\n\033RegisterChangeListenerReply\022",
+      " \n\030listenerRegistrationPath\030\001 \002(\tB[\n;org" +
+      ".opendaylight.controller.protobuff.messa" +
+      "ges.registrationB\034ListenerRegistrationMe" +
+      "ssages"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -1976,7 +2058,7 @@ public final class ListenerRegistrationMessages {
           internal_static_org_opendaylight_controller_mdsal_RegisterChangeListener_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_org_opendaylight_controller_mdsal_RegisterChangeListener_descriptor,
-              new java.lang.String[] { "InstanceIdentifierPath", "DataChangeListenerActorPath", "DataChangeScope", });
+              new java.lang.String[] { "InstanceIdentifierPath", "DataChangeListenerActorPath", "DataChangeScope", "RegisterOnAllInstances", });
           internal_static_org_opendaylight_controller_mdsal_RegisterChangeListenerReply_descriptor =
             getDescriptor().getMessageTypes().get(3);
           internal_static_org_opendaylight_controller_mdsal_RegisterChangeListenerReply_fieldAccessorTable = new
index 3342a13..4b45c85 100644 (file)
@@ -27,6 +27,7 @@ message RegisterChangeListener{
 required InstanceIdentifier instanceIdentifierPath=1;
 required string dataChangeListenerActorPath=2;
 required int32 dataChangeScope=3;
+optional bool registerOnAllInstances=4;
 }
 /**
 * This is the reply for the RegisterChangeListener message
index 8ac424a..6f8bc63 100644 (file)
@@ -21,6 +21,7 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -116,7 +117,8 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
             DataChangeScope scope) {
 
         Future<Object> future = actorContext.executeOperationAsync(shard,
-                new RegisterChangeListener(path, dataChangeListenerActor, scope),
+                new RegisterChangeListener(path, dataChangeListenerActor, scope,
+                    listener instanceof ClusteredDOMDataChangeListener),
                 actorContext.getDatastoreContext().getShardInitializationTimeout());
 
         future.onComplete(new OnComplete<Object>(){
index e6f63d7..c7b5541 100644 (file)
@@ -28,42 +28,57 @@ final class DataChangeListenerSupport extends LeaderLocalDelegateFactory<Registe
     private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerSupport.class);
     private final List<DelayedListenerRegistration> delayedListenerRegistrations = new ArrayList<>();
     private final List<ActorSelection> dataChangeListeners =  new ArrayList<>();
+    private final List<DelayedListenerRegistration> delayedRegisterOnAllListeners = new ArrayList<>();
 
     DataChangeListenerSupport(final Shard shard) {
         super(shard);
     }
 
     @Override
-    void onLeadershipChange(final boolean isLeader) {
+    void onLeadershipChange(final boolean isLeader, boolean hasLeader) {
+        LOG.debug("onLeadershipChange, isLeader: {}, hasLeader : {}", isLeader, hasLeader);
+
         for (ActorSelection dataChangeListener : dataChangeListeners) {
             dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
         }
 
+        if(hasLeader) {
+            for (DelayedListenerRegistration reg : delayedRegisterOnAllListeners) {
+                registerDelayedListeners(reg);
+            }
+            delayedRegisterOnAllListeners.clear();
+        }
+
         if (isLeader) {
             for (DelayedListenerRegistration reg: delayedListenerRegistrations) {
-                if(!reg.isClosed()) {
-                    final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
-                            createDelegate(reg.getRegisterChangeListener());
-                    reg.setDelegate(res.getKey());
-                    if (res.getValue() != null) {
-                        reg.getInstance().onDataChanged(res.getValue());
-                    }
-                }
+                registerDelayedListeners(reg);
             }
 
             delayedListenerRegistrations.clear();
         }
     }
 
+    private void registerDelayedListeners(DelayedListenerRegistration reg) {
+        if(!reg.isClosed()) {
+            final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
+                createDelegate(reg.getRegisterChangeListener());
+            reg.setDelegate(res.getKey());
+            if (res.getValue() != null) {
+                reg.getInstance().onDataChanged(res.getValue());
+            }
+        }
+    }
+
     @Override
-    void onMessage(final RegisterChangeListener message, final boolean isLeader) {
+    void onMessage(final RegisterChangeListener message, final boolean isLeader, boolean hasLeader) {
 
-        LOG.debug("{}: registerDataChangeListener for {}, leader: {}", persistenceId(), message.getPath(), isLeader);
+        LOG.debug("{}: registerDataChangeListener for {}, isLeader: {}, hasLeader : {}",
+            persistenceId(), message.getPath(), isLeader, hasLeader);
 
         final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
                                                      NormalizedNode<?, ?>>> registration;
         final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> event;
-        if (isLeader) {
+        if ((hasLeader && message.isRegisterOnAllInstances()) || isLeader) {
             final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
                     createDelegate(message);
             registration = res.getKey();
@@ -72,7 +87,11 @@ final class DataChangeListenerSupport extends LeaderLocalDelegateFactory<Registe
             LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
 
             DelayedListenerRegistration delayedReg = new DelayedListenerRegistration(message);
-            delayedListenerRegistrations.add(delayedReg);
+            if(message.isRegisterOnAllInstances()) {
+                delayedRegisterOnAllListeners.add(delayedReg);
+            } else {
+                delayedListenerRegistrations.add(delayedReg);
+            }
             registration = delayedReg;
             event = null;
         }
@@ -100,7 +119,9 @@ final class DataChangeListenerSupport extends LeaderLocalDelegateFactory<Registe
 
         // Now store a reference to the data change listener so it can be notified
         // at a later point if notifications should be enabled or disabled
-        dataChangeListeners.add(dataChangeListenerPath);
+        if(!message.isRegisterOnAllInstances()) {
+            dataChangeListeners.add(dataChangeListenerPath);
+        }
 
         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
                 new DataChangeListenerProxy(dataChangeListenerPath);
index db5eeb8..4281bfe 100644 (file)
@@ -32,7 +32,7 @@ final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory<Reg
     }
 
     @Override
-    void onLeadershipChange(final boolean isLeader) {
+    void onLeadershipChange(final boolean isLeader, boolean hasLeader) {
         if (isLeader) {
             for (DelayedDataTreeListenerRegistration reg : delayedRegistrations) {
                 reg.createDelegate(this);
@@ -48,7 +48,7 @@ final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory<Reg
     }
 
     @Override
-    void onMessage(final RegisterDataTreeChangeListener registerTreeChangeListener, final boolean isLeader) {
+    void onMessage(final RegisterDataTreeChangeListener registerTreeChangeListener, final boolean isLeader, boolean hasLeader) {
         LOG.debug("{}: registerTreeChangeListener for {}, leader: {}", persistenceId(), registerTreeChangeListener.getPath(), isLeader);
 
         final ListenerRegistration<DOMDataTreeChangeListener> registration;
index 3f92773..8ce2857 100644 (file)
@@ -60,8 +60,9 @@ abstract class LeaderLocalDelegateFactory<M, D, I> extends DelegateFactory<M, D,
      * Invoked whenever the local shard's leadership role changes.
      *
      * @param isLeader true if the shard has become leader, false if it has
-     *                 become a follower.
+ *                 become a follower.
+     * @param hasLeader true if the shard knows about leader ID
      */
-    abstract void onLeadershipChange(boolean isLeader);
-    abstract void onMessage(M message, boolean isLeader);
+    abstract void onLeadershipChange(boolean isLeader, boolean hasLeader);
+    abstract void onMessage(M message, boolean isLeader, boolean hasLeader);
 }
index 5a10c3e..dd1c0ad 100644 (file)
@@ -221,7 +221,7 @@ public class Shard extends RaftActor {
 
         if(context.error().isPresent()){
             LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
-                    context.error());
+                context.error());
         }
 
         try {
@@ -243,9 +243,9 @@ public class Shard extends RaftActor {
             } else if (CloseTransactionChain.SERIALIZABLE_CLASS.isInstance(message)) {
                 closeTransactionChain(CloseTransactionChain.fromSerializable(message));
             } else if (message instanceof RegisterChangeListener) {
-                changeSupport.onMessage((RegisterChangeListener) message, isLeader());
+                changeSupport.onMessage((RegisterChangeListener) message, isLeader(), hasLeader());
             } else if (message instanceof RegisterDataTreeChangeListener) {
-                treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader());
+                treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader(), hasLeader());
             } else if (message instanceof UpdateSchemaContext) {
                 updateSchemaContext((UpdateSchemaContext) message);
             } else if (message instanceof PeerAddressResolved) {
@@ -271,6 +271,10 @@ public class Shard extends RaftActor {
         }
     }
 
+    private boolean hasLeader() {
+        return getLeaderId() != null;
+    }
+
     public int getPendingTxCommitQueueSize() {
         return commitCoordinator.getQueueSize();
     }
@@ -655,8 +659,9 @@ public class Shard extends RaftActor {
     @Override
     protected void onStateChanged() {
         boolean isLeader = isLeader();
-        changeSupport.onLeadershipChange(isLeader);
-        treeChangeSupport.onLeadershipChange(isLeader);
+        boolean hasLeader = hasLeader();
+        changeSupport.onLeadershipChange(isLeader, hasLeader);
+        treeChangeSupport.onLeadershipChange(isLeader, hasLeader);
 
         // If this actor is no longer the leader close all the transaction chains
         if (!isLeader) {
index 1d8edec..f7a51a9 100644 (file)
@@ -24,14 +24,16 @@ public class RegisterChangeListener implements SerializableMessage {
     private final YangInstanceIdentifier path;
     private final ActorRef dataChangeListener;
     private final AsyncDataBroker.DataChangeScope scope;
+    private final boolean registerOnAllInstances;
 
 
     public RegisterChangeListener(YangInstanceIdentifier path,
         ActorRef dataChangeListener,
-        AsyncDataBroker.DataChangeScope scope) {
+        AsyncDataBroker.DataChangeScope scope, boolean registerOnAllInstances) {
         this.path = path;
         this.dataChangeListener = dataChangeListener;
         this.scope = scope;
+        this.registerOnAllInstances = registerOnAllInstances;
     }
 
     public YangInstanceIdentifier getPath() {
@@ -47,20 +49,23 @@ public class RegisterChangeListener implements SerializableMessage {
         return dataChangeListener.path();
     }
 
+    public boolean isRegisterOnAllInstances() {
+        return registerOnAllInstances;
+    }
 
     @Override
     public ListenerRegistrationMessages.RegisterChangeListener toSerializable() {
       return ListenerRegistrationMessages.RegisterChangeListener.newBuilder()
           .setInstanceIdentifierPath(InstanceIdentifierUtils.toSerializable(path))
           .setDataChangeListenerActorPath(Serialization.serializedActorPath(dataChangeListener))
-          .setDataChangeScope(scope.ordinal()).build();
+          .setDataChangeScope(scope.ordinal()).setRegisterOnAllInstances(registerOnAllInstances).build();
     }
 
   public static RegisterChangeListener fromSerializable(ActorSystem actorSystem, Object serializable){
     ListenerRegistrationMessages.RegisterChangeListener o = (ListenerRegistrationMessages.RegisterChangeListener) serializable;
     return new RegisterChangeListener(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPath()),
                                                 actorSystem.provider().resolveActorRef(o.getDataChangeListenerActorPath()),
-                                              AsyncDataBroker.DataChangeScope.values()[o.getDataChangeScope()]);
+                                              AsyncDataBroker.DataChangeScope.values()[o.getDataChangeScope()], o.getRegisterOnAllInstances());
   }
 
 
index 57e0e26..299172a 100644 (file)
@@ -42,6 +42,7 @@ import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import scala.concurrent.ExecutionContextExecutor;
@@ -95,6 +96,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest {
             RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class);
             Assert.assertEquals("getPath", path, registerMsg.getPath());
             Assert.assertEquals("getScope", scope, registerMsg.getScope());
+            Assert.assertEquals("isRegisterOnAllInstances", false, registerMsg.isRegisterOnAllInstances());
 
             reply(new RegisterChangeListenerReply(getRef()));
 
@@ -121,6 +123,64 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest {
         }};
     }
 
+    @Test(timeout=10000)
+    public void testSuccessfulRegistrationForClusteredListener() {
+        new JavaTestKit(getSystem()) {{
+            ActorContext actorContext = new ActorContext(getSystem(), getRef(),
+                mock(ClusterWrapper.class), mock(Configuration.class));
+
+            AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> mockClusteredListener =
+                Mockito.mock(ClusteredDOMDataChangeListener.class);
+
+            final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+                "shard-1", actorContext, mockClusteredListener);
+
+            final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+            final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
+            new Thread() {
+                @Override
+                public void run() {
+                    proxy.init(path, scope);
+                }
+
+            }.start();
+
+            FiniteDuration timeout = duration("5 seconds");
+            FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
+            Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+
+            reply(new LocalShardFound(getRef()));
+
+            RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class);
+            Assert.assertEquals("getPath", path, registerMsg.getPath());
+            Assert.assertEquals("getScope", scope, registerMsg.getScope());
+            Assert.assertEquals("isRegisterOnAllInstances", true, registerMsg.isRegisterOnAllInstances());
+
+            reply(new RegisterChangeListenerReply(getRef()));
+
+            for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) {
+                Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+            }
+
+            Assert.assertEquals("getListenerRegistrationActor", getSystem().actorSelection(getRef().path()),
+                proxy.getListenerRegistrationActor());
+
+            watch(proxy.getDataChangeListenerActor());
+
+            proxy.close();
+
+            // The listener registration actor should get a Close message
+            expectMsgClass(timeout, CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS);
+
+            // The DataChangeListener actor should be terminated
+            expectMsgClass(timeout, Terminated.class);
+
+            proxy.close();
+
+            expectNoMsg();
+        }};
+    }
+
     @Test(timeout=10000)
     public void testLocalShardNotFound() {
         new JavaTestKit(getSystem()) {{
index d9df305..4f4162f 100644 (file)
@@ -20,6 +20,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
@@ -148,7 +149,7 @@ public class ShardTest extends AbstractShardTest {
                     "testRegisterChangeListener-DataChangeListener");
 
             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
-                    dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
+                    dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
 
             final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
                     RegisterChangeListenerReply.class);
@@ -241,7 +242,7 @@ public class ShardTest extends AbstractShardTest {
 
             // Now send the RegisterChangeListener and wait for the reply.
             shard.tell(new RegisterChangeListener(path, dclActor,
-                    AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
+                    AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef());
 
             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
                     RegisterChangeListenerReply.class);
@@ -346,11 +347,11 @@ public class ShardTest extends AbstractShardTest {
             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
             assertEquals("Got first ElectionTimeout", true,
-                    onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
+                onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
 
             shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                    RegisterDataTreeChangeListenerReply.class);
+                RegisterDataTreeChangeListenerReply.class);
             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
 
             shard.tell(new FindLeader(), getRef());
@@ -450,13 +451,13 @@ public class ShardTest extends AbstractShardTest {
 
             //waitUntilLeader(shard);
             assertEquals("Recovery complete", true,
-                    Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
+                Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
 
             final String address = "akka://foobar";
             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
 
             assertEquals("getPeerAddresses", address,
-                    ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
+                ((TestShard) shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
 
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
@@ -551,8 +552,8 @@ public class ShardTest extends AbstractShardTest {
         final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
 
         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
-                SerializationUtils.serializeNormalizedNode(root),
-                Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
+            SerializationUtils.serializeNormalizedNode(root),
+            Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
         return testStore;
     }
 
@@ -594,7 +595,7 @@ public class ShardTest extends AbstractShardTest {
         }
 
         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
-                new ApplyJournalEntries(nListEntries));
+            new ApplyJournalEntries(nListEntries));
 
         testRecovery(listEntryKeys);
     }
@@ -610,8 +611,8 @@ public class ShardTest extends AbstractShardTest {
         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
 
         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
-                  new WriteModification(TestModel.OUTER_LIST_PATH,
-                          ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
+            new WriteModification(TestModel.OUTER_LIST_PATH,
+                ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
 
         final int nListEntries = 16;
         final Set<Integer> listEntryKeys = new HashSet<>();
@@ -936,12 +937,12 @@ public class ShardTest extends AbstractShardTest {
             // Send a couple more BatchedModifications.
 
             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
-                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
+                ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
             expectMsgClass(duration, BatchedModificationsReply.class);
 
             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
+                ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
 
             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
 
@@ -1268,7 +1269,7 @@ public class ShardTest extends AbstractShardTest {
             final MutableCompositeModification modification = new MutableCompositeModification();
             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
-                    TestModel.TEST_PATH, containerNode, modification);
+                TestModel.TEST_PATH, containerNode, modification);
 
             final FiniteDuration duration = duration("5 seconds");
 
@@ -1276,7 +1277,7 @@ public class ShardTest extends AbstractShardTest {
             // by the ShardTransaction.
 
             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                    cohort, modification, true, false), getRef());
+                cohort, modification, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
@@ -1559,7 +1560,7 @@ public class ShardTest extends AbstractShardTest {
 
             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
@@ -1681,7 +1682,7 @@ public class ShardTest extends AbstractShardTest {
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
 
             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort, modification, true, false), getRef());
+                cohort, modification, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
@@ -1826,12 +1827,12 @@ public class ShardTest extends AbstractShardTest {
                     modification, preCommit);
 
             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                    cohort, modification, true, false), getRef());
+                cohort, modification, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
@@ -2012,7 +2013,7 @@ public class ShardTest extends AbstractShardTest {
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
 
             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true, false), getRef());
+                cohort1, modification1, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             final String transactionID2 = "tx2";
@@ -2030,7 +2031,7 @@ public class ShardTest extends AbstractShardTest {
                     TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
 
             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
-                    cohort3, modification3, true, false), getRef());
+                cohort3, modification3, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
@@ -2412,7 +2413,7 @@ public class ShardTest extends AbstractShardTest {
                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
 
         final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
-                nonPersistentContext, SCHEMA_CONTEXT);
+            nonPersistentContext, SCHEMA_CONTEXT);
 
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
@@ -2448,12 +2449,12 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
 
             assertEquals("isRecoveryApplicable", false,
-                    shard.underlyingActor().persistence().isRecoveryApplicable());
+                shard.underlyingActor().persistence().isRecoveryApplicable());
 
             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
 
             assertEquals("isRecoveryApplicable", true,
-                    shard.underlyingActor().persistence().isRecoveryApplicable());
+                shard.underlyingActor().persistence().isRecoveryApplicable());
 
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
@@ -2477,11 +2478,11 @@ public class ShardTest extends AbstractShardTest {
                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
 
                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
-                        ShardLeaderStateChanged.class);
+                    ShardLeaderStateChanged.class);
                 assertEquals("getLocalShardDataTree present", true,
                         leaderStateChanged.getLocalShardDataTree().isPresent());
                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
-                        leaderStateChanged.getLocalShardDataTree().get());
+                    leaderStateChanged.getLocalShardDataTree().get());
 
                 MessageCollectorActor.clearMessages(listener);
 
@@ -2521,4 +2522,144 @@ public class ShardTest extends AbstractShardTest {
         store.validate(modification);
         store.commit(store.prepare(modification));
     }
+
+    @Test
+    public void testClusteredDataChangeListernerDelayedRegistration() throws Exception {
+        new ShardTestKit(getSystem()) {{
+            final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
+            final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
+            final Creator<Shard> creator = new Creator<Shard>() {
+                boolean firstElectionTimeout = true;
+
+                @Override
+                public Shard create() throws Exception {
+                    return new Shard(shardID, Collections.<String,String>emptyMap(),
+                        dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
+                        @Override
+                        public void onReceiveCommand(final Object message) throws Exception {
+                            if(message instanceof ElectionTimeout && firstElectionTimeout) {
+                                firstElectionTimeout = false;
+                                final ActorRef self = getSelf();
+                                new Thread() {
+                                    @Override
+                                    public void run() {
+                                        Uninterruptibles.awaitUninterruptibly(
+                                            onChangeListenerRegistered, 5, TimeUnit.SECONDS);
+                                        self.tell(message, self);
+                                    }
+                                }.start();
+
+                                onFirstElectionTimeout.countDown();
+                            } else {
+                                super.onReceiveCommand(message);
+                            }
+                        }
+                    };
+                }
+            };
+
+            final MockDataChangeListener listener = new MockDataChangeListener(1);
+            final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
+                "testDataChangeListenerOnFollower-DataChangeListener");
+
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                Props.create(new DelegatingShardCreator(creator)),
+                "testDataChangeListenerOnFollower");
+
+            assertEquals("Got first ElectionTimeout", true,
+                onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
+
+            shard.tell(new FindLeader(), getRef());
+            final FindLeaderReply findLeadeReply =
+                expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
+            assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
+
+            final YangInstanceIdentifier path = TestModel.TEST_PATH;
+
+            shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
+            final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+                RegisterChangeListenerReply.class);
+            assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+
+            writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+            onChangeListenerRegistered.countDown();
+
+            listener.waitForChangeEvents();
+
+            dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
+    @Test
+    public void testClusteredDataChangeListernerRegistration() throws Exception {
+        new ShardTestKit(getSystem()) {{
+            final ShardIdentifier member1ShardID = ShardIdentifier.builder().memberName("member-1")
+                .shardName("inventory").type("config").build();
+
+            final ShardIdentifier member2ShardID = ShardIdentifier.builder().memberName("member-2")
+                .shardName("inventory").type("config").build();
+            final Creator<Shard> followerShardCreator = new Creator<Shard>() {
+
+                @Override
+                public Shard create() throws Exception {
+                    return new Shard(member1ShardID, Collections.singletonMap(member2ShardID.toString(),
+                        "akka://test/user/" + member2ShardID.toString()),
+                        dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
+                        @Override
+                        public void onReceiveCommand(final Object message) throws Exception {
+
+                            if(!(message instanceof ElectionTimeout)) {
+                                super.onReceiveCommand(message);
+                            }
+                        }
+                    };
+                }
+            };
+
+            final Creator<Shard> leaderShardCreator = new Creator<Shard>() {
+
+                @Override
+                public Shard create() throws Exception {
+                    return new Shard(member2ShardID, Collections.singletonMap(member1ShardID.toString(),
+                        "akka://test/user/" + member1ShardID.toString()),
+                        dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) { };
+                }
+            };
+
+
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                Props.create(new DelegatingShardCreator(followerShardCreator)),
+                member1ShardID.toString());
+
+            final TestActorRef<Shard> shardLeader = TestActorRef.create(getSystem(),
+                Props.create(new DelegatingShardCreator(leaderShardCreator)),
+                member2ShardID.toString());
+            // Sleep to let election happen
+            Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+
+            shard.tell(new FindLeader(), getRef());
+            final FindLeaderReply findLeaderReply =
+                    expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
+            assertEquals("Shard leader does not match", shardLeader.path().toString(), findLeaderReply.getLeaderActor());
+
+            final YangInstanceIdentifier path = TestModel.TEST_PATH;
+            final MockDataChangeListener listener = new MockDataChangeListener(1);
+            final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
+                "testDataChangeListenerOnFollower-DataChangeListener");
+
+            shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
+            final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+                RegisterChangeListenerReply.class);
+            assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+
+            writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+            listener.waitForChangeEvents();
+
+            dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
 }
index 5dfabce..4ccee66 100644 (file)
@@ -41,7 +41,7 @@ public class RegisterChangeListenerTest extends AbstractActorTest {
     public void testToSerializable(){
         TestActorRef<Actor> testActor = factory.createTestActor(MessageCollectorActor.props());
         RegisterChangeListener registerChangeListener = new RegisterChangeListener(TestModel.TEST_PATH, testActor
-                , AsyncDataBroker.DataChangeScope.BASE);
+                , AsyncDataBroker.DataChangeScope.BASE, false);
 
         ListenerRegistrationMessages.RegisterChangeListener serialized
                 = registerChangeListener.toSerializable();
@@ -51,6 +51,7 @@ public class RegisterChangeListenerTest extends AbstractActorTest {
         assertEquals("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", path.getCode(0));
         assertEquals(Serialization.serializedActorPath(testActor), serialized.getDataChangeListenerActorPath());
         assertEquals(AsyncDataBroker.DataChangeScope.BASE.ordinal(), serialized.getDataChangeScope());
+        assertEquals(false, serialized.getRegisterOnAllInstances());
 
     }
 
@@ -58,7 +59,7 @@ public class RegisterChangeListenerTest extends AbstractActorTest {
     public void testFromSerializable(){
         TestActorRef<Actor> testActor = factory.createTestActor(MessageCollectorActor.props());
         RegisterChangeListener registerChangeListener = new RegisterChangeListener(TestModel.TEST_PATH, testActor
-                , AsyncDataBroker.DataChangeScope.SUBTREE);
+                , AsyncDataBroker.DataChangeScope.SUBTREE, true);
 
         ListenerRegistrationMessages.RegisterChangeListener serialized
                 = registerChangeListener.toSerializable();
@@ -69,7 +70,7 @@ public class RegisterChangeListenerTest extends AbstractActorTest {
         assertEquals(TestModel.TEST_PATH, registerChangeListener.getPath());
         assertEquals(testActor.path().toString(), fromSerialized.getDataChangeListenerPath().toString());
         assertEquals(AsyncDataBroker.DataChangeScope.SUBTREE, fromSerialized.getScope());
-
+        assertEquals(true, fromSerialized.isRegisterOnAllInstances());
 
     }
 }
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/ClusteredDOMDataChangeListener.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/ClusteredDOMDataChangeListener.java
new file mode 100644 (file)
index 0000000..e5dbc54
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.dom.api;
+
+/**
+ * <p>ClusteredDOMDataChangeListener is a marker interface to enable data change notifications on all instances in a cluster,
+ * where this listener is registered.</p>
+ *
+ *
+ * <p>Applications should implement ClusteredDOMDataChangeListener instead of DOMDataChangeListener, if they want to listen
+ * to data change notifications on any node of clustered datastore. DOMDataChangeListener enables data change notifications
+ * only at leader of the datastore shard.</p>
+ *
+ */
+
+public interface ClusteredDOMDataChangeListener extends DOMDataChangeListener{
+
+}
index 393d1ea..c46529b 100644 (file)
@@ -11,6 +11,10 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
+/**
+ * DOMDataChangeListener enables data change notifications only at leader of the datastore shard.
+ */
+
 public interface DOMDataChangeListener extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> {
 
 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.