Enabling Data Change Notifications for all nodes in cluster. 96/24496/6
authorHarman Singh <harmasin@cisco.com>
Fri, 24 Jul 2015 01:32:46 +0000 (18:32 -0700)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 18 Aug 2015 17:55:10 +0000 (17:55 +0000)
Two new interfaces are introduced ClusteredDataChangeListener and ClusteredDOMDataChangeListener and external applications will have to implement any of that interface,
if those applications want to listen to remote data change notifications.

Datastore registers listeners, which are instance of that interface, even on followers.

Change-Id: I0e29cdf2a08a2051de5fc8ce73b9ec8ac408e45b
Signed-off-by: Harman Singh <harmasin@cisco.com>
17 files changed:
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/ClusteredDataChangeListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataChangeListener.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractForwardedDataBroker.java
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/impl/test/BindingDOMDataBrokerAdapterTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/registration/ListenerRegistrationMessages.java
opendaylight/md-sal/sal-clustering-commons/src/main/resources/ListenerRegistration.proto
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerTest.java
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/ClusteredDOMDataChangeListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataChangeListener.java

diff --git a/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/ClusteredDataChangeListener.java b/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/ClusteredDataChangeListener.java
new file mode 100644 (file)
index 0000000..511910d
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+
+package org.opendaylight.controller.md.sal.binding.api;
+
+/**
+ * <p>
+ * ClusteredDataChangeListener is a marker interface to enable data change notifications on all instances in a cluster,
+ * where this listener is registered.
+ * </p>
+ *
+ * <p>Applications should implement ClusteredDataChangeListener instead of DataChangeListener, if they want to listen
+ * to data change notifications on any node of clustered datastore. DataChangeListener enables data change notifications
+ * only at leader of the datastore shard.</p>
+ *
+ */
+
+public interface ClusteredDataChangeListener extends DataChangeListener{
+}
index 65257a328ac4721eede2f067535605b5836a7c8c..2833f41f6762be6cdc01ba752910feeb4749568a 100644 (file)
@@ -12,6 +12,10 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 
+/*
+ * DataChangeListener enables data change notifications only at leader of the datastore shard
+ */
+
 public interface DataChangeListener extends AsyncDataChangeListener<InstanceIdentifier<?>, DataObject> {
     @Override
     void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change);
index 4bbeaebf55b34888d37e74ec0aa2a468dd59b3b4..ad52557b4752a935b03fba245d540f41ea82d6e2 100644 (file)
@@ -15,12 +15,15 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+
+import org.opendaylight.controller.md.sal.binding.api.ClusteredDataChangeListener;
 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
 import org.opendaylight.controller.sal.core.api.model.SchemaService;
 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
 import org.opendaylight.yangtools.concepts.Delegator;
@@ -63,8 +66,14 @@ public abstract class AbstractForwardedDataBroker implements Delegator<DOMDataBr
 
     public ListenerRegistration<DataChangeListener> registerDataChangeListener(final LogicalDatastoreType store,
             final InstanceIdentifier<?> path, final DataChangeListener listener, final DataChangeScope triggeringScope) {
-        final DOMDataChangeListener domDataChangeListener = new TranslatingDataChangeInvoker(store, path, listener,
+        final DOMDataChangeListener domDataChangeListener;
+
+        if(listener instanceof ClusteredDataChangeListener) {
+            domDataChangeListener = new TranslatingClusteredDataChangeInvoker(store, path, listener, triggeringScope);
+        } else {
+            domDataChangeListener = new TranslatingDataChangeInvoker(store, path, listener,
                 triggeringScope);
+        }
         final YangInstanceIdentifier domPath = codec.toYangInstanceIdentifierBlocking(path);
         final ListenerRegistration<DOMDataChangeListener> domRegistration = domDataBroker.registerDataChangeListener(store,
                 domPath, domDataChangeListener, triggeringScope);
@@ -140,6 +149,20 @@ public abstract class AbstractForwardedDataBroker implements Delegator<DOMDataBr
         }
     }
 
+    /**
+     * Translator for ClusteredDataChangeListener
+     */
+
+    private class TranslatingClusteredDataChangeInvoker extends TranslatingDataChangeInvoker implements
+        ClusteredDOMDataChangeListener {
+
+        public TranslatingClusteredDataChangeInvoker(LogicalDatastoreType store, InstanceIdentifier<?> path,
+                                                     DataChangeListener bindingDataChangeListener,
+                                                     DataChangeScope triggeringScope) {
+            super(store, path, bindingDataChangeListener, triggeringScope);
+        }
+    }
+
     private class TranslatedDataChangeEvent implements AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> {
         private final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> domEvent;
         private final InstanceIdentifier<?> path;
diff --git a/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/impl/test/BindingDOMDataBrokerAdapterTest.java b/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/impl/test/BindingDOMDataBrokerAdapterTest.java
new file mode 100644 (file)
index 0000000..e2fceba
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.binding.impl.test;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.md.sal.binding.api.ClusteredDataChangeListener;
+import org.opendaylight.controller.md.sal.binding.impl.BindingDOMDataBrokerAdapter;
+import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.Top;
+import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
+import org.opendaylight.yangtools.sal.binding.generator.impl.GeneratedClassLoadingStrategy;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+
+public class BindingDOMDataBrokerAdapterTest {
+
+    @Mock
+    DOMDataBroker dataBroker;
+
+    @Mock
+    GeneratedClassLoadingStrategy classLoadingStrategy;
+    @Mock
+    BindingNormalizedNodeCodecRegistry codecRegistry;
+
+    @Mock
+    YangInstanceIdentifier yangInstanceIdentifier;
+
+
+    private static final InstanceIdentifier<Top> TOP_PATH = InstanceIdentifier
+        .create(Top.class);
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @Test
+    public void testClusteredDataChangeListernerRegisteration() {
+
+        BindingToNormalizedNodeCodec codec = new BindingToNormalizedNodeCodec(classLoadingStrategy, codecRegistry);
+
+        BindingDOMDataBrokerAdapter bindingDOMDataBrokerAdapter = new BindingDOMDataBrokerAdapter(dataBroker, codec);
+        Mockito.when(codecRegistry.toYangInstanceIdentifier(TOP_PATH)).thenReturn(yangInstanceIdentifier);
+
+        ArgumentCaptor<ClusteredDOMDataChangeListener> clusteredDOMListener = ArgumentCaptor.
+            forClass(ClusteredDOMDataChangeListener.class);
+        ArgumentCaptor<LogicalDatastoreType> logicalDatastoreType = ArgumentCaptor.forClass(LogicalDatastoreType.class);
+        ArgumentCaptor<AsyncDataBroker.DataChangeScope> dataChangeScope = ArgumentCaptor.
+            forClass(AsyncDataBroker.DataChangeScope.class);
+        ArgumentCaptor<YangInstanceIdentifier> yangInstanceIdentifier = ArgumentCaptor.
+            forClass(YangInstanceIdentifier.class);
+
+        TestListener listener = new TestListener();
+
+        bindingDOMDataBrokerAdapter.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, TOP_PATH, listener,
+            AsyncDataBroker.DataChangeScope.BASE);
+        Mockito.verify(dataBroker).registerDataChangeListener(logicalDatastoreType.capture(), yangInstanceIdentifier.capture(),
+            clusteredDOMListener.capture(), dataChangeScope.capture());
+
+    }
+
+    private class TestListener implements ClusteredDataChangeListener {
+
+        @Override
+        public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
+
+        }
+    }
+}
index 77cbd4da46c811e43a3a2692a40f007ab51e5097..77f5ddb4f2072b07e81d9618a8d34b2a41674501 100644 (file)
@@ -683,6 +683,16 @@ public final class ListenerRegistrationMessages {
      * <code>required int32 dataChangeScope = 3;</code>
      */
     int getDataChangeScope();
+
+    // optional bool registerOnAllInstances = 4;
+    /**
+     * <code>optional bool registerOnAllInstances = 4;</code>
+     */
+    boolean hasRegisterOnAllInstances();
+    /**
+     * <code>optional bool registerOnAllInstances = 4;</code>
+     */
+    boolean getRegisterOnAllInstances();
   }
   /**
    * Protobuf type {@code org.opendaylight.controller.mdsal.RegisterChangeListener}
@@ -758,6 +768,11 @@ public final class ListenerRegistrationMessages {
               dataChangeScope_ = input.readInt32();
               break;
             }
+            case 32: {
+              bitField0_ |= 0x00000008;
+              registerOnAllInstances_ = input.readBool();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -879,10 +894,27 @@ public final class ListenerRegistrationMessages {
       return dataChangeScope_;
     }
 
+    // optional bool registerOnAllInstances = 4;
+    public static final int REGISTERONALLINSTANCES_FIELD_NUMBER = 4;
+    private boolean registerOnAllInstances_;
+    /**
+     * <code>optional bool registerOnAllInstances = 4;</code>
+     */
+    public boolean hasRegisterOnAllInstances() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    /**
+     * <code>optional bool registerOnAllInstances = 4;</code>
+     */
+    public boolean getRegisterOnAllInstances() {
+      return registerOnAllInstances_;
+    }
+
     private void initFields() {
       instanceIdentifierPath_ = org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.getDefaultInstance();
       dataChangeListenerActorPath_ = "";
       dataChangeScope_ = 0;
+      registerOnAllInstances_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -921,6 +953,9 @@ public final class ListenerRegistrationMessages {
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeInt32(3, dataChangeScope_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeBool(4, registerOnAllInstances_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -942,6 +977,10 @@ public final class ListenerRegistrationMessages {
         size += com.google.protobuf.CodedOutputStream
           .computeInt32Size(3, dataChangeScope_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(4, registerOnAllInstances_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1069,6 +1108,8 @@ public final class ListenerRegistrationMessages {
         bitField0_ = (bitField0_ & ~0x00000002);
         dataChangeScope_ = 0;
         bitField0_ = (bitField0_ & ~0x00000004);
+        registerOnAllInstances_ = false;
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
 
@@ -1113,6 +1154,10 @@ public final class ListenerRegistrationMessages {
           to_bitField0_ |= 0x00000004;
         }
         result.dataChangeScope_ = dataChangeScope_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.registerOnAllInstances_ = registerOnAllInstances_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1140,6 +1185,9 @@ public final class ListenerRegistrationMessages {
         if (other.hasDataChangeScope()) {
           setDataChangeScope(other.getDataChangeScope());
         }
+        if (other.hasRegisterOnAllInstances()) {
+          setRegisterOnAllInstances(other.getRegisterOnAllInstances());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1407,6 +1455,39 @@ public final class ListenerRegistrationMessages {
         return this;
       }
 
+      // optional bool registerOnAllInstances = 4;
+      private boolean registerOnAllInstances_ ;
+      /**
+       * <code>optional bool registerOnAllInstances = 4;</code>
+       */
+      public boolean hasRegisterOnAllInstances() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      /**
+       * <code>optional bool registerOnAllInstances = 4;</code>
+       */
+      public boolean getRegisterOnAllInstances() {
+        return registerOnAllInstances_;
+      }
+      /**
+       * <code>optional bool registerOnAllInstances = 4;</code>
+       */
+      public Builder setRegisterOnAllInstances(boolean value) {
+        bitField0_ |= 0x00000008;
+        registerOnAllInstances_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bool registerOnAllInstances = 4;</code>
+       */
+      public Builder clearRegisterOnAllInstances() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        registerOnAllInstances_ = false;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.RegisterChangeListener)
     }
 
@@ -1944,15 +2025,16 @@ public final class ListenerRegistrationMessages {
       "ylight.controller.mdsal\032\014Common.proto\"%\n" +
       "#CloseDataChangeListenerRegistration\"*\n(" +
       "CloseDataChangeListenerRegistrationReply" +
-      "\"\255\001\n\026RegisterChangeListener\022U\n\026instanceI" +
+      "\"\315\001\n\026RegisterChangeListener\022U\n\026instanceI" +
       "dentifierPath\030\001 \002(\01325.org.opendaylight.c" +
       "ontroller.mdsal.InstanceIdentifier\022#\n\033da" +
       "taChangeListenerActorPath\030\002 \002(\t\022\027\n\017dataC" +
-      "hangeScope\030\003 \002(\005\"?\n\033RegisterChangeListen" +
-      "erReply\022 \n\030listenerRegistrationPath\030\001 \002(",
-      "\tB[\n;org.opendaylight.controller.protobu" +
-      "ff.messages.registrationB\034ListenerRegist" +
-      "rationMessages"
+      "hangeScope\030\003 \002(\005\022\036\n\026registerOnAllInstanc" +
+      "es\030\004 \001(\010\"?\n\033RegisterChangeListenerReply\022",
+      " \n\030listenerRegistrationPath\030\001 \002(\tB[\n;org" +
+      ".opendaylight.controller.protobuff.messa" +
+      "ges.registrationB\034ListenerRegistrationMe" +
+      "ssages"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -1976,7 +2058,7 @@ public final class ListenerRegistrationMessages {
           internal_static_org_opendaylight_controller_mdsal_RegisterChangeListener_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_org_opendaylight_controller_mdsal_RegisterChangeListener_descriptor,
-              new java.lang.String[] { "InstanceIdentifierPath", "DataChangeListenerActorPath", "DataChangeScope", });
+              new java.lang.String[] { "InstanceIdentifierPath", "DataChangeListenerActorPath", "DataChangeScope", "RegisterOnAllInstances", });
           internal_static_org_opendaylight_controller_mdsal_RegisterChangeListenerReply_descriptor =
             getDescriptor().getMessageTypes().get(3);
           internal_static_org_opendaylight_controller_mdsal_RegisterChangeListenerReply_fieldAccessorTable = new
index 3342a1364abba61a87dae70190859d7908c4e1d0..4b45c85502295871e9fb93fb7338602d84de6de0 100644 (file)
@@ -27,6 +27,7 @@ message RegisterChangeListener{
 required InstanceIdentifier instanceIdentifierPath=1;
 required string dataChangeListenerActorPath=2;
 required int32 dataChangeScope=3;
+optional bool registerOnAllInstances=4;
 }
 /**
 * This is the reply for the RegisterChangeListener message
index 8ac424a6a89cc37f8832443fd93313c378feec9b..6f8bc633c7401cee2750aa7e9ce83bcbb525a8f2 100644 (file)
@@ -21,6 +21,7 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -116,7 +117,8 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
             DataChangeScope scope) {
 
         Future<Object> future = actorContext.executeOperationAsync(shard,
-                new RegisterChangeListener(path, dataChangeListenerActor, scope),
+                new RegisterChangeListener(path, dataChangeListenerActor, scope,
+                    listener instanceof ClusteredDOMDataChangeListener),
                 actorContext.getDatastoreContext().getShardInitializationTimeout());
 
         future.onComplete(new OnComplete<Object>(){
index e6f63d7154ba6d5f301ed7fd4b77f111d0ea8034..c7b55414e6d8b8146cf8f00812860716baebf158 100644 (file)
@@ -28,42 +28,57 @@ final class DataChangeListenerSupport extends LeaderLocalDelegateFactory<Registe
     private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerSupport.class);
     private final List<DelayedListenerRegistration> delayedListenerRegistrations = new ArrayList<>();
     private final List<ActorSelection> dataChangeListeners =  new ArrayList<>();
+    private final List<DelayedListenerRegistration> delayedRegisterOnAllListeners = new ArrayList<>();
 
     DataChangeListenerSupport(final Shard shard) {
         super(shard);
     }
 
     @Override
-    void onLeadershipChange(final boolean isLeader) {
+    void onLeadershipChange(final boolean isLeader, boolean hasLeader) {
+        LOG.debug("onLeadershipChange, isLeader: {}, hasLeader : {}", isLeader, hasLeader);
+
         for (ActorSelection dataChangeListener : dataChangeListeners) {
             dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
         }
 
+        if(hasLeader) {
+            for (DelayedListenerRegistration reg : delayedRegisterOnAllListeners) {
+                registerDelayedListeners(reg);
+            }
+            delayedRegisterOnAllListeners.clear();
+        }
+
         if (isLeader) {
             for (DelayedListenerRegistration reg: delayedListenerRegistrations) {
-                if(!reg.isClosed()) {
-                    final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
-                            createDelegate(reg.getRegisterChangeListener());
-                    reg.setDelegate(res.getKey());
-                    if (res.getValue() != null) {
-                        reg.getInstance().onDataChanged(res.getValue());
-                    }
-                }
+                registerDelayedListeners(reg);
             }
 
             delayedListenerRegistrations.clear();
         }
     }
 
+    private void registerDelayedListeners(DelayedListenerRegistration reg) {
+        if(!reg.isClosed()) {
+            final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
+                createDelegate(reg.getRegisterChangeListener());
+            reg.setDelegate(res.getKey());
+            if (res.getValue() != null) {
+                reg.getInstance().onDataChanged(res.getValue());
+            }
+        }
+    }
+
     @Override
-    void onMessage(final RegisterChangeListener message, final boolean isLeader) {
+    void onMessage(final RegisterChangeListener message, final boolean isLeader, boolean hasLeader) {
 
-        LOG.debug("{}: registerDataChangeListener for {}, leader: {}", persistenceId(), message.getPath(), isLeader);
+        LOG.debug("{}: registerDataChangeListener for {}, isLeader: {}, hasLeader : {}",
+            persistenceId(), message.getPath(), isLeader, hasLeader);
 
         final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
                                                      NormalizedNode<?, ?>>> registration;
         final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> event;
-        if (isLeader) {
+        if ((hasLeader && message.isRegisterOnAllInstances()) || isLeader) {
             final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
                     createDelegate(message);
             registration = res.getKey();
@@ -72,7 +87,11 @@ final class DataChangeListenerSupport extends LeaderLocalDelegateFactory<Registe
             LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
 
             DelayedListenerRegistration delayedReg = new DelayedListenerRegistration(message);
-            delayedListenerRegistrations.add(delayedReg);
+            if(message.isRegisterOnAllInstances()) {
+                delayedRegisterOnAllListeners.add(delayedReg);
+            } else {
+                delayedListenerRegistrations.add(delayedReg);
+            }
             registration = delayedReg;
             event = null;
         }
@@ -100,7 +119,9 @@ final class DataChangeListenerSupport extends LeaderLocalDelegateFactory<Registe
 
         // Now store a reference to the data change listener so it can be notified
         // at a later point if notifications should be enabled or disabled
-        dataChangeListeners.add(dataChangeListenerPath);
+        if(!message.isRegisterOnAllInstances()) {
+            dataChangeListeners.add(dataChangeListenerPath);
+        }
 
         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
                 new DataChangeListenerProxy(dataChangeListenerPath);
index db5eeb83e70eedd4101cceeabd5491a2c5b3a47e..4281bfe796afd1e1844286b4ae854aa57de8dab8 100644 (file)
@@ -32,7 +32,7 @@ final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory<Reg
     }
 
     @Override
-    void onLeadershipChange(final boolean isLeader) {
+    void onLeadershipChange(final boolean isLeader, boolean hasLeader) {
         if (isLeader) {
             for (DelayedDataTreeListenerRegistration reg : delayedRegistrations) {
                 reg.createDelegate(this);
@@ -48,7 +48,7 @@ final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory<Reg
     }
 
     @Override
-    void onMessage(final RegisterDataTreeChangeListener registerTreeChangeListener, final boolean isLeader) {
+    void onMessage(final RegisterDataTreeChangeListener registerTreeChangeListener, final boolean isLeader, boolean hasLeader) {
         LOG.debug("{}: registerTreeChangeListener for {}, leader: {}", persistenceId(), registerTreeChangeListener.getPath(), isLeader);
 
         final ListenerRegistration<DOMDataTreeChangeListener> registration;
index 3f927736b5a8eaf5ee1be0e355a43f616fb2d9a1..8ce28571ee16b22bca1d1a5a8d76e5c88dd51583 100644 (file)
@@ -60,8 +60,9 @@ abstract class LeaderLocalDelegateFactory<M, D, I> extends DelegateFactory<M, D,
      * Invoked whenever the local shard's leadership role changes.
      *
      * @param isLeader true if the shard has become leader, false if it has
-     *                 become a follower.
+ *                 become a follower.
+     * @param hasLeader true if the shard knows about leader ID
      */
-    abstract void onLeadershipChange(boolean isLeader);
-    abstract void onMessage(M message, boolean isLeader);
+    abstract void onLeadershipChange(boolean isLeader, boolean hasLeader);
+    abstract void onMessage(M message, boolean isLeader, boolean hasLeader);
 }
index 5a10c3e961bc8e038a0dd3969134b903aace2a56..dd1c0ad6ff464959bb6fdf97220a8862d52edf8d 100644 (file)
@@ -221,7 +221,7 @@ public class Shard extends RaftActor {
 
         if(context.error().isPresent()){
             LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
-                    context.error());
+                context.error());
         }
 
         try {
@@ -243,9 +243,9 @@ public class Shard extends RaftActor {
             } else if (CloseTransactionChain.SERIALIZABLE_CLASS.isInstance(message)) {
                 closeTransactionChain(CloseTransactionChain.fromSerializable(message));
             } else if (message instanceof RegisterChangeListener) {
-                changeSupport.onMessage((RegisterChangeListener) message, isLeader());
+                changeSupport.onMessage((RegisterChangeListener) message, isLeader(), hasLeader());
             } else if (message instanceof RegisterDataTreeChangeListener) {
-                treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader());
+                treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader(), hasLeader());
             } else if (message instanceof UpdateSchemaContext) {
                 updateSchemaContext((UpdateSchemaContext) message);
             } else if (message instanceof PeerAddressResolved) {
@@ -271,6 +271,10 @@ public class Shard extends RaftActor {
         }
     }
 
+    private boolean hasLeader() {
+        return getLeaderId() != null;
+    }
+
     public int getPendingTxCommitQueueSize() {
         return commitCoordinator.getQueueSize();
     }
@@ -655,8 +659,9 @@ public class Shard extends RaftActor {
     @Override
     protected void onStateChanged() {
         boolean isLeader = isLeader();
-        changeSupport.onLeadershipChange(isLeader);
-        treeChangeSupport.onLeadershipChange(isLeader);
+        boolean hasLeader = hasLeader();
+        changeSupport.onLeadershipChange(isLeader, hasLeader);
+        treeChangeSupport.onLeadershipChange(isLeader, hasLeader);
 
         // If this actor is no longer the leader close all the transaction chains
         if (!isLeader) {
index 1d8edece1a5161137daab2abb03e6b968a920615..f7a51a93ffe173b755cdd5bd2d59ccb5cacb19ef 100644 (file)
@@ -24,14 +24,16 @@ public class RegisterChangeListener implements SerializableMessage {
     private final YangInstanceIdentifier path;
     private final ActorRef dataChangeListener;
     private final AsyncDataBroker.DataChangeScope scope;
+    private final boolean registerOnAllInstances;
 
 
     public RegisterChangeListener(YangInstanceIdentifier path,
         ActorRef dataChangeListener,
-        AsyncDataBroker.DataChangeScope scope) {
+        AsyncDataBroker.DataChangeScope scope, boolean registerOnAllInstances) {
         this.path = path;
         this.dataChangeListener = dataChangeListener;
         this.scope = scope;
+        this.registerOnAllInstances = registerOnAllInstances;
     }
 
     public YangInstanceIdentifier getPath() {
@@ -47,20 +49,23 @@ public class RegisterChangeListener implements SerializableMessage {
         return dataChangeListener.path();
     }
 
+    public boolean isRegisterOnAllInstances() {
+        return registerOnAllInstances;
+    }
 
     @Override
     public ListenerRegistrationMessages.RegisterChangeListener toSerializable() {
       return ListenerRegistrationMessages.RegisterChangeListener.newBuilder()
           .setInstanceIdentifierPath(InstanceIdentifierUtils.toSerializable(path))
           .setDataChangeListenerActorPath(Serialization.serializedActorPath(dataChangeListener))
-          .setDataChangeScope(scope.ordinal()).build();
+          .setDataChangeScope(scope.ordinal()).setRegisterOnAllInstances(registerOnAllInstances).build();
     }
 
   public static RegisterChangeListener fromSerializable(ActorSystem actorSystem, Object serializable){
     ListenerRegistrationMessages.RegisterChangeListener o = (ListenerRegistrationMessages.RegisterChangeListener) serializable;
     return new RegisterChangeListener(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPath()),
                                                 actorSystem.provider().resolveActorRef(o.getDataChangeListenerActorPath()),
-                                              AsyncDataBroker.DataChangeScope.values()[o.getDataChangeScope()]);
+                                              AsyncDataBroker.DataChangeScope.values()[o.getDataChangeScope()], o.getRegisterOnAllInstances());
   }
 
 
index 57e0e26c116c036f218b3779d2120a7bf1a21f81..299172af34cf0d061c01ec54a161586f64296192 100644 (file)
@@ -42,6 +42,7 @@ import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import scala.concurrent.ExecutionContextExecutor;
@@ -95,6 +96,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest {
             RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class);
             Assert.assertEquals("getPath", path, registerMsg.getPath());
             Assert.assertEquals("getScope", scope, registerMsg.getScope());
+            Assert.assertEquals("isRegisterOnAllInstances", false, registerMsg.isRegisterOnAllInstances());
 
             reply(new RegisterChangeListenerReply(getRef()));
 
@@ -121,6 +123,64 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest {
         }};
     }
 
+    @Test(timeout=10000)
+    public void testSuccessfulRegistrationForClusteredListener() {
+        new JavaTestKit(getSystem()) {{
+            ActorContext actorContext = new ActorContext(getSystem(), getRef(),
+                mock(ClusterWrapper.class), mock(Configuration.class));
+
+            AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> mockClusteredListener =
+                Mockito.mock(ClusteredDOMDataChangeListener.class);
+
+            final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+                "shard-1", actorContext, mockClusteredListener);
+
+            final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+            final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
+            new Thread() {
+                @Override
+                public void run() {
+                    proxy.init(path, scope);
+                }
+
+            }.start();
+
+            FiniteDuration timeout = duration("5 seconds");
+            FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
+            Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+
+            reply(new LocalShardFound(getRef()));
+
+            RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class);
+            Assert.assertEquals("getPath", path, registerMsg.getPath());
+            Assert.assertEquals("getScope", scope, registerMsg.getScope());
+            Assert.assertEquals("isRegisterOnAllInstances", true, registerMsg.isRegisterOnAllInstances());
+
+            reply(new RegisterChangeListenerReply(getRef()));
+
+            for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) {
+                Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+            }
+
+            Assert.assertEquals("getListenerRegistrationActor", getSystem().actorSelection(getRef().path()),
+                proxy.getListenerRegistrationActor());
+
+            watch(proxy.getDataChangeListenerActor());
+
+            proxy.close();
+
+            // The listener registration actor should get a Close message
+            expectMsgClass(timeout, CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS);
+
+            // The DataChangeListener actor should be terminated
+            expectMsgClass(timeout, Terminated.class);
+
+            proxy.close();
+
+            expectNoMsg();
+        }};
+    }
+
     @Test(timeout=10000)
     public void testLocalShardNotFound() {
         new JavaTestKit(getSystem()) {{
index d9df305453073ec515f7427c783af21886f800e2..4f4162f607b76c60a22914b74c651626747b61a1 100644 (file)
@@ -20,6 +20,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
@@ -148,7 +149,7 @@ public class ShardTest extends AbstractShardTest {
                     "testRegisterChangeListener-DataChangeListener");
 
             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
-                    dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
+                    dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
 
             final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
                     RegisterChangeListenerReply.class);
@@ -241,7 +242,7 @@ public class ShardTest extends AbstractShardTest {
 
             // Now send the RegisterChangeListener and wait for the reply.
             shard.tell(new RegisterChangeListener(path, dclActor,
-                    AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
+                    AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef());
 
             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
                     RegisterChangeListenerReply.class);
@@ -346,11 +347,11 @@ public class ShardTest extends AbstractShardTest {
             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
             assertEquals("Got first ElectionTimeout", true,
-                    onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
+                onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
 
             shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                    RegisterDataTreeChangeListenerReply.class);
+                RegisterDataTreeChangeListenerReply.class);
             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
 
             shard.tell(new FindLeader(), getRef());
@@ -450,13 +451,13 @@ public class ShardTest extends AbstractShardTest {
 
             //waitUntilLeader(shard);
             assertEquals("Recovery complete", true,
-                    Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
+                Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
 
             final String address = "akka://foobar";
             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
 
             assertEquals("getPeerAddresses", address,
-                    ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
+                ((TestShard) shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
 
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
@@ -551,8 +552,8 @@ public class ShardTest extends AbstractShardTest {
         final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
 
         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
-                SerializationUtils.serializeNormalizedNode(root),
-                Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
+            SerializationUtils.serializeNormalizedNode(root),
+            Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
         return testStore;
     }
 
@@ -594,7 +595,7 @@ public class ShardTest extends AbstractShardTest {
         }
 
         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
-                new ApplyJournalEntries(nListEntries));
+            new ApplyJournalEntries(nListEntries));
 
         testRecovery(listEntryKeys);
     }
@@ -610,8 +611,8 @@ public class ShardTest extends AbstractShardTest {
         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
 
         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
-                  new WriteModification(TestModel.OUTER_LIST_PATH,
-                          ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
+            new WriteModification(TestModel.OUTER_LIST_PATH,
+                ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
 
         final int nListEntries = 16;
         final Set<Integer> listEntryKeys = new HashSet<>();
@@ -936,12 +937,12 @@ public class ShardTest extends AbstractShardTest {
             // Send a couple more BatchedModifications.
 
             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
-                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
+                ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
             expectMsgClass(duration, BatchedModificationsReply.class);
 
             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
+                ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
 
             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
 
@@ -1268,7 +1269,7 @@ public class ShardTest extends AbstractShardTest {
             final MutableCompositeModification modification = new MutableCompositeModification();
             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
-                    TestModel.TEST_PATH, containerNode, modification);
+                TestModel.TEST_PATH, containerNode, modification);
 
             final FiniteDuration duration = duration("5 seconds");
 
@@ -1276,7 +1277,7 @@ public class ShardTest extends AbstractShardTest {
             // by the ShardTransaction.
 
             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                    cohort, modification, true, false), getRef());
+                cohort, modification, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
@@ -1559,7 +1560,7 @@ public class ShardTest extends AbstractShardTest {
 
             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
@@ -1681,7 +1682,7 @@ public class ShardTest extends AbstractShardTest {
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
 
             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort, modification, true, false), getRef());
+                cohort, modification, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
@@ -1826,12 +1827,12 @@ public class ShardTest extends AbstractShardTest {
                     modification, preCommit);
 
             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                    cohort, modification, true, false), getRef());
+                cohort, modification, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
@@ -2012,7 +2013,7 @@ public class ShardTest extends AbstractShardTest {
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
 
             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true, false), getRef());
+                cohort1, modification1, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             final String transactionID2 = "tx2";
@@ -2030,7 +2031,7 @@ public class ShardTest extends AbstractShardTest {
                     TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
 
             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
-                    cohort3, modification3, true, false), getRef());
+                cohort3, modification3, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
@@ -2412,7 +2413,7 @@ public class ShardTest extends AbstractShardTest {
                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
 
         final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
-                nonPersistentContext, SCHEMA_CONTEXT);
+            nonPersistentContext, SCHEMA_CONTEXT);
 
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
@@ -2448,12 +2449,12 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
 
             assertEquals("isRecoveryApplicable", false,
-                    shard.underlyingActor().persistence().isRecoveryApplicable());
+                shard.underlyingActor().persistence().isRecoveryApplicable());
 
             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
 
             assertEquals("isRecoveryApplicable", true,
-                    shard.underlyingActor().persistence().isRecoveryApplicable());
+                shard.underlyingActor().persistence().isRecoveryApplicable());
 
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
@@ -2477,11 +2478,11 @@ public class ShardTest extends AbstractShardTest {
                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
 
                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
-                        ShardLeaderStateChanged.class);
+                    ShardLeaderStateChanged.class);
                 assertEquals("getLocalShardDataTree present", true,
                         leaderStateChanged.getLocalShardDataTree().isPresent());
                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
-                        leaderStateChanged.getLocalShardDataTree().get());
+                    leaderStateChanged.getLocalShardDataTree().get());
 
                 MessageCollectorActor.clearMessages(listener);
 
@@ -2521,4 +2522,144 @@ public class ShardTest extends AbstractShardTest {
         store.validate(modification);
         store.commit(store.prepare(modification));
     }
+
+    @Test
+    public void testClusteredDataChangeListernerDelayedRegistration() throws Exception {
+        new ShardTestKit(getSystem()) {{
+            final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
+            final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
+            final Creator<Shard> creator = new Creator<Shard>() {
+                boolean firstElectionTimeout = true;
+
+                @Override
+                public Shard create() throws Exception {
+                    return new Shard(shardID, Collections.<String,String>emptyMap(),
+                        dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
+                        @Override
+                        public void onReceiveCommand(final Object message) throws Exception {
+                            if(message instanceof ElectionTimeout && firstElectionTimeout) {
+                                firstElectionTimeout = false;
+                                final ActorRef self = getSelf();
+                                new Thread() {
+                                    @Override
+                                    public void run() {
+                                        Uninterruptibles.awaitUninterruptibly(
+                                            onChangeListenerRegistered, 5, TimeUnit.SECONDS);
+                                        self.tell(message, self);
+                                    }
+                                }.start();
+
+                                onFirstElectionTimeout.countDown();
+                            } else {
+                                super.onReceiveCommand(message);
+                            }
+                        }
+                    };
+                }
+            };
+
+            final MockDataChangeListener listener = new MockDataChangeListener(1);
+            final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
+                "testDataChangeListenerOnFollower-DataChangeListener");
+
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                Props.create(new DelegatingShardCreator(creator)),
+                "testDataChangeListenerOnFollower");
+
+            assertEquals("Got first ElectionTimeout", true,
+                onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
+
+            shard.tell(new FindLeader(), getRef());
+            final FindLeaderReply findLeadeReply =
+                expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
+            assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
+
+            final YangInstanceIdentifier path = TestModel.TEST_PATH;
+
+            shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
+            final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+                RegisterChangeListenerReply.class);
+            assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+
+            writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+            onChangeListenerRegistered.countDown();
+
+            listener.waitForChangeEvents();
+
+            dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
+    @Test
+    public void testClusteredDataChangeListernerRegistration() throws Exception {
+        new ShardTestKit(getSystem()) {{
+            final ShardIdentifier member1ShardID = ShardIdentifier.builder().memberName("member-1")
+                .shardName("inventory").type("config").build();
+
+            final ShardIdentifier member2ShardID = ShardIdentifier.builder().memberName("member-2")
+                .shardName("inventory").type("config").build();
+            final Creator<Shard> followerShardCreator = new Creator<Shard>() {
+
+                @Override
+                public Shard create() throws Exception {
+                    return new Shard(member1ShardID, Collections.singletonMap(member2ShardID.toString(),
+                        "akka://test/user/" + member2ShardID.toString()),
+                        dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
+                        @Override
+                        public void onReceiveCommand(final Object message) throws Exception {
+
+                            if(!(message instanceof ElectionTimeout)) {
+                                super.onReceiveCommand(message);
+                            }
+                        }
+                    };
+                }
+            };
+
+            final Creator<Shard> leaderShardCreator = new Creator<Shard>() {
+
+                @Override
+                public Shard create() throws Exception {
+                    return new Shard(member2ShardID, Collections.singletonMap(member1ShardID.toString(),
+                        "akka://test/user/" + member1ShardID.toString()),
+                        dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) { };
+                }
+            };
+
+
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                Props.create(new DelegatingShardCreator(followerShardCreator)),
+                member1ShardID.toString());
+
+            final TestActorRef<Shard> shardLeader = TestActorRef.create(getSystem(),
+                Props.create(new DelegatingShardCreator(leaderShardCreator)),
+                member2ShardID.toString());
+            // Sleep to let election happen
+            Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+
+            shard.tell(new FindLeader(), getRef());
+            final FindLeaderReply findLeaderReply =
+                    expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
+            assertEquals("Shard leader does not match", shardLeader.path().toString(), findLeaderReply.getLeaderActor());
+
+            final YangInstanceIdentifier path = TestModel.TEST_PATH;
+            final MockDataChangeListener listener = new MockDataChangeListener(1);
+            final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
+                "testDataChangeListenerOnFollower-DataChangeListener");
+
+            shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
+            final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+                RegisterChangeListenerReply.class);
+            assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+
+            writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+            listener.waitForChangeEvents();
+
+            dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
 }
index 5dfabce0066ce04758dc3a4e8b83ce4624af4d9b..4ccee663e981ddc009a93fb105b9c9f5cd06d7b2 100644 (file)
@@ -41,7 +41,7 @@ public class RegisterChangeListenerTest extends AbstractActorTest {
     public void testToSerializable(){
         TestActorRef<Actor> testActor = factory.createTestActor(MessageCollectorActor.props());
         RegisterChangeListener registerChangeListener = new RegisterChangeListener(TestModel.TEST_PATH, testActor
-                , AsyncDataBroker.DataChangeScope.BASE);
+                , AsyncDataBroker.DataChangeScope.BASE, false);
 
         ListenerRegistrationMessages.RegisterChangeListener serialized
                 = registerChangeListener.toSerializable();
@@ -51,6 +51,7 @@ public class RegisterChangeListenerTest extends AbstractActorTest {
         assertEquals("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", path.getCode(0));
         assertEquals(Serialization.serializedActorPath(testActor), serialized.getDataChangeListenerActorPath());
         assertEquals(AsyncDataBroker.DataChangeScope.BASE.ordinal(), serialized.getDataChangeScope());
+        assertEquals(false, serialized.getRegisterOnAllInstances());
 
     }
 
@@ -58,7 +59,7 @@ public class RegisterChangeListenerTest extends AbstractActorTest {
     public void testFromSerializable(){
         TestActorRef<Actor> testActor = factory.createTestActor(MessageCollectorActor.props());
         RegisterChangeListener registerChangeListener = new RegisterChangeListener(TestModel.TEST_PATH, testActor
-                , AsyncDataBroker.DataChangeScope.SUBTREE);
+                , AsyncDataBroker.DataChangeScope.SUBTREE, true);
 
         ListenerRegistrationMessages.RegisterChangeListener serialized
                 = registerChangeListener.toSerializable();
@@ -69,7 +70,7 @@ public class RegisterChangeListenerTest extends AbstractActorTest {
         assertEquals(TestModel.TEST_PATH, registerChangeListener.getPath());
         assertEquals(testActor.path().toString(), fromSerialized.getDataChangeListenerPath().toString());
         assertEquals(AsyncDataBroker.DataChangeScope.SUBTREE, fromSerialized.getScope());
-
+        assertEquals(true, fromSerialized.isRegisterOnAllInstances());
 
     }
 }
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/ClusteredDOMDataChangeListener.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/ClusteredDOMDataChangeListener.java
new file mode 100644 (file)
index 0000000..e5dbc54
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.dom.api;
+
+/**
+ * <p>ClusteredDOMDataChangeListener is a marker interface to enable data change notifications on all instances in a cluster,
+ * where this listener is registered.</p>
+ *
+ *
+ * <p>Applications should implement ClusteredDOMDataChangeListener instead of DOMDataChangeListener, if they want to listen
+ * to data change notifications on any node of clustered datastore. DOMDataChangeListener enables data change notifications
+ * only at leader of the datastore shard.</p>
+ *
+ */
+
+public interface ClusteredDOMDataChangeListener extends DOMDataChangeListener{
+
+}
index 393d1eaafeaaf85388af54fbd492316ebc013937..c46529bd0271cf0267efcf2286458a3ed8a5db79 100644 (file)
@@ -11,6 +11,10 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
+/**
+ * DOMDataChangeListener enables data change notifications only at leader of the datastore shard.
+ */
+
 public interface DOMDataChangeListener extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> {
 
 }